db: Convert the UserTable to a playlist.Model

Implements: Issue #11 (Cache database items fields)
Implements: Issue #14 (Convert Tables into Gio.ListModels)
Signed-off-by: Anna Schumaker <Anna@NoWheyCreamery.com>
This commit is contained in:
Anna Schumaker 2021-10-10 13:49:19 -04:00
parent 67238ed385
commit 4235e794bd
5 changed files with 111 additions and 114 deletions

View File

@ -26,11 +26,11 @@ def make_fake_track(trackno, length, title, path, lib="/a/b/c", art="Test Artist
length, title, pathlib.Path(path))
def reset():
mods = [ artist, album, disc, genre, decade, year, library, state ]
mods = [ state, user, artist, album, disc, genre, decade, year, library ]
for mod in mods: mod.Table.reset()
mods = [ track, user ]
mods = [ track ]
for mod in mods: mod.Table.drop()
for mod in mods: mod.Table.do_create()
@ -38,8 +38,6 @@ def reset():
genre.Map.reset()
user.Map.reset()
user.TempMap.reset()
user.Table.create_default_playlists()
if lib.version.TESTING: reset()
user.Table.create_default_playlists()

View File

@ -50,7 +50,8 @@ class ParentPlaylist(Playlist):
class Model(table.Model):
def insert(self, *args, **kwargs):
return super().insert(state.Table.insert(), *args)
loop = kwargs.pop("loop", False)
return super().insert(state.Table.insert(loop=loop), *args)
def delete(self, plist):
state.Table.delete(plist.plist_state)

View File

@ -49,8 +49,8 @@ class Table:
class Model(GObject.GObject, Gio.ListModel, Table):
def __init__(self, table, order):
GObject.GObject.__init__(self)
Table.__init__(self, table)
self.order = order
Table.__init__(self, table)
def do_get_item_type(self):
return GObject.TYPE_PYOBJECT

View File

