# emmental/db/table.py

# Copyright 2021 (c) Anna Schumaker.
from gi.repository import GObject
from gi.repository import Gio
from . import sql
class Table:
    """Base class for one SQL table with a rowid-keyed cache of row objects.

    Subclasses provide the ``do_create``, ``do_factory``, ``do_insert`` and
    ``do_lookup`` hooks; this class layers caching and the public
    ``get`` / ``find`` / ``insert`` / ``lookup`` / ``delete`` / ``reset``
    operations on top of them.

    The table name is interpolated directly into SQL text, so it must come
    from trusted code rather than from user input.
    """

    def __init__(self, table):
        """Store the table name, start with an empty cache, run do_create()."""
        self.cache = dict()
        self.table = table
        self.do_create()

    def do_create(self):
        """Hook run by __init__() and reset(); subclasses must override."""
        raise NotImplementedError

    def do_factory(self, row):
        """Hook: build the object for a fetched row (row[0] is the cache key)."""
        raise NotImplementedError

    def do_insert(self, *args):
        """Hook: insert a row; the result must expose a ``lastrowid``."""
        raise NotImplementedError

    def do_lookup(self, *args):
        """Hook: run a lookup query; the result must expose ``fetchone()``."""
        raise NotImplementedError

    def do_drop(self):
        """Drop this table."""
        sql.execute(f"DROP TABLE {self.table}")

    def delete(self, obj):
        """Forget ``obj`` in the cache and delete its row.

        Raises KeyError (before touching the database) when ``obj.rowid`` is
        not currently cached.
        """
        del self.cache[obj.rowid]
        sql.execute(f"DELETE FROM {self.table} WHERE rowid=?", [obj.rowid])

    def factory(self, row):
        """Return the cached object for ``row``, building it on first sight.

        Returns None when ``row`` is empty or None (e.g. fetchone() found
        nothing).
        """
        if not row:
            return None
        rowid = row[0]
        # Only call do_factory() on a cache miss; passing its result as a
        # setdefault() default would construct a throwaway object every time.
        if rowid not in self.cache:
            self.cache[rowid] = self.do_factory(row)
        return self.cache[rowid]

    def find(self, *args):
        """Return the object matching ``args``, inserting it if none exists."""
        found = self.lookup(*args)
        # Compare against None explicitly so an existing but falsy object
        # does not trigger a duplicate insert.
        return found if found is not None else self.insert(*args)

    def get(self, rowid):
        """Return the object for ``rowid`` from the cache or the database."""
        cached = self.cache.get(rowid)
        # A falsy cached object is still a valid hit.
        if cached is not None:
            return cached
        cur = sql.execute(f"SELECT * FROM {self.table} WHERE rowid=?", [rowid])
        return self.factory(cur.fetchone())

    def insert(self, *args, **kwargs):
        """Insert a row through do_insert() and return the object for it."""
        return self.get(self.do_insert(*args, **kwargs).lastrowid)

    def lookup(self, *args):
        """Return the object for the first do_lookup() result, or None."""
        return self.factory(self.do_lookup(*args).fetchone())

    def reset(self):
        """Drop the table, clear the cache, and create the table again."""
        self.do_drop()
        self.cache.clear()
        self.do_create()
