db: Create a new Table class
This is a simlified Table that can be inherited from to implement a Table-based Gio.ListModel. This gives us the chance to have some tables that aren't ListModel's, since not everything will need that interface and unnecessary position-finding slows everything down. Implements: Issue #8 (Split up db/objects.py) Implements: Issue #12 (Make database items unique) Signed-off-by: Anna Schumaker <Anna@NoWheyCreamery.com>
This commit is contained in:
parent
9cf1df7c33
commit
427b9fb925
|
@ -0,0 +1,46 @@
|
|||
# Copyright 2021 (c) Anna Schumaker.
|
||||
from . import sql
|
||||
|
||||
class Table:
|
||||
def __init__(self, table):
|
||||
self.cache = dict()
|
||||
self.table = table
|
||||
self.do_create()
|
||||
|
||||
def do_create(self): raise NotImplementedError
|
||||
def do_factory(self, row): raise NotImplementedError
|
||||
def do_insert(self, *args): raise NotImplementedError
|
||||
def do_lookup(self, *args): raise NotImplementedError
|
||||
|
||||
def do_drop(self):
|
||||
sql.execute(f"DROP TABLE {self.table}")
|
||||
|
||||
def delete(self, obj):
|
||||
del self.cache[obj.rowid]
|
||||
sql.execute(f"DELETE FROM {self.table} WHERE rowid=?", [ obj.rowid ])
|
||||
|
||||
def factory(self, row):
|
||||
if row:
|
||||
return self.cache.setdefault(row[0], self.do_factory(row))
|
||||
|
||||
def find(self, *args):
|
||||
if (ret := self.lookup(*args)):
|
||||
return ret
|
||||
return self.insert(*args)
|
||||
|
||||
def get(self, rowid):
|
||||
if (row := self.cache.get(rowid)):
|
||||
return row
|
||||
cur = sql.execute(f"SELECT * FROM {self.table} WHERE rowid=?", [ rowid ])
|
||||
return self.factory(cur.fetchone())
|
||||
|
||||
def insert(self, *args, **kwargs):
|
||||
return self.get(self.do_insert(*args, **kwargs).lastrowid)
|
||||
|
||||
def lookup(self, *args):
|
||||
return self.factory(self.do_lookup(*args).fetchone())
|
||||
|
||||
def reset(self):
|
||||
self.do_drop()
|
||||
self.cache.clear()
|
||||
self.do_create()
|
|
@ -0,0 +1,78 @@
|
|||
# Copyright 2021 (c) Anna Schumaker.
|
||||
import unittest
|
||||
from . import sql
|
||||
from . import table
|
||||
|
||||
class FakeRow:
|
||||
def __init__(self, data):
|
||||
self.rowid = data["fakeid"]
|
||||
self.name = data["name"]
|
||||
|
||||
class FakeTable(table.Table):
|
||||
def __init__(self):
|
||||
table.Table.__init__(self, "fake_table")
|
||||
self.reset()
|
||||
|
||||
def do_create(self):
|
||||
sql.execute("CREATE TABLE IF NOT EXISTS fake_table "
|
||||
"(fakeid INTEGER PRIMARY KEY, name TEXT UNIQUE)")
|
||||
|
||||
def do_factory(self, row):
|
||||
return FakeRow(row)
|
||||
|
||||
def do_insert(self, name):
|
||||
return sql.execute("INSERT INTO fake_table (name) VALUES (?)", [ name ])
|
||||
|
||||
def do_lookup(self, name):
|
||||
return sql.execute("SELECT * FROM fake_table WHERE name=?", [ name ])
|
||||
|
||||
class TestTable(unittest.TestCase):
|
||||
def test_init(self):
|
||||
fake = FakeTable()
|
||||
self.assertIsInstance(fake.cache, dict)
|
||||
self.assertEqual(fake.table, "fake_table")
|
||||
|
||||
def test_interface(self):
|
||||
with self.assertRaises(NotImplementedError):
|
||||
table.Table.do_create(None)
|
||||
with self.assertRaises(NotImplementedError):
|
||||
table.Table.do_factory(None, None)
|
||||
with self.assertRaises(NotImplementedError):
|
||||
table.Table.do_insert(None, "Text")
|
||||
with self.assertRaises(NotImplementedError):
|
||||
table.Table.do_lookup(None, "Text")
|
||||
|
||||
def test_insert_delete(self):
|
||||
fake = FakeTable()
|
||||
row = fake.insert("Test Name")
|
||||
self.assertIsInstance(row, FakeRow)
|
||||
self.assertEqual(fake.cache[1], row)
|
||||
self.assertEqual(fake.get(1), row)
|
||||
|
||||
fake.delete(row)
|
||||
self.assertEqual(fake.cache, { })
|
||||
|
||||
def test_find(self):
|
||||
fake = FakeTable()
|
||||
row = fake.find("Test Name")
|
||||
self.assertIsInstance(row, FakeRow)
|
||||
self.assertEqual(fake.cache[1], row)
|
||||
self.assertEqual(fake.find("Test Name"), row)
|
||||
|
||||
def test_lookup(self):
|
||||
fake = FakeTable()
|
||||
row = fake.insert("Test Name")
|
||||
fake.cache.clear()
|
||||
|
||||
row = fake.lookup("Test Name")
|
||||
self.assertEqual(row.name, "Test Name")
|
||||
self.assertEqual(fake.cache, { 1 : row })
|
||||
self.assertEqual(fake.lookup("Test Name"), row)
|
||||
|
||||
def test_reset(self):
|
||||
fake = FakeTable()
|
||||
sql.execute("SELECT fakeid,name FROM fake_table")
|
||||
fake.insert("Test Name")
|
||||
|
||||
fake.reset()
|
||||
self.assertEqual(fake.cache, { })
|
Loading…
Reference in New Issue