diff --git a/emmental/db/emmental.sql b/emmental/db/emmental.sql index 2dc15d7..58798e4 100644 --- a/emmental/db/emmental.sql +++ b/emmental/db/emmental.sql @@ -395,6 +395,30 @@ CREATE TRIGGER tracks_active_trigger END; +/********************************************* + * * + * Track <--> Playlist Linking * + * * + *********************************************/ + +CREATE TABLE system_tracks ( + trackid INTEGER REFERENCES tracks (trackid) + ON DELETE CASCADE + ON UPDATE CASCADE, + propertyid INTEGER REFERENCES playlist_properties (propertyid) + ON DELETE CASCADE + ON UPDATE CASCADE, + UNIQUE(trackid, propertyid) +); + +CREATE VIEW system_tracks_view AS + SELECT trackid, system_tracks.propertyid + FROM system_tracks + JOIN tracks USING (trackid) + JOIN libraries USING (libraryid) + WHERE libraries.deleting = FALSE; + + /**************************************************** * * * Data saved when Tracks are deleted * diff --git a/emmental/db/playlist.py b/emmental/db/playlist.py index bc60842..c01dd3b 100644 --- a/emmental/db/playlist.py +++ b/emmental/db/playlist.py @@ -1,5 +1,6 @@ # Copyright 2022 (c) Anna Schumaker """A customized Gio.ListStore for tracking Playlist GObjects.""" +import sqlite3 from gi.repository import GObject from gi.repository import Gio from gi.repository import Gtk @@ -102,6 +103,9 @@ class Table(table.Table): active_playlist = GObject.Property(type=Playlist) treemodel = GObject.Property(type=Gtk.TreeListModel) + autodelete = GObject.Property(type=bool, default=False) + system_tracks = GObject.Property(type=bool, default=True) + def __init__(self, sql: GObject.TYPE_PYOBJECT, **kwargs): """Initialize a Playlist Table.""" super().__init__(sql=sql, **kwargs) @@ -110,13 +114,56 @@ class Table(table.Table): autoexpand=False, create_func=self.__create_tree) + def __do_autodelete(self, plist: Playlist) -> bool: + if plist.n_tracks == 0: + self.delete(plist) + return True + + def __autodelete(self, plist: Playlist): + if self.autodelete: + self.queue.push(self.__do_autodelete, plist) + def __create_tree(self, plist: Playlist) -> Gtk.FilterListModel | None: return plist.children + def do_add_track(self, playlist: Playlist, track: Track) -> bool: + """Add a Track to the Playlist.""" + raise NotImplementedError + def do_get_sort_key(self, playlist: Playlist) -> tuple[str]: """Get a sort key for the requested Playlist.""" return format.sort_key(playlist.name) + def do_move_track_down(self, playlist: Playlist, track: Track) -> bool: + """Move a track down in the sort order.""" + raise NotImplementedError + + def do_move_track_up(self, playlist: Playlist, track: Track) -> bool: + """Move a track up in the sort order.""" + raise NotImplementedError + + def do_remove_track(self, playlist: Playlist, track: Track) -> bool: + """Remove a Track from the Playlist.""" + raise NotImplementedError + + def do_sql_select_trackids(self, playlist: Playlist) -> sqlite3.Cursor: + """Select the trackids that are in this Playlist.""" + raise NotImplementedError + + def add_system_track(self, playlist: Playlist, track: Track) -> bool: + """Add a Track to a system Playlist.""" + cur = self.sql("""INSERT INTO system_tracks (propertyid, trackid) + VALUES (?, ?)""", playlist.propertyid, track.trackid) + return cur and cur.rowcount == 1 + + def add_track(self, playlist: Playlist, track: Track) -> bool: + """Add a Track to a Playlist.""" + if track is None or track.get_library().deleting: + return False + if self.system_tracks: + return self.add_system_track(playlist, track) + return self.do_add_track(playlist, track) + def clear(self) -> None: """Clear the Table.""" self.active_playlist = None @@ -127,6 +174,7 @@ class Table(table.Table): res = super().construct(propertyid=propertyid, name=name, **kwargs) if res.active: self.sql.set_active_playlist(res) + res.reload_tracks(idle=True) return res def delete(self, playlist: Playlist) -> bool: @@ -135,6 +183,50 @@ class Table(table.Table): self.sql.set_active_playlist(None) return super().delete(playlist) + def get_sql_system_trackids(self, playlist: Playlist) -> sqlite3.Cursor: + """Load a System Playlist's Tracks from the database.""" + return self.sql("""SELECT trackid FROM system_tracks_view + WHERE propertyid=?""", playlist.propertyid) + + def get_trackids(self, playlist: Playlist) -> set[int]: + """Load a Playlist's Tracks from the database.""" + if self.system_tracks: + cur = self.get_sql_system_trackids(playlist) + else: + cur = self.do_sql_select_trackids(playlist) + + res = {row["trackid"] for row in cur.fetchall()} + self.__autodelete(playlist) + return res + + def move_track_down(self, playlist: Playlist, track: Track) -> bool: + """Move a track down in the playlist.""" + if not playlist.tracks_movable: + return False + return self.do_move_track_down(playlist, track) + + def move_track_up(self, playlist: Playlist, track: Track) -> bool: + """Move a track up in the playlist.""" + if not playlist.tracks_movable: + return False + return self.do_move_track_up(playlist, track) + + def remove_system_track(self, playlist: Playlist, track: Track) -> bool: + """Remove a Track from a system Playlist.""" + return self.sql("""DELETE FROM system_tracks + WHERE propertyid=? AND trackid=?""", + playlist.propertyid, track.trackid).rowcount == 1 + + def remove_track(self, playlist: Playlist, track: Track) -> bool: + """Remove a Track from a Playlist.""" + if self.system_tracks: + res = self.remove_system_track(playlist, track) + else: + res = self.do_remove_track(playlist, track) + + self.__autodelete(playlist) + return res + def update(self, playlist: Playlist, column: str, newval) -> bool: """Update a Playlist in the Database.""" match column: diff --git a/tests/db/test_playlist.py b/tests/db/test_playlist.py index c279381..fe772c5 100644 --- a/tests/db/test_playlist.py +++ b/tests/db/test_playlist.py @@ -1,5 +1,6 @@ # Copyright 2022 (c) Anna Schumaker """Tests our ListStore and Playlist objects.""" +import pathlib import unittest import unittest.mock import emmental.db.playlist @@ -169,6 +170,14 @@ class TestPlaylistTable(tests.util.TestCase): self.table = tests.util.playlist.MockTable(self.sql) self.sql("DELETE FROM playlist_properties") + self.library = self.sql.libraries.create(pathlib.Path("/a/b")) + self.album = self.sql.albums.create("Test Album", "Artist", "2023-03") + self.medium = self.sql.media.create(self.album, "", number=1) + self.year = self.sql.years.create(2023) + self.track = self.sql.tracks.create(self.library, + pathlib.Path("/a/b/c.ogg"), + self.medium, self.year) + def test_treemodel(self): """Check that the table's treemodel was set up properly.""" self.assertIsInstance(self.table.treemodel, Gtk.TreeListModel) @@ -181,6 +190,25 @@ class TestPlaylistTable(tests.util.TestCase): root.children = Gtk.FilterListModel() self.assertEqual(self.table._Table__create_tree(root), root.children) + def test_add_track(self): + """Test adding a track to a playlist.""" + self.assertTrue(self.table.system_tracks) + + plist = self.table.create("Test Playlist") + self.library.deleting = True + self.assertFalse(self.table.add_track(plist, None)) + self.assertFalse(self.table.add_track(plist, self.track)) + + self.library.deleting = False + self.assertTrue(self.table.add_track(plist, self.track)) + cur = self.sql("SELECT COUNT(*) FROM system_tracks") + self.assertEqual(cur.fetchone()["COUNT(*)"], 1) + self.assertFalse(self.table.add_track(plist, self.track)) + + self.table.system_tracks = False + with self.assertRaises(NotImplementedError): + self.table.add_track(plist, self.track) + def test_construct(self): """Test constructing a new playlist.""" self.assertIsNone(self.table.active_playlist) @@ -197,6 +225,9 @@ class TestPlaylistTable(tests.util.TestCase): self.assertEqual(self.sql.active_playlist, plist2) self.assertTrue(plist2.active) + self.assertEqual(self.table.queue[0], (plist1.load_tracks,)) + self.assertEqual(self.table.queue[1], (plist2.load_tracks,)) + def test_get_sort_key(self): """Test getting a sort key for a playlist.""" plist = self.table.create("Playlist 1") @@ -212,8 +243,10 @@ class TestPlaylistTable(tests.util.TestCase): self.assertEqual(len(self.table), 0) def test_delete(self): - """Test deleting the active playlist.""" + """Test deleting the active playlist with system tracks.""" + self.table.system_tracks = True plist = self.table.create("Test Playlist") + plist.add_track(self.track) self.sql.set_active_playlist(plist) self.assertTrue(self.table.delete(plist)) @@ -221,6 +254,64 @@ class TestPlaylistTable(tests.util.TestCase): self.assertIsNone(self.sql.active_playlist) self.assertNotIn(plist, self.table) + cur = self.sql("SELECT COUNT(*) FROM system_tracks") + self.assertEqual(cur.fetchone()["COUNT(*)"], 0) + + def test_get_trackids(self): + """Test getting the set of trackids for this playlist.""" + self.assertTrue(self.table.system_tracks) + self.table._Table__autodelete = unittest.mock.Mock() + + plist = self.table.create("Test Playlist") + self.assertSetEqual(self.table.get_trackids(plist), set()) + self.table._Table__autodelete.assert_called_with(plist) + + plist.add_track(self.track) + self.assertSetEqual(self.table.get_trackids(plist), + {self.track.trackid}) + + self.library.deleting = True + self.assertSetEqual(self.table.get_trackids(plist), set()) + + self.table.system_tracks = False + with self.assertRaises(NotImplementedError): + self.table.get_trackids(plist) + + def test_move_track_down(self): + """Test moving tracks down in the sort order.""" + plist = self.table.create("Test Playlist") + self.assertFalse(self.table.move_track_down(plist, self.track)) + + plist.tracks_movable = True + with self.assertRaises(NotImplementedError): + self.table.move_track_down(plist, self.track) + + def test_move_track_up(self): + """Test moving tracks up in the sort order.""" + plist = self.table.create("Test Playlist") + self.assertFalse(self.table.move_track_up(plist, self.track)) + + plist.tracks_movable = True + with self.assertRaises(NotImplementedError): + self.table.move_track_up(plist, self.track) + + def test_remove_track(self): + """Test adding a track to a playlist.""" + self.assertTrue(self.table.system_tracks) + self.table._Table__autodelete = unittest.mock.Mock() + + plist = self.table.create("Test Playlist") + plist.add_track(self.track) + self.assertTrue(self.table.remove_track(plist, self.track)) + cur = self.sql("SELECT COUNT(*) FROM system_tracks") + self.assertEqual(cur.fetchone()["COUNT(*)"], 0) + self.assertFalse(self.table.remove_track(plist, self.track)) + self.table._Table__autodelete.assert_called_with(plist) + + self.table.system_tracks = False + with self.assertRaises(NotImplementedError): + self.table.remove_track(plist, self.track) + def test_update(self): """Test updating playlist properties.""" plist1 = self.table.create("Test Playlist 1") @@ -237,3 +328,26 @@ class TestPlaylistTable(tests.util.TestCase): row = self.sql("SELECT active FROM playlist_properties WHERE rowid=?", plist1.propertyid).fetchone() self.assertEqual(row["active"], False) + + def test_autodelete(self): + """Test automatically deleting playlists.""" + plist = self.table.create("Test Playlist") + self.table.queue.cancel() + + self.assertFalse(self.table.autodelete) + self.table._Table__autodelete(plist) + self.assertFalse(self.table.queue.running) + self.assertIn(plist, self.table) + + self.table.autodelete = True + self.table._Table__autodelete(plist) + self.assertTupleEqual(self.table.queue[0], + (self.table._Table__do_autodelete, plist)) + + plist.n_tracks = 1 + self.assertTrue(self.table._Table__do_autodelete(plist)) + self.assertIn(plist, self.table) + + plist.n_tracks = 0 + self.assertTrue(self.table._Table__do_autodelete(plist)) + self.assertNotIn(plist, self.table)