db: Add Track support to Playlist Tables

I implement add_track(), get_trackids(), and remove_track() functions that
either modify the 'system_playlist_tracks' table or call a virtual function
depending on the value of the 'system-tracks' property.

I also add the "autodelete" property. When set to True, Playlists will
be deleted when they have 0 tracks remaining.

Signed-off-by: Anna Schumaker <Anna@NoWheyCreamery.com>
This commit is contained in:
Anna Schumaker 2022-10-03 14:40:32 -04:00
parent 1b9458c278
commit 6eec4dbfc3
3 changed files with 231 additions and 1 deletions

View File

@ -395,6 +395,30 @@ CREATE TRIGGER tracks_active_trigger
END;
/*********************************************
* *
* Track <--> Playlist Linking *
* *
*********************************************/
CREATE TABLE system_tracks (
trackid INTEGER REFERENCES tracks (trackid)
ON DELETE CASCADE
ON UPDATE CASCADE,
propertyid INTEGER REFERENCES playlist_properties (propertyid)
ON DELETE CASCADE
ON UPDATE CASCADE,
UNIQUE(trackid, propertyid)
);
CREATE VIEW system_tracks_view AS
SELECT trackid, system_tracks.propertyid
FROM system_tracks
JOIN tracks USING (trackid)
JOIN libraries USING (libraryid)
WHERE libraries.deleting = FALSE;
/****************************************************
* *
* Data saved when Tracks are deleted *

View File

@ -1,5 +1,6 @@
# Copyright 2022 (c) Anna Schumaker
"""A customized Gio.ListStore for tracking Playlist GObjects."""
import sqlite3
from gi.repository import GObject
from gi.repository import Gio
from gi.repository import Gtk
@ -102,6 +103,9 @@ class Table(table.Table):
active_playlist = GObject.Property(type=Playlist)
treemodel = GObject.Property(type=Gtk.TreeListModel)
autodelete = GObject.Property(type=bool, default=False)
system_tracks = GObject.Property(type=bool, default=True)
def __init__(self, sql: GObject.TYPE_PYOBJECT, **kwargs):
"""Initialize a Playlist Table."""
super().__init__(sql=sql, **kwargs)
@ -110,13 +114,56 @@ class Table(table.Table):
autoexpand=False,
create_func=self.__create_tree)
def __do_autodelete(self, plist: Playlist) -> bool:
if plist.n_tracks == 0:
self.delete(plist)
return True
def __autodelete(self, plist: Playlist):
if self.autodelete:
self.queue.push(self.__do_autodelete, plist)
def __create_tree(self, plist: Playlist) -> Gtk.FilterListModel | None:
return plist.children
def do_add_track(self, playlist: Playlist, track: Track) -> bool:
"""Add a Track to the Playlist."""
raise NotImplementedError
def do_get_sort_key(self, playlist: Playlist) -> tuple[str]:
"""Get a sort key for the requested Playlist."""
return format.sort_key(playlist.name)
def do_move_track_down(self, playlist: Playlist, track: Track) -> bool:
"""Move a track down in the sort order."""
raise NotImplementedError
def do_move_track_up(self, playlist: Playlist, track: Track) -> bool:
"""Move a track up in the sort order."""
raise NotImplementedError
def do_remove_track(self, playlist: Playlist, track: Track) -> bool:
"""Remove a Track from the Playlist."""
raise NotImplementedError
def do_sql_select_trackids(self, playlist: Playlist) -> sqlite3.Cursor:
"""Select the trackids that are in this Playlist."""
raise NotImplementedError
def add_system_track(self, playlist: Playlist, track: Track) -> bool:
"""Add a Track to a system Playlist."""
cur = self.sql("""INSERT INTO system_tracks (propertyid, trackid)
VALUES (?, ?)""", playlist.propertyid, track.trackid)
return cur and cur.rowcount == 1
def add_track(self, playlist: Playlist, track: Track) -> bool:
"""Add a Track to a Playlist."""
if track is None or track.get_library().deleting:
return False
if self.system_tracks:
return self.add_system_track(playlist, track)
return self.do_add_track(playlist, track)
def clear(self) -> None:
"""Clear the Table."""
self.active_playlist = None
@ -127,6 +174,7 @@ class Table(table.Table):
res = super().construct(propertyid=propertyid, name=name, **kwargs)
if res.active:
self.sql.set_active_playlist(res)
res.reload_tracks(idle=True)
return res
def delete(self, playlist: Playlist) -> bool:
@ -135,6 +183,50 @@ class Table(table.Table):
self.sql.set_active_playlist(None)
return super().delete(playlist)
def get_sql_system_trackids(self, playlist: Playlist) -> sqlite3.Cursor:
"""Load a System Playlist's Tracks from the database."""
return self.sql("""SELECT trackid FROM system_tracks_view
WHERE propertyid=?""", playlist.propertyid)
def get_trackids(self, playlist: Playlist) -> set[int]:
"""Load a Playlist's Tracks from the database."""
if self.system_tracks:
cur = self.get_sql_system_trackids(playlist)
else:
cur = self.do_sql_select_trackids(playlist)
res = {row["trackid"] for row in cur.fetchall()}
self.__autodelete(playlist)
return res
def move_track_down(self, playlist: Playlist, track: Track) -> bool:
"""Move a track down in the playlist."""
if not playlist.tracks_movable:
return False
return self.do_move_track_down(playlist, track)
def move_track_up(self, playlist: Playlist, track: Track) -> bool:
"""Move a track up in the playlist."""
if not playlist.tracks_movable:
return False
return self.do_move_track_up(playlist, track)
def remove_system_track(self, playlist: Playlist, track: Track) -> bool:
"""Remove a Track from a system Playlist."""
return self.sql("""DELETE FROM system_tracks
WHERE propertyid=? AND trackid=?""",
playlist.propertyid, track.trackid).rowcount == 1
def remove_track(self, playlist: Playlist, track: Track) -> bool:
"""Remove a Track from a Playlist."""
if self.system_tracks:
res = self.remove_system_track(playlist, track)
else:
res = self.do_remove_track(playlist, track)
self.__autodelete(playlist)
return res
def update(self, playlist: Playlist, column: str, newval) -> bool:
"""Update a Playlist in the Database."""
match column:

View File

@ -1,5 +1,6 @@
# Copyright 2022 (c) Anna Schumaker
"""Tests our ListStore and Playlist objects."""
import pathlib
import unittest
import unittest.mock
import emmental.db.playlist
@ -169,6 +170,14 @@ class TestPlaylistTable(tests.util.TestCase):
self.table = tests.util.playlist.MockTable(self.sql)
self.sql("DELETE FROM playlist_properties")
self.library = self.sql.libraries.create(pathlib.Path("/a/b"))
self.album = self.sql.albums.create("Test Album", "Artist", "2023-03")
self.medium = self.sql.media.create(self.album, "", number=1)
self.year = self.sql.years.create(2023)
self.track = self.sql.tracks.create(self.library,
pathlib.Path("/a/b/c.ogg"),
self.medium, self.year)
def test_treemodel(self):
"""Check that the table's treemodel was set up properly."""
self.assertIsInstance(self.table.treemodel, Gtk.TreeListModel)
@ -181,6 +190,25 @@ class TestPlaylistTable(tests.util.TestCase):
root.children = Gtk.FilterListModel()
self.assertEqual(self.table._Table__create_tree(root), root.children)
def test_add_track(self):
"""Test adding a track to a playlist."""
self.assertTrue(self.table.system_tracks)
plist = self.table.create("Test Playlist")
self.library.deleting = True
self.assertFalse(self.table.add_track(plist, None))
self.assertFalse(self.table.add_track(plist, self.track))
self.library.deleting = False
self.assertTrue(self.table.add_track(plist, self.track))
cur = self.sql("SELECT COUNT(*) FROM system_tracks")
self.assertEqual(cur.fetchone()["COUNT(*)"], 1)
self.assertFalse(self.table.add_track(plist, self.track))
self.table.system_tracks = False
with self.assertRaises(NotImplementedError):
self.table.add_track(plist, self.track)
def test_construct(self):
"""Test constructing a new playlist."""
self.assertIsNone(self.table.active_playlist)
@ -197,6 +225,9 @@ class TestPlaylistTable(tests.util.TestCase):
self.assertEqual(self.sql.active_playlist, plist2)
self.assertTrue(plist2.active)
self.assertEqual(self.table.queue[0], (plist1.load_tracks,))
self.assertEqual(self.table.queue[1], (plist2.load_tracks,))
def test_get_sort_key(self):
"""Test getting a sort key for a playlist."""
plist = self.table.create("Playlist 1")
@ -212,8 +243,10 @@ class TestPlaylistTable(tests.util.TestCase):
self.assertEqual(len(self.table), 0)
def test_delete(self):
"""Test deleting the active playlist."""
"""Test deleting the active playlist with system tracks."""
self.table.system_tracks = True
plist = self.table.create("Test Playlist")
plist.add_track(self.track)
self.sql.set_active_playlist(plist)
self.assertTrue(self.table.delete(plist))
@ -221,6 +254,64 @@ class TestPlaylistTable(tests.util.TestCase):
self.assertIsNone(self.sql.active_playlist)
self.assertNotIn(plist, self.table)
cur = self.sql("SELECT COUNT(*) FROM system_tracks")
self.assertEqual(cur.fetchone()["COUNT(*)"], 0)
def test_get_trackids(self):
"""Test getting the set of trackids for this playlist."""
self.assertTrue(self.table.system_tracks)
self.table._Table__autodelete = unittest.mock.Mock()
plist = self.table.create("Test Playlist")
self.assertSetEqual(self.table.get_trackids(plist), set())
self.table._Table__autodelete.assert_called_with(plist)
plist.add_track(self.track)
self.assertSetEqual(self.table.get_trackids(plist),
{self.track.trackid})
self.library.deleting = True
self.assertSetEqual(self.table.get_trackids(plist), set())
self.table.system_tracks = False
with self.assertRaises(NotImplementedError):
self.table.get_trackids(plist)
def test_move_track_down(self):
"""Test moving tracks down in the sort order."""
plist = self.table.create("Test Playlist")
self.assertFalse(self.table.move_track_down(plist, self.track))
plist.tracks_movable = True
with self.assertRaises(NotImplementedError):
self.table.move_track_down(plist, self.track)
def test_move_track_up(self):
"""Test moving tracks up in the sort order."""
plist = self.table.create("Test Playlist")
self.assertFalse(self.table.move_track_up(plist, self.track))
plist.tracks_movable = True
with self.assertRaises(NotImplementedError):
self.table.move_track_up(plist, self.track)
def test_remove_track(self):
"""Test adding a track to a playlist."""
self.assertTrue(self.table.system_tracks)
self.table._Table__autodelete = unittest.mock.Mock()
plist = self.table.create("Test Playlist")
plist.add_track(self.track)
self.assertTrue(self.table.remove_track(plist, self.track))
cur = self.sql("SELECT COUNT(*) FROM system_tracks")
self.assertEqual(cur.fetchone()["COUNT(*)"], 0)
self.assertFalse(self.table.remove_track(plist, self.track))
self.table._Table__autodelete.assert_called_with(plist)
self.table.system_tracks = False
with self.assertRaises(NotImplementedError):
self.table.remove_track(plist, self.track)
def test_update(self):
"""Test updating playlist properties."""
plist1 = self.table.create("Test Playlist 1")
@ -237,3 +328,26 @@ class TestPlaylistTable(tests.util.TestCase):
row = self.sql("SELECT active FROM playlist_properties WHERE rowid=?",
plist1.propertyid).fetchone()
self.assertEqual(row["active"], False)
def test_autodelete(self):
"""Test automatically deleting playlists."""
plist = self.table.create("Test Playlist")
self.table.queue.cancel()
self.assertFalse(self.table.autodelete)
self.table._Table__autodelete(plist)
self.assertFalse(self.table.queue.running)
self.assertIn(plist, self.table)
self.table.autodelete = True
self.table._Table__autodelete(plist)
self.assertTupleEqual(self.table.queue[0],
(self.table._Table__do_autodelete, plist))
plist.n_tracks = 1
self.assertTrue(self.table._Table__do_autodelete(plist))
self.assertIn(plist, self.table)
plist.n_tracks = 0
self.assertTrue(self.table._Table__do_autodelete(plist))
self.assertNotIn(plist, self.table)