db: Give System Playlists knowledge about their Tracks

I need to do something slightly different for each Playlist.

* Collection: I load tracks from the collection_view, which filters
      tracks to those where the library is enabled but not deleting.
* Favorite Tracks: I load tracks from the favorites_view, which filters
      tracks based on the tracks.favorite and library.deleting column.
* Most Played Tracks: I load tracks with a playcount greater than the
      average playcount of all tracks (rounded up to the nearest integer).
* New Tracks: I load tracks that have been added within the last week.
* Previous Tracks: I load tracks that have been played since startup
      using the system_tracks table.  I take care to clear these entries
      in the table during startup.
* Queued Tracks: Load tracks from the user_tracks table.
* Unplayed Tracks: I load tracks with a playcount equal to 0 and remove
      when they are played.
* User-Defined Playlists: Load tracks from the track_playlist_link
      table.

Additionally, I implement move_track_up() and move_track_down() support
for user playlists and queued tracks.

Finally, I update the have-next-track property to take into account if
the Collection has tracks too.

Signed-off-by: Anna Schumaker <Anna@NoWheyCreamery.com>
This commit is contained in:
Anna Schumaker 2022-10-27 10:03:50 -04:00
parent 99496ca8bf
commit 6762916899
4 changed files with 443 additions and 3 deletions

View File

