116 lines
3.8 KiB
Python
116 lines
3.8 KiB
Python
# Copyright 2021 (c) Anna Schumaker.
|
|
from gi.repository import GObject
|
|
from gi.repository import Gio
|
|
from . import sql
|
|
|
|
class Table:
|
|
def __init__(self, table):
|
|
self.cache = dict()
|
|
self.table = table
|
|
self.do_create()
|
|
|
|
def do_create(self): raise NotImplementedError
|
|
def do_factory(self, row): raise NotImplementedError
|
|
def do_insert(self, *args): raise NotImplementedError
|
|
def do_lookup(self, *args): raise NotImplementedError
|
|
|
|
def do_drop(self):
|
|
sql.execute(f"DROP TABLE {self.table}")
|
|
|
|
def delete(self, obj):
|
|
del self.cache[obj.rowid]
|
|
sql.execute(f"DELETE FROM {self.table} WHERE rowid=?", [ obj.rowid ])
|
|
|
|
def factory(self, row):
|
|
if row:
|
|
return self.cache.setdefault(row[0], self.do_factory(row))
|
|
|
|
def find(self, *args):
|
|
return ret if (ret := self.lookup(*args)) else self.insert(*args)
|
|
|
|
def get(self, rowid):
|
|
if (row := self.cache.get(rowid)):
|
|
return row
|
|
cur = sql.execute(f"SELECT * FROM {self.table} WHERE rowid=?", [ rowid ])
|
|
return self.factory(cur.fetchone())
|
|
|
|
def insert(self, *args, **kwargs):
|
|
return self.get(self.do_insert(*args, **kwargs).lastrowid)
|
|
|
|
def lookup(self, *args):
|
|
return self.factory(self.do_lookup(*args).fetchone())
|
|
|
|
def reset(self):
|
|
self.do_drop()
|
|
self.cache.clear()
|
|
self.do_create()
|
|
|
|
|
|
class Model(GObject.GObject, Gio.ListModel, Table):
|
|
def __init__(self, table, order):
|
|
GObject.GObject.__init__(self)
|
|
self.order = order
|
|
Table.__init__(self, table)
|
|
|
|
def do_get_item_type(self):
|
|
return GObject.TYPE_PYOBJECT
|
|
|
|
def do_get_n_items(self):
|
|
return sql.execute(f"SELECT COUNT(*) FROM {self.table}").fetchone()[0]
|
|
|
|
def do_get_item(self, n):
|
|
cur = sql.execute(f"SELECT * FROM {self.table} ORDER BY {self.order} "
|
|
"LIMIT 1 OFFSET ?", [ n ])
|
|
return self.factory(cur.fetchone())
|
|
|
|
def get_item_index(self, item):
|
|
cur = sql.execute("SELECT * FROM (SELECT rowid,ROW_NUMBER() "
|
|
f"OVER (ORDER BY {self.order}) "
|
|
f"FROM {self.table}) "
|
|
"WHERE rowid=?", [ item.rowid ])
|
|
return cur.fetchone()[1] - 1
|
|
|
|
def delete(self, item):
|
|
pos = self.get_item_index(item)
|
|
super().delete(item)
|
|
self.emit("items-changed", pos, 1, 0)
|
|
|
|
def insert(self, *args):
|
|
row = super().insert(*args)
|
|
pos = self.get_item_index(row)
|
|
self.emit("items-changed", pos, 0, 1)
|
|
return row
|
|
|
|
def reset(self):
|
|
n = self.get_n_items()
|
|
super().reset()
|
|
self.emit("items-changed", 0, n, self.get_n_items())
|
|
|
|
|
|
class Child(Table):
|
|
def __init__(self, table, parent, order):
|
|
Table.__init__(self, table)
|
|
self.parent = parent
|
|
self.order = order
|
|
|
|
def get_n_children(self, parent):
|
|
cur = sql.execute(f"SELECT COUNT(*) FROM {self.table} "
|
|
f"WHERE {self.parent}=?", [ parent.rowid ])
|
|
return cur.fetchone()[0]
|
|
|
|
def get_child(self, parent, n):
|
|
cur = sql.execute(f"SELECT * FROM {self.table} WHERE {self.parent}=? "
|
|
f"ORDER BY {self.order} LIMIT 1 OFFSET ?",
|
|
[ parent.rowid, n ])
|
|
return self.factory(cur.fetchone())
|
|
|
|
def get_child_index(self, parent, child):
|
|
cur = sql.execute(f"SELECT * FROM (SELECT rowid,ROW_NUMBER() "
|
|
f"OVER (ORDER BY {self.order}) "
|
|
f"FROM {self.table} "
|
|
f"WHERE {self.parent}=?)"
|
|
f"WHERE rowid=?", [ parent.rowid, child.rowid ])
|
|
return cur.fetchone()[1] - 1
|
|
|
|
def find(self, *args): raise NotImplementedError
|