diff --git a/emmental/db/emmental.sql b/emmental/db/emmental.sql index cda7f01..66e8e70 100644 --- a/emmental/db/emmental.sql +++ b/emmental/db/emmental.sql @@ -463,6 +463,17 @@ CREATE TABLE system_tracks ( UNIQUE(trackid, propertyid) ); +CREATE TABLE user_tracks ( + trackid INTEGER REFERENCES tracks (trackid) + ON DELETE CASCADE + ON DELETE CASCADE, + propertyid INTEGER REFERENCES playlist_properties (propertyid) + ON DELETE CASCADE + ON UPDATE CASCADE, + position INTEGER, + UNIQUE(trackid, propertyid) +); + CREATE VIEW system_tracks_view AS SELECT trackid, system_tracks.propertyid FROM system_tracks @@ -470,6 +481,41 @@ CREATE VIEW system_tracks_view AS JOIN libraries USING (libraryid) WHERE libraries.deleting = FALSE; +CREATE VIEW user_tracks_view AS + SELECT trackid, user_tracks.propertyid, user_tracks.position + FROM user_tracks + JOIN tracks USING (trackid) + JOIN libraries USING (libraryid) + WHERE libraries.deleting = FALSE; + +CREATE VIEW collection_view AS + SELECT tracks.trackid FROM tracks + JOIN libraries USING (libraryid) + WHERE libraries.enabled = TRUE AND libraries.deleting = FALSE; + +CREATE VIEW favorite_view AS + SELECT tracks.trackid FROM tracks + JOIN libraries USING (libraryid) + WHERE tracks.favorite = TRUE AND libraries.deleting = FALSE; + +CREATE VIEW most_played_view AS + SELECT tracks.trackid FROM tracks + JOIN libraries USING (libraryid) + WHERE tracks.playcount > (SELECT CEIL(AVG(playcount)) + FROM tracks WHERE playcount>0) + AND libraries.deleting = FALSE; + +CREATE VIEW new_tracks_view AS + SELECT tracks.trackid FROM tracks + JOIN libraries USING (libraryid) + WHERE tracks.added > DATE('now', 'localtime', '-7 days') + AND libraries.deleting = FALSE; + +CREATE VIEW unplayed_tracks_view AS + SELECT tracks.trackid FROM tracks + JOIN libraries USING (libraryid) + WHERE tracks.playcount == 0 AND libraries.deleting = FALSE; + /**************************************************** * * diff --git a/emmental/db/libraries.py b/emmental/db/libraries.py index e9470dd..2d324cf 100644 --- a/emmental/db/libraries.py +++ b/emmental/db/libraries.py @@ -130,6 +130,8 @@ class Table(playlist.Table): def do_sql_update(self, library: Library, column: str, newval) -> bool: """Update a Library playlist.""" + if column == "enabled" and self.sql.playlists.collection: + self.sql.playlists.collection.reload_tracks(idle=True) return self.sql(f"UPDATE libraries SET {column}=? WHERE rowid=?", newval, library.libraryid) diff --git a/emmental/db/playlists.py b/emmental/db/playlists.py index e84a5a3..5139c0e 100644 --- a/emmental/db/playlists.py +++ b/emmental/db/playlists.py @@ -3,6 +3,7 @@ import sqlite3 from gi.repository import GObject from . import playlist +from . import tracks class Playlist(playlist.Playlist): @@ -16,6 +17,10 @@ class Playlist(playlist.Playlist): match (self.name, column, self.get_property(column)): case ("Collection", "loop", "None"): self.loop = "Playlist" + case ("Collection", "n-tracks", 0): + self.table.have_collection_tracks = False + case ("Collection", "n-tracks", _): + self.table.have_collection_tracks = True case ("Previous Tracks", "loop", "Playlist") | \ ("Previous Tracks", "loop", "Track"): self.loop = "None" @@ -47,9 +52,31 @@ class Table(playlist.Table): queued = GObject.Property(type=Playlist) unplayed = GObject.Property(type=Playlist) + have_collection_tracks = GObject.Property(type=bool, default=False) + def __init__(self, sql: GObject.TYPE_PYOBJECT, **kwargs): """Initialize the Playlists Table.""" - super().__init__(sql=sql, **kwargs) + super().__init__(sql=sql, system_tracks=False, **kwargs) + + def __move_user_trackid(self, playlist: Playlist, trackid: int, + *, offset: int) -> bool: + order = self.get_track_order(playlist) + tracks = sorted(playlist.tracks.trackids, key=order.get) + start = tracks.index(trackid) + + new = start + offset + if not (0 <= new < len(tracks)): + return False + + tracks[start] = tracks[new] + tracks[new] = trackid + + # Note: We write out all trackids so we don't have to update during + # do_add_track() and do_remove_track() + args = [(i, playlist.propertyid, t) for (i, t) in enumerate(tracks)] + self.sql.executemany("""UPDATE user_tracks SET position=? + WHERE propertyid=? AND trackid=?""", *args) + return True def do_construct(self, **kwargs) -> Playlist: """Construct a new playlist.""" @@ -60,7 +87,10 @@ class Table(playlist.Table): self.favorites.user_tracks = True case "Most Played Tracks": self.most_played = plist case "New Tracks": self.new_tracks = plist - case "Previous Tracks": self.previous = plist + case "Previous Tracks": + self.previous = plist + self.sql("DELETE FROM system_tracks WHERE propertyid=?", + self.previous.propertyid) case "Queued Tracks": self.queued = plist self.queued.user_tracks = True @@ -71,6 +101,54 @@ class Table(playlist.Table): plist.tracks_movable = True return plist + def do_add_track(self, playlist: Playlist, track: tracks.Track) -> bool: + """Add a Track to the requested Playlist.""" + match playlist: + case self.collection: return track.get_library().enabled + case self.most_played: view = "most_played_view" + case self.new_tracks: view = "new_tracks_view" + case self.favorites: + track.update_properties(favorite=True) + return True + case self.previous: + self.add_system_track(playlist, track) + return True + case self.unplayed: return track.playcount == 0 + case _: return self.add_user_track(playlist, track) + + return self.sql(f"SELECT ? IN {view}", track.trackid).fetchone()[0] + + def do_get_user_track_order(self, playlist: Playlist) -> dict[int, int]: + """Get the user-configured sort order for a playlist.""" + cur = self.sql("""SELECT trackid FROM user_tracks WHERE propertyid=? + ORDER BY position NULLS LAST, rowid""", + playlist.propertyid) + return {row["trackid"]: i for (i, row) in enumerate(cur.fetchall())} + + def do_move_track_down(self, playlist: Playlist, + track: tracks.Track) -> bool: + """Move a track down in the user sort order.""" + return self.__move_user_trackid(playlist, track.trackid, offset=1) + + def do_move_track_up(self, playlist: Playlist, + track: tracks.Track) -> bool: + """Move a track up in the user sort order.""" + return self.__move_user_trackid(playlist, track.trackid, offset=-1) + + def do_remove_track(self, playlist: Playlist, track: tracks.Track) -> bool: + """Remove a Track from the requested Playlist.""" + match playlist: + case self.collection: return True + case self.most_played: return True + case self.new_tracks: return True + case self.unplayed: return True + case self.favorites: + track.update_properties(favorite=False) + return True + case self.previous: + return self.remove_system_track(playlist, track) + case _: return self.remove_user_track(playlist, track) + def do_sql_delete(self, playlist: Playlist) -> sqlite3.Cursor: """Delete a playlist.""" return self.sql("DELETE FROM playlists WHERE playlistid=?", @@ -91,6 +169,19 @@ class Table(playlist.Table): """Load playlists from the database.""" return self.sql("SELECT * FROM playlists_view") + def do_sql_select_trackids(self, playlist: Playlist) -> sqlite3.Cursor: + """Load Tracks from the database.""" + match playlist: + case self.collection: view = "collection_view" + case self.favorites: view = "favorite_view" + case self.most_played: view = "most_played_view" + case self.new_tracks: view = "new_tracks_view" + case self.unplayed: view = "unplayed_tracks_view" + case self.previous: return self.get_sql_system_trackids(playlist) + case _: return self.get_sql_user_trackids(playlist) + + return self.sql(f"SELECT trackid FROM {view}") + def do_sql_select_one(self, name: str) -> sqlite3.Cursor: """Look up a playlist by name.""" return self.sql("SELECT playlistid FROM playlists WHERE name=?", name) @@ -101,11 +192,29 @@ class Table(playlist.Table): return self.sql(f"UPDATE playlists SET {column}=? WHERE playlistid=?", newval, playlist.playlistid) + def add_user_track(self, playlist: Playlist, track: tracks.Track) -> bool: + """Add a Track to the User Tracks table.""" + cur = self.sql("""INSERT INTO user_tracks (propertyid, trackid) + VALUES (?, ?)""", playlist.propertyid, track.trackid) + return cur and cur.rowcount == 1 + + def get_sql_user_trackids(self, playlist: Playlist) -> sqlite3.Cursor: + """Load user Tracks from the database.""" + return self.sql("""SELECT trackid FROM user_tracks_view + WHERE propertyid=?""", playlist.propertyid) + def create(self, name: str) -> Playlist: """Create a new Playlist.""" if len(name := name.strip()) > 0: return super().create(name) + def remove_user_track(self, playlist: Playlist, + track: tracks.Track) -> bool: + """Remove a track from the User Tracks table.""" + return self.sql("""DELETE FROM user_tracks + WHERE propertyid=? AND trackid=?""", + playlist.propertyid, track.trackid).rowcount == 1 + def rename(self, playlist: Playlist, new_name: str) -> bool: """Rename a Playlist.""" if len(new_name := new_name.strip()) > 0: diff --git a/tests/db/test_playlists.py b/tests/db/test_playlists.py index 08bf7ee..fa58164 100644 --- a/tests/db/test_playlists.py +++ b/tests/db/test_playlists.py @@ -61,12 +61,14 @@ class TestPlaylistTable(tests.util.TestCase): self.track = self.sql.tracks.create(self.library, pathlib.Path("/a/b/c.ogg"), - self.medium, self.year) + self.medium, self.year, number=1) def test_init(self): """Test that the playlist model is configured correctly.""" self.assertIsInstance(self.table, emmental.db.playlist.Table) self.assertFalse(self.table.autodelete) + self.assertFalse(self.table.system_tracks) + self.assertFalse(self.table.have_collection_tracks) self.assertEqual(len(self.table), 0) self.assertIsNone(self.table.collection) @@ -77,6 +79,20 @@ class TestPlaylistTable(tests.util.TestCase): self.assertIsNone(self.table.queued) self.assertIsNone(self.table.unplayed) + def test_add_track(self): + """Test adding tracks to user Playlists.""" + playlist = self.table.create("Test Playlist") + + playlist.add_track(self.track) + self.assertTrue(playlist.has_track(self.track)) + + rows = self.sql("SELECT trackid FROM user_tracks WHERE propertyid=?", + playlist.propertyid).fetchall() + self.assertEqual(len(rows), 1) + self.assertEqual(rows[0]["trackid"], self.track.trackid) + + self.assertFalse(playlist.add_track(self.track)) + def test_construct(self): """Test constructing a playlist.""" playlist = self.table.construct(playlistid=1, propertyid=1, @@ -99,6 +115,7 @@ class TestPlaylistTable(tests.util.TestCase): self.assertEqual(playlist.sort_order, "albumartist, album, mediumno, number") self.assertIsNone(playlist.image) + self.assertTrue(playlist.tracks_movable) cur = self.sql("SELECT COUNT(name) FROM playlists") self.assertEqual(cur.fetchone()["COUNT(name)"], 1) @@ -141,6 +158,33 @@ class TestPlaylistTable(tests.util.TestCase): self.table.filter("playlist*", now=True) self.assertSetEqual(self.table.get_filter().keys, {1, 2}) + def test_get_trackids(self): + """Test loading playlist tracks from the database.""" + playlist = self.table.create("Test Playlist") + self.assertSetEqual(self.table.get_trackids(playlist), set()) + + playlist.add_track(self.track) + self.assertSetEqual(self.table.get_trackids(playlist), + {self.track.trackid}) + + def test_get_track_order(self): + """Test getting the user track order for a playlist.""" + track2 = self.sql.tracks.create(self.library, + pathlib.Path("/a/b/d.ogg"), + self.medium, self.year) + playlist = self.table.create("Test Playlist") + playlist.add_track(self.track) + playlist.add_track(track2) + playlist.sort_order = "user" + + self.assertDictEqual(self.table.get_track_order(playlist), + {self.track.trackid: 0, track2.trackid: 1}) + + self.sql("UPDATE user_tracks SET position=? WHERE trackid=?", + 3, track2.trackid) + self.assertDictEqual(self.table.get_track_order(playlist), + {self.track.trackid: 1, track2.trackid: 0}) + def test_load(self): """Test loading playlists from the database.""" self.table.create("Playlist 1").image = tests.util.COVER_JPG @@ -162,6 +206,66 @@ class TestPlaylistTable(tests.util.TestCase): self.assertEqual(self.table.lookup("test playlist"), playlist) self.assertIsNone(self.table.lookup("No Playlist")) + def test_move_track_down(self): + """Test moving a track down in the sort order.""" + track2 = self.sql.tracks.create(self.library, + pathlib.Path("/a/b/d.ogg"), + self.medium, self.year, number=2) + track3 = self.sql.tracks.create(self.library, + pathlib.Path("/a/b/e.ogg"), + self.medium, self.year, number=3) + + playlist = self.table.create("Test Playlist") + for track in [self.track, track2, track3]: + playlist.add_track(track) + playlist.sort_order = "number" + + self.assertFalse(self.table.move_track_down(playlist, track3)) + self.assertEqual(playlist.sort_order, "number") + + self.assertTrue(self.table.move_track_down(playlist, self.track)) + self.assertEqual(playlist.sort_order, "user") + self.assertDictEqual(self.table.get_track_order(playlist), + {track2.trackid: 0, + self.track.trackid: 1, + track3.trackid: 2}) + + def test_move_track_up(self): + """Test moving a track up in the sort order.""" + track2 = self.sql.tracks.create(self.library, + pathlib.Path("/a/b/d.ogg"), + self.medium, self.year, number=2) + track3 = self.sql.tracks.create(self.library, + pathlib.Path("/a/b/e.ogg"), + self.medium, self.year, number=3) + + playlist = self.table.create("Test Playlist") + for track in [self.track, track2, track3]: + playlist.add_track(track) + playlist.sort_order = "number" + + self.assertFalse(self.table.move_track_up(playlist, self.track)) + self.assertEqual(playlist.sort_order, "number") + + self.assertTrue(self.table.move_track_up(playlist, track3)) + self.assertEqual(playlist.sort_order, "user") + self.assertDictEqual(self.table.get_track_order(playlist), + {self.track.trackid: 0, + track3.trackid: 1, + track2.trackid: 2}) + + def test_remove_track(self): + """Test removing tracks from user Playlists.""" + playlist = self.table.create("Test Playlist") + playlist.add_track(self.track) + + playlist.remove_track(self.track) + self.assertFalse(playlist.has_track(self.track)) + + rows = self.sql("SELECT trackid FROM user_tracks WHERE propertyid=?", + playlist.propertyid).fetchall() + self.assertEqual(len(rows), 0) + def test_rename(self): """Test renaming a playlist.""" playlist = self.table.create("Test Playlist") @@ -213,6 +317,15 @@ class TestSystemPlaylists(tests.util.TestCase): self.table = self.sql.playlists self.table.load(now=True) + self.library = self.sql.libraries.create(pathlib.Path("/a/b")) + self.album = self.sql.albums.create("Test Album", "Artist", "2023-04") + self.medium = self.sql.media.create(self.album, "", number=1) + self.year = self.sql.years.create(2023) + + self.track = self.sql.tracks.create(self.library, + pathlib.Path("/a/b/1.ogg"), + self.medium, self.year) + def test_collection(self): """Test the Collection playlist.""" self.assertIsInstance(self.table.collection, @@ -244,6 +357,38 @@ class TestSystemPlaylists(tests.util.TestCase): self.table.collection.loop = "None" self.assertEqual(self.table.collection.loop, "Playlist") + def test_collection_tracks(self): + """Test the Collection playlist track functions.""" + for (enabled, deleting) in [(False, False), (False, True), + (True, True), (True, False)]: + with self.subTest(enabled=enabled, deleting=deleting): + self.library.enabled = enabled + self.library.deleting = deleting + + self.table.collection.add_track(self.track) + self.assertEqual(self.table.collection.has_track(self.track), + enabled and not deleting) + self.assertEqual(self.table.have_collection_tracks, + enabled and not deleting) + + self.table.collection.remove_track(self.track) + self.assertFalse(self.table.collection.has_track(self.track)) + + self.table.collection.reload_tracks() + self.assertTrue(self.table.collection.has_track(self.track)) + + self.library.enabled = False + self.table.queue.complete() + self.assertFalse(self.table.collection.has_track(self.track)) + + self.library.enabled = True + self.table.queue.complete() + self.assertTrue(self.table.collection.has_track(self.track)) + + self.library.deleting = True + self.table.collection.reload_tracks() + self.assertFalse(self.table.collection.has_track(self.track)) + def test_favorites(self): """Test the favorite tracks playlist.""" self.assertIsInstance(self.table.favorites, @@ -263,6 +408,24 @@ class TestSystemPlaylists(tests.util.TestCase): self.assertEqual(self.table.lookup("Favorite Tracks"), self.table.favorites) + def test_favorite_tracks(self): + """Test the Favorite Tracks track functions.""" + self.table.favorites.add_track(self.track) + self.assertTrue(self.table.favorites.has_track(self.track)) + self.assertTrue(self.track.favorite) + + self.library.deleting = True + self.table.favorites.reload_tracks() + self.assertFalse(self.table.favorites.has_track(self.track)) + + self.library.deleting = False + self.table.favorites.reload_tracks() + self.assertTrue(self.table.favorites.has_track(self.track)) + + self.table.favorites.remove_track(self.track) + self.assertFalse(self.table.favorites.has_track(self.track)) + self.assertFalse(self.track.favorite) + def test_most_played(self): """Test the most-played tracks playlist.""" self.assertIsInstance(self.table.most_played, @@ -281,6 +444,35 @@ class TestSystemPlaylists(tests.util.TestCase): self.assertEqual(self.table.lookup("Most Played Tracks"), self.table.most_played) + def test_most_played_tracks(self): + """Test the Most Played Tracks track functions.""" + track2 = self.sql.tracks.create(self.library, + pathlib.Path("/a/b/2.ogg"), + self.medium, self.year) + self.sql("UPDATE tracks SET playcount=2 WHERE trackid=?", + self.track.trackid) + self.sql("UPDATE tracks SET playcount=1 WHERE trackid=?", + track2.trackid) + + self.table.most_played.add_track(self.track) + self.assertFalse(self.table.most_played.has_track(self.track)) + + self.table.most_played.tracks.add_track(self.track) + self.table.most_played.remove_track(self.track) + self.assertFalse(self.table.most_played.has_track(self.track)) + + self.table.most_played.reload_tracks() + self.assertFalse(self.table.most_played.has_track(self.track)) + + self.sql("UPDATE tracks SET playcount=5 WHERE trackid=?", + self.track.trackid) + self.table.most_played.reload_tracks() + self.assertTrue(self.table.most_played.has_track(self.track)) + + self.library.deleting = True + self.table.most_played.reload_tracks() + self.assertFalse(self.table.most_played.has_track(self.track)) + def test_new_tracks(self): """Test the new tracks playlist.""" self.assertIsInstance(self.table.new_tracks, @@ -299,6 +491,22 @@ class TestSystemPlaylists(tests.util.TestCase): self.assertEqual(self.table.lookup("New Tracks"), self.table.new_tracks) + def test_new_tracks_tracks(self): + """Test the New Tracks track functions.""" + self.table.new_tracks.add_track(self.track) + self.assertTrue(self.table.new_tracks.has_track(self.track)) + + self.library.deleting = True + self.table.new_tracks.reload_tracks() + self.assertFalse(self.table.new_tracks.has_track(self.track)) + + self.library.deleting = False + self.table.new_tracks.reload_tracks() + self.assertTrue(self.table.new_tracks.has_track(self.track)) + + self.table.new_tracks.remove_track(self.track) + self.assertFalse(self.table.new_tracks.has_track(self.track)) + def test_previous(self): """Test the previous tracks playlist.""" self.assertIsInstance(self.table.previous, @@ -349,6 +557,37 @@ class TestSystemPlaylists(tests.util.TestCase): self.table.previous.sort_order = "trackid" self.assertEqual(self.table.previous.sort_order, "laststarted DESC") + def test_previous_tracks(self): + """Test the Previous Tracks track functions.""" + self.table.previous.add_track(self.track) + self.assertTrue(self.table.previous.has_track(self.track)) + rows = self.sql("SELECT trackid FROM system_tracks WHERE propertyid=?", + self.table.previous.propertyid).fetchall() + self.assertEqual(len(rows), 1) + self.assertEqual(rows[0]["trackid"], self.track.trackid) + + self.library.deleting = True + self.table.previous.reload_tracks() + self.assertFalse(self.table.previous.has_track(self.track)) + + self.library.deleting = False + self.table.previous.reload_tracks() + self.assertTrue(self.table.previous.has_track(self.track)) + + self.table.previous.remove_track(self.track) + self.assertFalse(self.table.previous.has_track(self.track)) + rows = self.sql("SELECT trackid FROM system_tracks WHERE propertyid=?", + self.table.previous.propertyid).fetchall() + self.assertEqual(len(rows), 0) + + def test_previous_tracks_reset(self): + """Test that the Previous Tracks are reset during startup.""" + self.table.previous.add_track(self.track) + self.table.load(now=True) + rows = self.sql("SELECT trackid FROM system_tracks WHERE propertyid=?", + self.table.previous.propertyid).fetchall() + self.assertEqual(len(rows), 0) + def test_queued(self): """Test the queued tracks playlist.""" self.assertIsInstance(self.table.queued, @@ -368,6 +607,29 @@ class TestSystemPlaylists(tests.util.TestCase): self.assertEqual(self.table.lookup("Queued Tracks"), self.table.queued) + def test_queued_tracks(self): + """Test the Queued Tracks track functions.""" + self.table.queued.add_track(self.track) + self.assertTrue(self.table.queued.has_track(self.track)) + rows = self.sql("SELECT trackid FROM user_tracks WHERE propertyid=?", + self.table.queued.propertyid).fetchall() + self.assertEqual(len(rows), 1) + self.assertEqual(rows[0]["trackid"], self.track.trackid) + + self.library.deleting = True + self.table.queued.reload_tracks() + self.assertFalse(self.table.queued.has_track(self.track)) + + self.library.deleting = False + self.table.queued.reload_tracks() + self.assertTrue(self.table.queued.has_track(self.track)) + + self.table.queued.remove_track(self.track) + self.assertFalse(self.table.queued.has_track(self.track)) + rows = self.sql("SELECT trackid FROM user_tracks WHERE propertyid=?", + self.table.queued.propertyid).fetchall() + self.assertEqual(len(rows), 0) + def test_unplayed(self): """Test the unplayed tracks playlist.""" self.assertIsInstance(self.table.unplayed, @@ -385,3 +647,24 @@ class TestSystemPlaylists(tests.util.TestCase): self.assertEqual(self.table.lookup("Unplayed Tracks"), self.table.unplayed) + + def test_unplayed_tracks(self): + """Test the Unplayed Tracks track functions.""" + self.table.unplayed.add_track(self.track) + self.assertTrue(self.table.unplayed.has_track(self.track)) + + self.library.deleting = True + self.table.unplayed.reload_tracks() + self.assertFalse(self.table.unplayed.has_track(self.track)) + + self.library.deleting = False + self.table.unplayed.reload_tracks() + self.assertTrue(self.table.unplayed.has_track(self.track)) + + self.table.unplayed.remove_track(self.track) + self.assertFalse(self.table.unplayed.has_track(self.track)) + + self.sql("UPDATE tracks SET playcount=1 WHERE trackid=?", + self.track.trackid) + self.table.unplayed.reload_tracks() + self.assertFalse(self.table.unplayed.has_track(self.track))