@ -463,6 +463,17 @@ CREATE TABLE system_tracks (
UNIQUE(trackid, propertyid)
);
CREATE TABLE user_tracks (
trackid INTEGER REFERENCES tracks (trackid)
ON DELETE CASCADE
ON DELETE CASCADE,
propertyid INTEGER REFERENCES playlist_properties (propertyid)
ON DELETE CASCADE
ON UPDATE CASCADE,
position INTEGER,
UNIQUE(trackid, propertyid)
);
CREATE VIEW system_tracks_view AS
SELECT trackid, system_tracks.propertyid
FROM system_tracks
@ -470,6 +481,41 @@ CREATE VIEW system_tracks_view AS
JOIN libraries USING (libraryid)
WHERE libraries.deleting = FALSE;
CREATE VIEW user_tracks_view AS
SELECT trackid, user_tracks.propertyid, user_tracks.position
FROM user_tracks
JOIN tracks USING (trackid)
JOIN libraries USING (libraryid)
WHERE libraries.deleting = FALSE;
CREATE VIEW collection_view AS
SELECT tracks.trackid FROM tracks
JOIN libraries USING (libraryid)
WHERE libraries.enabled = TRUE AND libraries.deleting = FALSE;
CREATE VIEW favorite_view AS
SELECT tracks.trackid FROM tracks
JOIN libraries USING (libraryid)
WHERE tracks.favorite = TRUE AND libraries.deleting = FALSE;
CREATE VIEW most_played_view AS
SELECT tracks.trackid FROM tracks
JOIN libraries USING (libraryid)
WHERE tracks.playcount > (SELECT CEIL(AVG(playcount))
FROM tracks WHERE playcount>0)
AND libraries.deleting = FALSE;
CREATE VIEW new_tracks_view AS
SELECT tracks.trackid FROM tracks
JOIN libraries USING (libraryid)
WHERE tracks.added > DATE('now', 'localtime', '-7 days')
AND libraries.deleting = FALSE;
CREATE VIEW unplayed_tracks_view AS
SELECT tracks.trackid FROM tracks
JOIN libraries USING (libraryid)
WHERE tracks.playcount == 0 AND libraries.deleting = FALSE;
/****************************************************
* *

View File

@ -130,6 +130,8 @@ class Table(playlist.Table):
def do_sql_update(self, library: Library, column: str, newval) -> bool:
"""Update a Library playlist."""
if column == "enabled" and self.sql.playlists.collection:
self.sql.playlists.collection.reload_tracks(idle=True)
return self.sql(f"UPDATE libraries SET {column}=? WHERE rowid=?",
newval, library.libraryid)

View File

@ -3,6 +3,7 @@
import sqlite3
from gi.repository import GObject
from . import playlist
from . import tracks
class Playlist(playlist.Playlist):
@ -16,6 +17,10 @@ class Playlist(playlist.Playlist):
match (self.name, column, self.get_property(column)):
case ("Collection", "loop", "None"):
self.loop = "Playlist"
case ("Collection", "n-tracks", 0):
self.table.have_collection_tracks = False
case ("Collection", "n-tracks", _):
self.table.have_collection_tracks = True
case ("Previous Tracks", "loop", "Playlist") | \
("Previous Tracks", "loop", "Track"):
self.loop = "None"
@ -47,9 +52,31 @@ class Table(playlist.Table):
queued = GObject.Property(type=Playlist)
unplayed = GObject.Property(type=Playlist)
have_collection_tracks = GObject.Property(type=bool, default=False)
def __init__(self, sql: GObject.TYPE_PYOBJECT, **kwargs):
"""Initialize the Playlists Table."""
super().__init__(sql=sql, **kwargs)
super().__init__(sql=sql, system_tracks=False, **kwargs)
def __move_user_trackid(self, playlist: Playlist, trackid: int,
*, offset: int) -> bool:
order = self.get_track_order(playlist)
tracks = sorted(playlist.tracks.trackids, key=order.get)
start = tracks.index(trackid)
new = start + offset
if not (0 <= new < len(tracks)):
return False
tracks[start] = tracks[new]
tracks[new] = trackid
# Note: We write out all trackids so we don't have to update during
# do_add_track() and do_remove_track()
args = [(i, playlist.propertyid, t) for (i, t) in enumerate(tracks)]
self.sql.executemany("""UPDATE user_tracks SET position=?
WHERE propertyid=? AND trackid=?""", *args)
return True
def do_construct(self, **kwargs) -> Playlist:
"""Construct a new playlist."""
@ -60,7 +87,10 @@ class Table(playlist.Table):
self.favorites.user_tracks = True
case "Most Played Tracks": self.most_played = plist
case "New Tracks": self.new_tracks = plist
case "Previous Tracks": self.previous = plist
case "Previous Tracks":
self.previous = plist
self.sql("DELETE FROM system_tracks WHERE propertyid=?",
self.previous.propertyid)
case "Queued Tracks":
self.queued = plist
self.queued.user_tracks = True
@ -71,6 +101,54 @@ class Table(playlist.Table):
plist.tracks_movable = True
return plist
def do_add_track(self, playlist: Playlist, track: tracks.Track) -> bool:
"""Add a Track to the requested Playlist."""
match playlist:
case self.collection: return track.get_library().enabled
case self.most_played: view = "most_played_view"
case self.new_tracks: view = "new_tracks_view"
case self.favorites:
track.update_properties(favorite=True)
return True
case self.previous:
self.add_system_track(playlist, track)
return True
case self.unplayed: return track.playcount == 0
case _: return self.add_user_track(playlist, track)
return self.sql(f"SELECT ? IN {view}", track.trackid).fetchone()[0]
def do_get_user_track_order(self, playlist: Playlist) -> dict[int, int]:
"""Get the user-configured sort order for a playlist."""
cur = self.sql("""SELECT trackid FROM user_tracks WHERE propertyid=?
ORDER BY position NULLS LAST, rowid""",
playlist.propertyid)
return {row["trackid"]: i for (i, row) in enumerate(cur.fetchall())}
def do_move_track_down(self, playlist: Playlist,
track: tracks.Track) -> bool:
"""Move a track down in the user sort order."""
return self.__move_user_trackid(playlist, track.trackid, offset=1)
def do_move_track_up(self, playlist: Playlist,
track: tracks.Track) -> bool:
"""Move a track up in the user sort order."""
return self.__move_user_trackid(playlist, track.trackid, offset=-1)
def do_remove_track(self, playlist: Playlist, track: tracks.Track) -> bool:
"""Remove a Track from the requested Playlist."""
match playlist:
case self.collection: return True
case self.most_played: return True
case self.new_tracks: return True
case self.unplayed: return True
case self.favorites:
track.update_properties(favorite=False)
return True
case self.previous:
return self.remove_system_track(playlist, track)
case _: return self.remove_user_track(playlist, track)
def do_sql_delete(self, playlist: Playlist) -> sqlite3.Cursor:
"""Delete a playlist."""
return self.sql("DELETE FROM playlists WHERE playlistid=?",
@ -91,6 +169,19 @@ class Table(playlist.Table):
"""Load playlists from the database."""
return self.sql("SELECT * FROM playlists_view")
def do_sql_select_trackids(self, playlist: Playlist) -> sqlite3.Cursor:
"""Load Tracks from the database."""
match playlist:
case self.collection: view = "collection_view"
case self.favorites: view = "favorite_view"
case self.most_played: view = "most_played_view"
case self.new_tracks: view = "new_tracks_view"
case self.unplayed: view = "unplayed_tracks_view"
case self.previous: return self.get_sql_system_trackids(playlist)
case _: return self.get_sql_user_trackids(playlist)
return self.sql(f"SELECT trackid FROM {view}")
def do_sql_select_one(self, name: str) -> sqlite3.Cursor:
"""Look up a playlist by name."""
return self.sql("SELECT playlistid FROM playlists WHERE name=?", name)
@ -101,11 +192,29 @@ class Table(playlist.Table):
return self.sql(f"UPDATE playlists SET {column}=? WHERE playlistid=?",
newval, playlist.playlistid)
def add_user_track(self, playlist: Playlist, track: tracks.Track) -> bool:
"""Add a Track to the User Tracks table."""
cur = self.sql("""INSERT INTO user_tracks (propertyid, trackid)
VALUES (?, ?)""", playlist.propertyid, track.trackid)
return cur and cur.rowcount == 1
def get_sql_user_trackids(self, playlist: Playlist) -> sqlite3.Cursor:
"""Load user Tracks from the database."""
return self.sql("""SELECT trackid FROM user_tracks_view
WHERE propertyid=?""", playlist.propertyid)
def create(self, name: str) -> Playlist:
"""Create a new Playlist."""
if len(name := name.strip()) > 0:
return super().create(name)
def remove_user_track(self, playlist: Playlist,
track: tracks.Track) -> bool:
"""Remove a track from the User Tracks table."""
return self.sql("""DELETE FROM user_tracks
WHERE propertyid=? AND trackid=?""",
playlist.propertyid, track.trackid).rowcount == 1
def rename(self, playlist: Playlist, new_name: str) -> bool:
"""Rename a Playlist."""
if len(new_name := new_name.strip()) > 0:

View File

@ -61,12 +61,14 @@ class TestPlaylistTable(tests.util.TestCase):
self.track = self.sql.tracks.create(self.library,
pathlib.Path("/a/b/c.ogg"),
self.medium, self.year)
self.medium, self.year, number=1)
def test_init(self):
"""Test that the playlist model is configured correctly."""
self.assertIsInstance(self.table, emmental.db.playlist.Table)
self.assertFalse(self.table.autodelete)
self.assertFalse(self.table.system_tracks)
self.assertFalse(self.table.have_collection_tracks)
self.assertEqual(len(self.table), 0)
self.assertIsNone(self.table.collection)
@ -77,6 +79,20 @@ class TestPlaylistTable(tests.util.TestCase):
self.assertIsNone(self.table.queued)
self.assertIsNone(self.table.unplayed)
def test_add_track(self):
"""Test adding tracks to user Playlists."""
playlist = self.table.create("Test Playlist")
playlist.add_track(self.track)
self.assertTrue(playlist.has_track(self.track))
rows = self.sql("SELECT trackid FROM user_tracks WHERE propertyid=?",
playlist.propertyid).fetchall()
self.assertEqual(len(rows), 1)
self.assertEqual(rows[0]["trackid"], self.track.trackid)
self.assertFalse(playlist.add_track(self.track))
def test_construct(self):
"""Test constructing a playlist."""
playlist = self.table.construct(playlistid=1, propertyid=1,
@ -99,6 +115,7 @@ class TestPlaylistTable(tests.util.TestCase):
self.assertEqual(playlist.sort_order,
"albumartist, album, mediumno, number")
self.assertIsNone(playlist.image)
self.assertTrue(playlist.tracks_movable)
cur = self.sql("SELECT COUNT(name) FROM playlists")
self.assertEqual(cur.fetchone()["COUNT(name)"], 1)
@ -141,6 +158,33 @@ class TestPlaylistTable(tests.util.TestCase):
self.table.filter("playlist*", now=True)
self.assertSetEqual(self.table.get_filter().keys, {1, 2})
def test_get_trackids(self):
"""Test loading playlist tracks from the database."""
playlist = self.table.create("Test Playlist")
self.assertSetEqual(self.table.get_trackids(playlist), set())
playlist.add_track(self.track)
self.assertSetEqual(self.table.get_trackids(playlist),
{self.track.trackid})
def test_get_track_order(self):
"""Test getting the user track order for a playlist."""
track2 = self.sql.tracks.create(self.library,
pathlib.Path("/a/b/d.ogg"),
self.medium, self.year)
playlist = self.table.create("Test Playlist")
playlist.add_track(self.track)
playlist.add_track(track2)
playlist.sort_order = "user"
self.assertDictEqual(self.table.get_track_order(playlist),
{self.track.trackid: 0, track2.trackid: 1})
self.sql("UPDATE user_tracks SET position=? WHERE trackid=?",
3, track2.trackid)
self.assertDictEqual(self.table.get_track_order(playlist),
{self.track.trackid: 1, track2.trackid: 0})
def test_load(self):
"""Test loading playlists from the database."""
self.table.create("Playlist 1").image = tests.util.COVER_JPG
@ -162,6 +206,66 @@ class TestPlaylistTable(tests.util.TestCase):
self.assertEqual(self.table.lookup("test playlist"), playlist)
self.assertIsNone(self.table.lookup("No Playlist"))
def test_move_track_down(self):
"""Test moving a track down in the sort order."""
track2 = self.sql.tracks.create(self.library,
pathlib.Path("/a/b/d.ogg"),
self.medium, self.year, number=2)
track3 = self.sql.tracks.create(self.library,
pathlib.Path("/a/b/e.ogg"),
self.medium, self.year, number=3)
playlist = self.table.create("Test Playlist")
for track in [self.track, track2, track3]:
playlist.add_track(track)
playlist.sort_order = "number"
self.assertFalse(self.table.move_track_down(playlist, track3))
self.assertEqual(playlist.sort_order, "number")
self.assertTrue(self.table.move_track_down(playlist, self.track))
self.assertEqual(playlist.sort_order, "user")
self.assertDictEqual(self.table.get_track_order(playlist),
{track2.trackid: 0,
self.track.trackid: 1,
track3.trackid: 2})
def test_move_track_up(self):
"""Test moving a track up in the sort order."""
track2 = self.sql.tracks.create(self.library,
pathlib.Path("/a/b/d.ogg"),
self.medium, self.year, number=2)
track3 = self.sql.tracks.create(self.library,
pathlib.Path("/a/b/e.ogg"),
self.medium, self.year, number=3)
playlist = self.table.create("Test Playlist")
for track in [self.track, track2, track3]:
playlist.add_track(track)
playlist.sort_order = "number"
self.assertFalse(self.table.move_track_up(playlist, self.track))
self.assertEqual(playlist.sort_order, "number")
self.assertTrue(self.table.move_track_up(playlist, track3))
self.assertEqual(playlist.sort_order, "user")
self.assertDictEqual(self.table.get_track_order(playlist),
{self.track.trackid: 0,
track3.trackid: 1,
track2.trackid: 2})
def test_remove_track(self):
"""Test removing tracks from user Playlists."""
playlist = self.table.create("Test Playlist")
playlist.add_track(self.track)
playlist.remove_track(self.track)
self.assertFalse(playlist.has_track(self.track))
rows = self.sql("SELECT trackid FROM user_tracks WHERE propertyid=?",
playlist.propertyid).fetchall()
self.assertEqual(len(rows), 0)
def test_rename(self):
"""Test renaming a playlist."""
playlist = self.table.create("Test Playlist")
@ -213,6 +317,15 @@ class TestSystemPlaylists(tests.util.TestCase):
self.table = self.sql.playlists
self.table.load(now=True)
self.library = self.sql.libraries.create(pathlib.Path("/a/b"))
self.album = self.sql.albums.create("Test Album", "Artist", "2023-04")
self.medium = self.sql.media.create(self.album, "", number=1)
self.year = self.sql.years.create(2023)
self.track = self.sql.tracks.create(self.library,
pathlib.Path("/a/b/1.ogg"),
self.medium, self.year)
def test_collection(self):
"""Test the Collection playlist."""
self.assertIsInstance(self.table.collection,
@ -244,6 +357,38 @@ class TestSystemPlaylists(tests.util.TestCase):
self.table.collection.loop = "None"
self.assertEqual(self.table.collection.loop, "Playlist")
def test_collection_tracks(self):
"""Test the Collection playlist track functions."""
for (enabled, deleting) in [(False, False), (False, True),
(True, True), (True, False)]:
with self.subTest(enabled=enabled, deleting=deleting):
self.library.enabled = enabled
self.library.deleting = deleting
self.table.collection.add_track(self.track)
self.assertEqual(self.table.collection.has_track(self.track),
enabled and not deleting)
self.assertEqual(self.table.have_collection_tracks,
enabled and not deleting)
self.table.collection.remove_track(self.track)
self.assertFalse(self.table.collection.has_track(self.track))
self.table.collection.reload_tracks()
self.assertTrue(self.table.collection.has_track(self.track))
self.library.enabled = False
self.table.queue.complete()
self.assertFalse(self.table.collection.has_track(self.track))
self.library.enabled = True
self.table.queue.complete()
self.assertTrue(self.table.collection.has_track(self.track))
self.library.deleting = True
self.table.collection.reload_tracks()
self.assertFalse(self.table.collection.has_track(self.track))
def test_favorites(self):
"""Test the favorite tracks playlist."""
self.assertIsInstance(self.table.favorites,
@ -263,6 +408,24 @@ class TestSystemPlaylists(tests.util.TestCase):
self.assertEqual(self.table.lookup("Favorite Tracks"),
self.table.favorites)
def test_favorite_tracks(self):
"""Test the Favorite Tracks track functions."""
self.table.favorites.add_track(self.track)
self.assertTrue(self.table.favorites.has_track(self.track))
self.assertTrue(self.track.favorite)
self.library.deleting = True
self.table.favorites.reload_tracks()
self.assertFalse(self.table.favorites.has_track(self.track))
self.library.deleting = False
self.table.favorites.reload_tracks()
self.assertTrue(self.table.favorites.has_track(self.track))
self.table.favorites.remove_track(self.track)
self.assertFalse(self.table.favorites.has_track(self.track))
self.assertFalse(self.track.favorite)
def test_most_played(self):
"""Test the most-played tracks playlist."""
self.assertIsInstance(self.table.most_played,
@ -281,6 +444,35 @@ class TestSystemPlaylists(tests.util.TestCase):
self.assertEqual(self.table.lookup("Most Played Tracks"),
self.table.most_played)
def test_most_played_tracks(self):
"""Test the Most Played Tracks track functions."""
track2 = self.sql.tracks.create(self.library,
pathlib.Path("/a/b/2.ogg"),
self.medium, self.year)
self.sql("UPDATE tracks SET playcount=2 WHERE trackid=?",
self.track.trackid)
self.sql("UPDATE tracks SET playcount=1 WHERE trackid=?",
track2.trackid)
self.table.most_played.add_track(self.track)
self.assertFalse(self.table.most_played.has_track(self.track))
self.table.most_played.tracks.add_track(self.track)
self.table.most_played.remove_track(self.track)
self.assertFalse(self.table.most_played.has_track(self.track))
self.table.most_played.reload_tracks()
self.assertFalse(self.table.most_played.has_track(self.track))
self.sql("UPDATE tracks SET playcount=5 WHERE trackid=?",
self.track.trackid)
self.table.most_played.reload_tracks()
self.assertTrue(self.table.most_played.has_track(self.track))
self.library.deleting = True
self.table.most_played.reload_tracks()
self.assertFalse(self.table.most_played.has_track(self.track))
def test_new_tracks(self):
"""Test the new tracks playlist."""
self.assertIsInstance(self.table.new_tracks,
@ -299,6 +491,22 @@ class TestSystemPlaylists(tests.util.TestCase):
self.assertEqual(self.table.lookup("New Tracks"),
self.table.new_tracks)
def test_new_tracks_tracks(self):
"""Test the New Tracks track functions."""
self.table.new_tracks.add_track(self.track)
self.assertTrue(self.table.new_tracks.has_track(self.track))
self.library.deleting = True
self.table.new_tracks.reload_tracks()
self.assertFalse(self.table.new_tracks.has_track(self.track))
self.library.deleting = False
self.table.new_tracks.reload_tracks()
self.assertTrue(self.table.new_tracks.has_track(self.track))
self.table.new_tracks.remove_track(self.track)
self.assertFalse(self.table.new_tracks.has_track(self.track))
def test_previous(self):
"""Test the previous tracks playlist."""
self.assertIsInstance(self.table.previous,
@ -349,6 +557,37 @@ class TestSystemPlaylists(tests.util.TestCase):
self.table.previous.sort_order = "trackid"
self.assertEqual(self.table.previous.sort_order, "laststarted DESC")
def test_previous_tracks(self):
"""Test the Previous Tracks track functions."""
self.table.previous.add_track(self.track)
self.assertTrue(self.table.previous.has_track(self.track))
rows = self.sql("SELECT trackid FROM system_tracks WHERE propertyid=?",
self.table.previous.propertyid).fetchall()
self.assertEqual(len(rows), 1)
self.assertEqual(rows[0]["trackid"], self.track.trackid)
self.library.deleting = True
self.table.previous.reload_tracks()
self.assertFalse(self.table.previous.has_track(self.track))
self.library.deleting = False
self.table.previous.reload_tracks()
self.assertTrue(self.table.previous.has_track(self.track))
self.table.previous.remove_track(self.track)
self.assertFalse(self.table.previous.has_track(self.track))
rows = self.sql("SELECT trackid FROM system_tracks WHERE propertyid=?",
self.table.previous.propertyid).fetchall()
self.assertEqual(len(rows), 0)
def test_previous_tracks_reset(self):
"""Test that the Previous Tracks are reset during startup."""
self.table.previous.add_track(self.track)
self.table.load(now=True)
rows = self.sql("SELECT trackid FROM system_tracks WHERE propertyid=?",
self.table.previous.propertyid).fetchall()
self.assertEqual(len(rows), 0)
def test_queued(self):
"""Test the queued tracks playlist."""
self.assertIsInstance(self.table.queued,
@ -368,6 +607,29 @@ class TestSystemPlaylists(tests.util.TestCase):
self.assertEqual(self.table.lookup("Queued Tracks"),
self.table.queued)
def test_queued_tracks(self):
"""Test the Queued Tracks track functions."""
self.table.queued.add_track(self.track)
self.assertTrue(self.table.queued.has_track(self.track))
rows = self.sql("SELECT trackid FROM user_tracks WHERE propertyid=?",
self.table.queued.propertyid).fetchall()
self.assertEqual(len(rows), 1)
self.assertEqual(rows[0]["trackid"], self.track.trackid)
self.library.deleting = True
self.table.queued.reload_tracks()
self.assertFalse(self.table.queued.has_track(self.track))
self.library.deleting = False
self.table.queued.reload_tracks()
self.assertTrue(self.table.queued.has_track(self.track))
self.table.queued.remove_track(self.track)
self.assertFalse(self.table.queued.has_track(self.track))
rows = self.sql("SELECT trackid FROM user_tracks WHERE propertyid=?",
self.table.queued.propertyid).fetchall()
self.assertEqual(len(rows), 0)
def test_unplayed(self):
"""Test the unplayed tracks playlist."""
self.assertIsInstance(self.table.unplayed,
@ -385,3 +647,24 @@ class TestSystemPlaylists(tests.util.TestCase):
self.assertEqual(self.table.lookup("Unplayed Tracks"),
self.table.unplayed)
def test_unplayed_tracks(self):
"""Test the Unplayed Tracks track functions."""
self.table.unplayed.add_track(self.track)
self.assertTrue(self.table.unplayed.has_track(self.track))
self.library.deleting = True
self.table.unplayed.reload_tracks()
self.assertFalse(self.table.unplayed.has_track(self.track))
self.library.deleting = False
self.table.unplayed.reload_tracks()
self.assertTrue(self.table.unplayed.has_track(self.track))
self.table.unplayed.remove_track(self.track)
self.assertFalse(self.table.unplayed.has_track(self.track))
self.sql("UPDATE tracks SET playcount=1 WHERE trackid=?",
self.track.trackid)
self.table.unplayed.reload_tracks()
self.assertFalse(self.table.unplayed.has_track(self.track))