@ -4,73 +4,80 @@ import sqlite3
import unittest
from gi.repository import GObject
expected = [ (1, 1, "Collection", "collection", 0, 1),
(2, 2, "Favorites", "favorites", 0, 0),
(3, 3, "New Tracks", "new tracks", 0, 0),
(4, 4, "Previous", "previous", 0, 0),
(5, 5, "Queued Tracks", "queued tracks", 0, 0) ]
class TestCollection(unittest.TestCase):
def test_init(self):
collection = db.user.Table.find("Collection")
self.assertIsInstance(collection, db.user.UserPlaylist)
class TestFavorites(unittest.TestCase):
def test_init(self):
favorites = db.user.Table.find("Favorites")
self.assertIsInstance(favorites, db.user.UserPlaylist)
class TestNewTracks(unittest.TestCase):
def test_init(self):
new = db.user.Table.find("New Tracks")
self.assertIsInstance(new, db.user.UserPlaylist)
class TestPrevious(unittest.TestCase):
def test_init(self):
previous = db.user.Table.find("Previous")
self.assertIsInstance(previous, db.user.UserPlaylist)
class TestQueuedTracks(unittest.TestCase):
def test_init(self):
queued = db.user.Table.find("Queued Tracks")
self.assertIsInstance(queued, db.user.UserPlaylist)
class TestUserPlaylist(unittest.TestCase):
def test_init(self):
plist = db.user.Table.find("Test Playlist")
self.assertIsInstance(plist, db.playlist.Playlist)
self.assertEqual(plist._name, "Test Playlist")
self.assertEqual(plist.get_property("name"), "Test Playlist")
class TestUserTable(unittest.TestCase):
def setUp(self):
db.reset()
def test_playlist_table_init(self):
self.assertIsInstance(db.user.Table, db.user.PlaylistTable)
self.assertEqual(db.user.Default, [ "Collection", "Favorites", "New Tracks",
"Previous", "Queued Tracks" ])
def test_init(self):
table = db.user.UserTable()
self.assertIsInstance(table, db.playlist.Model)
self.assertEqual(table.table, "playlists")
self.assertEqual(table.order, "sort")
cur = db.sql.execute("SELECT playlistid, "
"playlists.plstateid, "
"name, "
"playlists.sort, "
"random,loop "
"FROM playlists "
"JOIN playlist_states "
"ON playlists.plstateid = playlist_states.plstateid")
rows = cur.fetchall()
self.assertEqual(tuple(rows[0]), expected[0])
self.assertEqual(tuple(rows[1]), expected[1])
self.assertEqual(tuple(rows[2]), expected[2])
self.assertEqual(tuple(rows[3]), expected[3])
self.assertEqual(tuple(rows[4]), expected[4])
self.assertIsInstance(db.user.Table, db.user.UserTable)
db.sql.execute("SELECT playlistid,plstateid,name,sort FROM playlists")
def test_playlist_table_insert(self):
playlist = db.user.Table.insert("Test Playlist")
self.assertIsInstance(playlist, db.user.Playlist)
self.assertIsInstance(playlist, db.objects.Tag)
def test_insert(self):
table = db.user.UserTable()
playlist = table.find("Test Playlist")
self.assertEqual(playlist.name, "Test Playlist")
self.assertEqual(playlist.sort, "test playlist")
self.assertIsInstance(playlist.playlist_state, db.state.PlaylistState)
self.assertIsInstance(playlist, db.user.UserPlaylist)
self.assertEqual(playlist._name, "Test Playlist")
with self.assertRaises(sqlite3.IntegrityError):
db.user.Table.insert("Test Playlist")
def test_playlist_table_delete(self):
playlist = db.user.Table.find("Test Playlist")
track = db.make_fake_track(1, 1.234, "Test Title", "/a/b.cde")
db.user.Map.insert(playlist, track)
db.user.TempMap.insert(playlist, track)
db.user.Table.delete(playlist)
self.assertIsNone(db.user.Table.lookup("Test Playlist"))
self.assertEqual(db.user.Map.lookup_playlists(track), [ ])
self.assertEqual(db.user.TempMap.lookup_playlists(track), [ ])
def test_playlist_table_get(self):
playlist = db.user.Playlist(1)
self.assertEqual(db.user.Table.get(1), playlist)
self.assertIsNone(db.user.Table.get(6))
def test_playlist_table_lookup(self):
def test_lookup(self):
playlist = db.user.Table.insert("Test Playlist")
self.assertEqual(db.user.Table.lookup("Test Playlist"), playlist)
self.assertIsNone(db.user.Table.lookup("none"))
def test_playlist_create_default_playlists(self):
db.user.Table.create_default_playlists()
def test_default_playlists(self):
table = db.user.UserTable()
self.assertEqual(table.get_n_items(), 5)
self.assertEqual(table.get_item(0).name, "Collection")
self.assertEqual(table.get_item(1).name, "Favorites")
self.assertEqual(table.get_item(2).name, "New Tracks")
self.assertEqual(table.get_item(3).name, "Previous")
self.assertEqual(table.get_item(4).name, "Queued Tracks")
class TestPlaylistMap(unittest.TestCase):
@ -81,7 +88,7 @@ class TestPlaylistMap(unittest.TestCase):
self.assertIsInstance(db.user.Map, db.user.PlaylistMap)
self.assertIsInstance(db.user.TempMap, db.user.PlaylistMap)
self.assertEqual(db.user.Map.map_lhs, db.user.Playlist)
self.assertEqual(db.user.Map.map_lhs, db.user.Table.get)
self.assertEqual(db.user.Map.map_rhs, db.track.Track)
self.assertFalse(db.user.Map.temporary)

View File