class Model(GObject.GObject, Gio.ListModel, Table):
    """A Table that is also a Gio.ListModel, ordered by an SQL expression.

    ``order`` is interpolated directly into ``ORDER BY`` clauses, so it must
    come from trusted code rather than from user input.
    """

    def __init__(self, table, order):
        """Initialise the GObject side, then the Table side.

        ``order`` is stored before Table.__init__() runs so it is already
        available by the time the table is set up.
        """
        GObject.GObject.__init__(self)
        self.order = order
        Table.__init__(self, table)

    def do_get_item_type(self):
        """Gio.ListModel vfunc: report the item type as a Python object."""
        return GObject.TYPE_PYOBJECT

    def do_get_n_items(self):
        """Gio.ListModel vfunc: return the number of rows in the table."""
        return sql.execute(f"SELECT COUNT(*) FROM {self.table}").fetchone()[0]

    def do_get_item(self, n):
        """Gio.ListModel vfunc: return the n-th object in ``order``, or None."""
        cur = sql.execute(f"SELECT * FROM {self.table} ORDER BY {self.order} "
                          "LIMIT 1 OFFSET ?", [n])
        return self.factory(cur.fetchone())

    def get_item_index(self, item):
        """Return the 0-based position of ``item`` in the ordered table.

        Raises TypeError when no row matches ``item.rowid`` (fetchone()
        yields None, which cannot be indexed).
        """
        cur = sql.execute("SELECT * FROM (SELECT rowid,ROW_NUMBER() "
                          f"OVER (ORDER BY {self.order}) "
                          f"FROM {self.table}) "
                          "WHERE rowid=?", [item.rowid])
        # ROW_NUMBER() counts from 1; list model positions count from 0.
        return cur.fetchone()[1] - 1

    def delete(self, item):
        """Delete ``item`` and emit "items-changed" for its old position."""
        # The position must be read while the row still exists.
        pos = self.get_item_index(item)
        super().delete(item)
        self.emit("items-changed", pos, 1, 0)

    def insert(self, *args, **kwargs):
        """Insert a row, emit "items-changed" at its position, and return it.

        Keyword arguments are forwarded so this override accepts everything
        Table.insert() does.
        """
        row = super().insert(*args, **kwargs)
        pos = self.get_item_index(row)
        self.emit("items-changed", pos, 0, 1)
        return row

    def reset(self):
        """Reset the table and emit one "items-changed" covering all rows."""
        n = self.get_n_items()
        super().reset()
        self.emit("items-changed", 0, n, self.get_n_items())
class Child(Table):
    """A Table whose rows belong to a parent row in another table.

    ``parent`` is the name of the column holding the parent's rowid and
    ``order`` is an SQL ordering expression; both are interpolated directly
    into SQL text, so they must come from trusted code.
    """

    def __init__(self, table, parent, order):
        """Store the parent column and ordering, then initialise the Table.

        The attributes are assigned first because Table.__init__() invokes
        the do_create() hook, which may need to read them.
        """
        self.parent = parent
        self.order = order
        Table.__init__(self, table)

    def get_n_children(self, parent):
        """Return how many rows reference ``parent.rowid``."""
        cur = sql.execute(f"SELECT COUNT(*) FROM {self.table} "
                          f"WHERE {self.parent}=?", [parent.rowid])
        return cur.fetchone()[0]

    def get_child(self, parent, n):
        """Return the n-th child of ``parent`` in ``order``, or None."""
        cur = sql.execute(f"SELECT * FROM {self.table} WHERE {self.parent}=? "
                          f"ORDER BY {self.order} LIMIT 1 OFFSET ?",
                          [parent.rowid, n])
        return self.factory(cur.fetchone())

    def get_child_index(self, parent, child):
        """Return the 0-based position of ``child`` among ``parent``'s rows.

        Raises TypeError when ``child`` is not found under ``parent``
        (fetchone() yields None, which cannot be indexed).
        """
        # NOTE(review): the last two fragments join as ")WHERE" with no
        # space; the query text is intentionally left unchanged.
        cur = sql.execute("SELECT * FROM (SELECT rowid,ROW_NUMBER() "
                          f"OVER (ORDER BY {self.order}) "
                          f"FROM {self.table} "
                          f"WHERE {self.parent}=?)"
                          "WHERE rowid=?", [parent.rowid, child.rowid])
        # ROW_NUMBER() counts from 1; child positions count from 0.
        return cur.fetchone()[1] - 1

    def find(self, *args):
        """Not provided by the base Child class; always raises."""
        raise NotImplementedError