# models.py
#
# Copyright 2020 Fabio
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import datetime
import time
from gi.repository import GObject
from dateutil.tz import UTC
from . import local
from .settings import Settings
from .fetcher import Fetcher
## models
def _build_filter(day=None, room=None, track=None, event=None, **kwargs):
filterdata = []
wherestm = []
def __f(col, value):
if value is None:
wherestm.append("{} is null".format(col))
else:
wherestm.append("{} = ?".format(col))
filterdata.append(value)
if day is not None:
__f("date", day.date)
if room is not None:
__f("room", room.name)
if track is not None:
__f("track", track.name)
if event is not None:
__f("id", event.id)
for k,v in kwargs.items():
__f(k, v)
return (filterdata, wherestm)
class Conference:
"""A conference in menu files (ggmenu.json and usermenu.json)"""
def _init(self, obj):
self.metadata = {}
self.user = False
for k, v in obj.items():
setattr(self, k, v)
self.start = datetime.date(*[int(s) for s in self.start.split("-")])
self.end = datetime.date(*[int(s) for s in self.end.split("-")])
return self
def get_map_links(self):
maps = [ l for l in self.metadata.get('links',[]) if l['title']=="Map" ]
print("Conference maps links:", maps)
return maps
def get_map_files(self, cbk):
r = []
for link in self.get_map_links():
r.append(local.get_image_async(link['url'], cbk))
return r
def get_logo_file(self, cbk):
logourl = self.metadata.get('icon', None)
return local.get_image_async(logourl, cbk)
def __repr__(self):
return f"<{self.__class__.__name__} {self.__dict__}>"
@classmethod
def all(cls):
# from giggity menu
return [ cls()._init(s) for s in local.get_menu() ]
@classmethod
def by_url(cls, url):
ss = [ s for s in local.get_menu() if url in s['url'] ]
if len(ss) == 0:
return None
return cls()._init(ss[0])
def get_meta(self):
"""Return metadata from db. This will close any opened db connection."""
local.close()
local.opendb(self.url, False)
return Meta()
local.close()
def to_json(self):
"""Export conference meta as json for user menu"""
d = self.__dict__.copy()
for k in d:
if isinstance(d[k], (datetime.date, datetime.datetime)):
d[k] = d[k].strftime("%Y-%m-%d")
return d
class Meta:
"""Conference metadata from "meta" table in db"""
def __init__(self):
_c = local.getDb().execute("SELECT key, value FROM meta")
_d = [(row['key'], row['value']) for row in _c]
self._data = dict(_d)
def _fmtd_value(self, k):
v = self._data.get(k, None)
if k == "start" or k == "end":
if v is not None and v != "":
if ":" in v:
v = datetime.datetime.strptime(v, "%Y-%m-%d %H:%M:%S")
else:
v = datetime.datetime.strptime(v, "%Y-%m-%d")
else:
v = None
if k == "last_update":
v = 0 if v is None else float(v)
return v
def __getattr__(self, name):
if name.startswith("_"):
return super().__getattr__(name)
return self._fmtd_value(name)
def __setattr__(self, name, value):
if name.startswith("_"):
return super().__setattr__(name, value)
self._data[name] = value
def __enter__(self):
return self
def __exit__(self, *args):
self.save()
def __repr__(self):
return f"<{self.__class__.__name__} {self.__dict__}>"
def save(self):
db = local.getDb()
if db is None:
local.opendb(self.url, False)
db = local.getDb()
db.executemany("INSERT OR REPLACE INTO meta (key, value) VALUES (?,?)", self._data.items())
db.commit()
def to_json(self):
"""Export conference meta as json for user menu"""
return {
"url": self.url,
"title": self.title if self.title else "",
"start": self.start.strftime("%Y-%m-%d"),
"end": self.end.strftime("%Y-%m-%d"),
"user": True
}
class Day:
def _init(self, data):
self.date = data['date']
return self
def __repr__(self):
return "<{} {!r}>".format(self.__class__.__name__, self.date)
def __str__(self):
return self.date.strftime("%x")
@classmethod
def all(cls):
for row in local.getDb().execute("SELECT date FROM events GROUP BY date"):
yield cls()._init(row)
@classmethod
def count(cls):
for row in local.getDb().execute("SELECT count(DISTINCT date) FROM events"):
return row[0]
@classmethod
def filter(cls, room=None, track=None, event=None):
filterdata, wherestm = _build_filter(room=room, track=track, event=event)
query = """SELECT date FROM events
WHERE {}
GROUP BY date""".format(" AND ".join(wherestm))
for row in local.getDb().execute(query, filterdata):
yield cls()._init(row)
class Room:
def _init(self, data):
self.name = data['room']
return self
def __repr__(self):
return "<{} {!r}>".format(self.__class__.__name__, self.name)
def __str__(self):
return self.name or "- no room -"
@classmethod
def all(cls):
for row in local.getDb().execute("SELECT room FROM events GROUP BY room"):
yield cls()._init(row)
@classmethod
def count(cls):
for row in local.getDb().execute("SELECT count(DISTINCT room) FROM events"):
return row[0]
@classmethod
def filter(cls, day=None, track=None, event=None):
filterdata, wherestm = _build_filter(day=day, track=track, event=event)
query = """SELECT room FROM events
WHERE {}
GROUP BY room""".format(" AND ".join(wherestm))
for row in local.getDb().execute(query, filterdata):
yield cls()._init(row)
class Track:
def _init(self, data):
self.name = data['track']
self.room = list(Room.filter(track=self))
self.date = list(Day.filter(track=self))
return self
def __repr__(self):
return "<{} {!r} in {!r} @ {}>".format(
self.__class__.__name__, self.name, self.room, self.date)
def __str__(self):
return self.name or "- no track -"
@classmethod
def all(cls):
for row in local.getDb().execute(
"SELECT track, room, date FROM events WHERE track != '' GROUP BY track ORDER BY date, track"):
yield cls()._init(row)
@classmethod
def count(cls):
for row in local.getDb().execute("SELECT count(DISTINCT track) FROM events"):
return row[0]
class Person:
def _init(self, data):
self.id = data['id']
self.name = data['name']
return self
def events(self):
for row in local.getDb().execute(
"""SELECT e.* FROM events as e
LEFT JOIN event_person AS ep ON e.id = ep.event_id
WHERE ep.person_id = ?""", (self.id,)):
yield Event()._init(row)
def __repr__(self):
return "<{} {!r}>".format(self.__class__.__name__, self.name)
def __str__(self):
return self.name
@classmethod
def all(cls):
for row in local.getDb().execute(
"SELECT id, name FROM persons"):
yield cls()._init(row)
class Link:
def _init(self, event, data):
self.event = event
self.href = data['href']
self.name = data['name']
return self
def __repr__(self):
return "<{} for {!r} {!r} {!r}>".format(
self.__class__.__name__, self.event, self.name, self.href)
def __str__(self):
return '<a href="{}">{}</a>'.format(self.href, self.name)
class Event(GObject.GObject):
__gsignals__ = {
'update': (GObject.SIGNAL_RUN_LAST, GObject.TYPE_NONE, ())
}
def _init(self, data):
self.id = data['id']
self.date = data['date']
self.start = data['start']
self.end = data['end']
self.room = data['room']
self.slug = data['slug']
self.title = data['title']
self.subtitle = data['subtitle'] or ""
self.track = data['track']
self.evtype = data['type']
self.abstract = data['abstract'] or ""
self.description = data['description'] or ""
self.starred = data['starred'] == 1
self.notified = data['notified'] == 1
return self
def set_star(self, starred):
local.getDb().execute("UPDATE events SET starred=? WHERE id=?",(starred,self.id,))
local.getDb().commit()
self.starred = starred
self.emit('update')
def set_notified(self, notified):
local.getDb().execute("UPDATE events SET notified=? WHERE id=?",(notified,self.id,))
local.getDb().commit()
self.notified = notified
self.emit('update')
def start_in_tz(self):
return self._dt_in_tz(self.start)
def end_in_tz(self):
return self._dt_in_tz(self.end)
def _dt_in_tz(self, dt):
dt = dt.replace(tzinfo=UTC)
dt = dt.astimezone(Settings.instance().get_timezone())
return dt
def persons(self):
#print("event", self.id, "persons()")
for row in local.getDb().execute(
"""SELECT p.id, p.name FROM persons as p
LEFT JOIN event_person AS ep ON p.id = ep.person_id
WHERE ep.event_id = ?""", (self.id,)):
yield Person()._init(row)
def links(self):
for row in local.getDb().execute(
"""SELECT href, name FROM links
WHERE event_id=?""", (self.id,)):
yield Link()._init(self, row)
def __repr__(self):
return "<{} {!r} in {!r} @ {!r}>".format(self.__class__.__name__, self.title, self.room, self.start)
def __str__(self):
return self.title
def get_conflicts(self):
query = """SELECT * FROM events
WHERE starred = 1 AND id != ? AND date = ?
AND end > ? AND start < ?
"""
for row in local.getDb().execute(query, (self.id, self.date, self.start, self.end)):
yield Event()._init(row)
@classmethod
def all(cls):
for row in local.getDb().execute("SELECT * FROM events"):
yield cls()._init(row)
@classmethod
def filter(cls, day:Day = None, room:Room = None, track:Track = None, **kwargs):
filterdata, wherestm = _build_filter(day=day, room=room, track=track, **kwargs)
query = """SELECT * FROM events
WHERE {}
ORDER BY start, room""".format(" AND ".join(wherestm))
for row in local.getDb().execute(query, filterdata):
yield cls()._init(row)
@classmethod
def nextup(cls, minutes):
"""get events starting in next `minutes`"""
minutes = min(int(minutes), 30) # some sanity check
now = datetime.datetime.now(UTC).replace(tzinfo=None).strftime("%Y-%m-%dT%H:%M:%S.000")
# now = "now" # is "now" somehow broken?
query = """SELECT * FROM events
WHERE starred=1 AND notified=0
AND datetime(start) >= datetime("{1}")
AND datetime(start) <= datetime("{1}", "+{0} minutes");"""
for row in local.getDb().execute(query.format(minutes, now)):
yield cls()._init(row)
@classmethod
def search(cls, term, starred=False):
"""Search events for 'term'"""
starfilter = ""
if starred:
starfilter = "events.starred = 1 AND"
query = """SELECT events.* FROM fts_event
JOIN events ON events.id = fts_event.event_id
WHERE {} fts_event = ?
GROUP BY events.id
ORDER BY rank""".format(starfilter)
for row in local.getDb().execute(query, (term,)):
yield cls()._init(row)