@ -10,62 +10,53 @@
# | name -> playlistid |
# +--------------------+
from gi.repository import GObject
from .sql import execute
from . import objects
from . import state
from . import playlist
from . import sql
from . import track
Default = [ "Collection", "Favorites", "New Tracks",
"Previous", "Queued Tracks" ]
class Playlist(objects.Tag):
def do_get_column(self, column):
return execute(f"SELECT {column} FROM playlists "
"WHERE playlistid=?", [ self.rowid ])
class UserPlaylist(playlist.Playlist):
def __init__(self, row):
playlist.Playlist.__init__(self, row)
self._name = row["name"]
@GObject.Property
def playlist_state(self):
return state.Table.get(self.get_column("plstateid"))
def name(self): return self._name
class PlaylistTable(objects.Table):
class UserTable(playlist.Model):
def __init__(self):
objects.Table.__init__(self, "playlists", Playlist)
playlist.Model.__init__(self, "playlists", "sort")
def do_create(self):
execute("CREATE TABLE IF NOT EXISTS playlists "
"(playlistid INTEGER PRIMARY KEY, "
" plstateid INTEGER NOT NULL, "
" name TEXT UNIQUE, "
" sort TEXT, "
" FOREIGN KEY (plstateid) REFERENCES playlist_states(plstateid))")
execute("CREATE INDEX IF NOT EXISTS playlist_index "
"ON playlists(name)")
sql.execute("CREATE TABLE IF NOT EXISTS playlists "
"(playlistid INTEGER PRIMARY KEY, "
" plstateid INTEGER NOT NULL, "
" name TEXT UNIQUE, "
" sort TEXT, "
" FOREIGN KEY (plstateid) REFERENCES playlist_states(plstateid))")
sql.execute("CREATE INDEX IF NOT EXISTS playlist_index ON playlists(name)")
def do_insert(self, name, loop=False):
plstate = state.Table.insert(random=False, loop=loop)
return execute("INSERT INTO playlists (plstateid, name, sort) "
"VALUES (?, ?, ?)", (plstate.rowid, name, name.casefold()))
self.find("Collection", loop=True)
self.find("Favorites")
self.find("New Tracks")
self.find("Previous")
self.find("Queued Tracks")
def do_delete(self, playlist):
Map.delete_playlist(playlist)
TempMap.delete_playlist(playlist)
return execute("DELETE FROM playlists WHERE playlistid=?", [ int(playlist) ])
def do_factory(self, row):
return UserPlaylist(row)
def do_get(self, rowid):
return execute("SELECT playlistid FROM playlists "
"WHERE playlistid=?", [ rowid ])
def do_insert(self, plstate, name):
return sql.execute("INSERT INTO playlists (plstateid, name, sort) "
"VALUES (?, ?, ?)", [ plstate.rowid, name, name.casefold() ])
def do_lookup(self, name):
return execute("SELECT playlistid FROM playlists "
"WHERE name=?", [ name ])
return sql.execute("SELECT * FROM playlists WHERE name=?", [ name ])
def create_default_playlists(self):
for name in Default:
if (plist := self.lookup(name)) != None:
continue
self.do_insert(name, name == "Collection")
def find(self, name, loop=False):
if (res := self.lookup(name)) == None:
res = self.insert(name, loop=loop)
return res
class PlaylistMap(objects.Map):
@ -73,35 +64,35 @@ class PlaylistMap(objects.Map):
name = "playlist_map" if temp==False else "temp_playlist_map"
self.temporary = temp
objects.Map.__init__(self, name, Playlist, track.Track)
objects.Map.__init__(self, name, Table.get, track.Track)
self.lookup_tracks = self.lookup_rhs
self.lookup_playlists = self.lookup_lhs
def do_create(self):
temp = "" if self.temporary == False else "TEMPORARY"
execute(f"CREATE {temp} TABLE IF NOT EXISTS {self.map_name} "
"(playlistid INTEGER, "
" trackid INTEGER, "
" FOREIGN KEY(playlistid) REFERENCES playlists(playlistid), "
" FOREIGN KEY(trackid) REFERENCES tracks(trackid), "
" UNIQUE(playlistid, trackid))")
sql.execute(f"CREATE {temp} TABLE IF NOT EXISTS {self.map_name} "
"(playlistid INTEGER, "
" trackid INTEGER, "
" FOREIGN KEY(playlistid) REFERENCES playlists(playlistid), "
" FOREIGN KEY(trackid) REFERENCES tracks(trackid), "
" UNIQUE(playlistid, trackid))")
def do_insert(self, playlist, track):
execute(f"INSERT INTO {self.map_name} (playlistid, trackid) "
"VALUES (?, ?)", [ int(playlist), int(track) ])
sql.execute(f"INSERT INTO {self.map_name} (playlistid, trackid) "
"VALUES (?, ?)", [ playlist.rowid, int(track) ])
def do_delete(self, playlist, track):
return execute(f"DELETE FROM {self.map_name} "
"WHERE playlistid=? AND trackid=?",
[ int(playlist), int(track) ])
return sql.execute(f"DELETE FROM {self.map_name} "
"WHERE playlistid=? AND trackid=?",
[ playlist.rowid, int(track) ])
def do_lookup_rhs(self, playlist):
return execute(f"SELECT trackid FROM {self.map_name} "
"WHERE playlistid=?", [ int(playlist) ])
return sql.execute(f"SELECT trackid FROM {self.map_name} "
"WHERE playlistid=?", [ playlist.rowid ])
def do_lookup_lhs(self, track):
return execute(f"SELECT playlistid FROM {self.map_name} "
"WHERE trackid=?", [ int(track) ])
return sql.execute(f"SELECT playlistid FROM {self.map_name} "
"WHERE trackid=?", [ int(track) ])
def delete_track(self, track):
for playlist in self.lookup_playlists(track):
@ -112,6 +103,6 @@ class PlaylistMap(objects.Map):
self.delete(playlist, track)
Table = PlaylistTable()
Table = UserTable()
Map = PlaylistMap(temp=False)
TempMap = PlaylistMap(temp=True)