db: Give playlists a get_track() function

For finding a single track at a given index into the select query
results, ordered by the configured sort order.

Implements: Issue #3 (Sort playlists through SQLite)
Implements: Issue #15 (Convert Playlists into Gio.ListModels)
Signed-off-by: Anna Schumaker <Anna@NoWheyCreamery.com>
This commit is contained in:
Anna Schumaker 2021-10-17 16:43:09 -04:00
parent 8917600970
commit 1d5f88f080
10 changed files with 66 additions and 21 deletions

View File

@ -21,6 +21,18 @@ class Playlist(GObject.GObject):
f"WHERE {self._rowkey}=?", [ self._rowid ])
return cur.fetchone()[0]
def get_track(self, n):
order = ', '.join(self.plist_state.sort)
row = sql.execute(f"SELECT * FROM tracks "
f"INNER JOIN artists USING(artistid) "
f"INNER JOIN albums USING(albumid) "
f"INNER JOIN discs USING(discid) "
f"INNER JOIN years USING(yearid) "
f"WHERE tracks.{self._rowkey}=? "
f"ORDER BY {order} LIMIT 1 OFFSET ?",
[ self.rowid, n ]).fetchone()
return track.Table.factory(row)
@GObject.Property
def name(self): raise NotImplementedError
@ -59,6 +71,19 @@ class MappedPlaylist(Playlist):
f"WHERE {self._rowkey}=?", [ self._rowid ])
return cur.fetchone()[0]
def get_track(self, n):
order = ', '.join(self.plist_state.sort)
row = sql.execute(f"SELECT * FROM tracks "
f"INNER JOIN {self.map_table} USING(trackid) "
f"INNER JOIN artists USING(artistid) "
f"INNER JOIN albums USING(albumid) "
f"INNER JOIN discs USING(discid) "
f"INNER JOIN years USING(yearid) "
f"WHERE {self._rowkey}=? "
f"ORDER BY {order} LIMIT 1 OFFSET ?",
[ self._rowid, n ]).fetchone()
return track.Table.factory(row)
def remove_track(self, track):
return sql.execute(f"DELETE FROM {self.map_table} "
f"WHERE {self.rowkey}=? AND trackid=?",
@ -109,3 +134,6 @@ class ChildModel(table.Child):
def delete(self, plist):
state.Table.delete(plist.plist_state)
return super().delete(plist)
from . import track

View File

@ -34,6 +34,7 @@ class TestAlbum(unittest.TestCase):
self.assertEqual(album.get_n_tracks(), 0)
track = db.make_fake_track(1, 1, "Test Track", "/a/b/c/1.ogg")
self.assertEqual(album.get_n_tracks(), 1)
self.assertEqual(album.get_track(0), track)
class TestAlbumTable(unittest.TestCase):

View File

@ -30,6 +30,7 @@ class TestArtist(unittest.TestCase):
self.assertEqual(artist.get_n_tracks(), 0)
track = db.make_fake_track(1, 1, "Test Track", "/a/b/c/1.ogg")
self.assertEqual(artist.get_n_tracks(), 1)
self.assertEqual(artist.get_track(0), track)
class TestArtistTable(unittest.TestCase):

View File

@ -41,6 +41,7 @@ class TestDecade(unittest.TestCase):
self.assertEqual(decade.get_n_tracks(), 0)
track = db.make_fake_track(1, 1, "Test Track", "/a/b/c/1.ogg")
self.assertEqual(decade.get_n_tracks(), 1)
self.assertEqual(decade.get_track(0), track)
class TestDecadeTable(unittest.TestCase):

View File

@ -48,6 +48,7 @@ class TestDisc(unittest.TestCase):
self.assertEqual(disc.get_n_tracks(), 0)
track = db.make_fake_track(1, 1, "Test Track", "/a/b/c/1.ogg")
self.assertEqual(disc.get_n_tracks(), 1)
self.assertEqual(disc.get_track(0), track)
class TestDiscTable(unittest.TestCase):

View File

@ -29,15 +29,11 @@ class TestGenre(unittest.TestCase):
def test_add_remove_track(self):
genre = db.genre.Table.find("Test Genre")
track = db.make_fake_track(1, 1, "Test Track", "/a/b/c/1.ogg")
self.assertEqual(genre.get_n_tracks(), 0)
self.assertTrue(genre.add_track(track))
self.assertEqual(genre.get_n_tracks(), 1)
row = sql.execute("SELECT * FROM genre_map WHERE genreid=?",
[ genre.rowid ]).fetchone()
self.assertEqual(row["trackid"], track.rowid)
self.assertEqual(row["genreid"], genre.rowid)
self.assertEqual(genre.get_track(0), track)
self.assertTrue(genre.remove_track(track))
self.assertFalse(genre.remove_track(track))

View File

@ -39,6 +39,7 @@ class TestLibrary(unittest.TestCase):
self.assertEqual(library.get_n_tracks(), 0)
track = db.make_fake_track(1, 1, "Test Track", "/a/b/c/1.ogg")
self.assertEqual(library.get_n_tracks(), 1)
self.assertEqual(library.get_track(0), track)
class TestLibraryTable(unittest.TestCase):

View File

@ -21,6 +21,7 @@ class TestCollection(unittest.TestCase):
self.assertEqual(collection.get_n_tracks(), 0)
track = db.make_fake_track(1, 1, "Test Track", "/a/b/c/1.ogg")
self.assertEqual(collection.get_n_tracks(), 1)
self.assertEqual(collection.get_track(0), track)
track.library.enabled = False
self.assertEqual(collection.get_n_tracks(), 0)
@ -45,6 +46,7 @@ class TestFavorites(unittest.TestCase):
self.assertTrue(favorites.add_track(track))
self.assertFalse(favorites.add_track(track))
self.assertEqual(favorites.get_n_tracks(), 1)
self.assertEqual(favorites.get_track(0), track)
self.assertTrue(favorites.remove_track(track))
self.assertFalse(favorites.remove_track(track))
@ -68,11 +70,8 @@ class TestNewTracks(unittest.TestCase):
self.assertEqual(new.get_n_tracks(), 0)
track = db.make_fake_track(1, 1, "Test Track", "/a/b/c/1.ogg")
cur = sql.execute("SELECT * FROM temp_playlist_map "
"WHERE trackid=? AND playlistid=?",
[ track.rowid, new.rowid ])
self.assertEqual(cur.fetchone()["trackid"], track.rowid)
self.assertEqual(new.get_n_tracks(), 1)
self.assertEqual(new.get_track(0), track)
self.assertTrue(new.remove_track(track))
self.assertFalse(new.remove_track(track))
@ -95,26 +94,26 @@ class TestPrevious(unittest.TestCase):
def test_add_remove_track(self):
previous = db.user.Table.find("Previous")
track1 = db.make_fake_track(1, 1, "Test Track", "/a/b/c/1.ogg")
track2 = db.make_fake_track(2, 2, "Test Track 2", "/a/b/c/2.ogg")
self.assertEqual(previous.get_n_tracks(), 0)
self.assertTrue(previous.add_track(track1))
cur = sql.execute("SELECT *,rowid FROM temp_playlist_map "
"WHERE trackid=? AND playlistid=?",
[ track1.rowid, previous.rowid ])
self.assertEqual(cur.fetchone()["rowid"], 2)
self.assertEqual(previous.get_n_tracks(), 1)
self.assertEqual(previous.get_track(0), track1)
self.assertTrue(previous.add_track(track2))
self.assertEqual(previous.get_n_tracks(), 2)
self.assertEqual(previous.get_track(0), track2)
self.assertEqual(previous.get_track(1), track1)
db.make_fake_track(2, 2, "Test Track 2", "/a/b/c/2.ogg")
self.assertTrue(previous.add_track(track1))
cur = sql.execute("SELECT *,rowid FROM temp_playlist_map "
"WHERE trackid=? AND playlistid=?",
[ track1.rowid, previous.rowid ])
self.assertEqual(cur.fetchone()["rowid"], 4)
self.assertEqual(previous.get_n_tracks(), 1)
self.assertEqual(previous.get_n_tracks(), 2)
self.assertEqual(previous.get_track(0), track1)
self.assertEqual(previous.get_track(1), track2)
self.assertTrue(previous.remove_track(track1))
self.assertFalse(previous.remove_track(track1))
self.assertEqual(previous.get_n_tracks(), 0)
self.assertEqual(previous.get_n_tracks(), 1)
class TestQueuedTracks(unittest.TestCase):
@ -138,6 +137,7 @@ class TestQueuedTracks(unittest.TestCase):
self.assertTrue(queued.add_track(track))
self.assertFalse(queued.add_track(track))
self.assertEqual(queued.get_n_tracks(), 1)
self.assertEqual(queued.get_track(0), track)
self.assertTrue(queued.remove_track(track))
self.assertFalse(queued.remove_track(track))
@ -172,6 +172,7 @@ class TestUserPlaylist(unittest.TestCase):
self.assertTrue(plist.add_track(track))
self.assertFalse(plist.add_track(track))
self.assertEqual(plist.get_n_tracks(), 1)
self.assertEqual(plist.get_track(0), track)
self.assertTrue(plist.remove_track(track))
self.assertFalse(plist.remove_track(track))

View File

@ -28,6 +28,7 @@ class TestYear(unittest.TestCase):
self.assertEqual(year.get_n_tracks(), 0)
track = db.make_fake_track(1, 1, "Test Track", "/a/b/c/1.ogg")
self.assertEqual(year.get_n_tracks(), 1)
self.assertEqual(year.get_track(0), track)
class TestYearTable(unittest.TestCase):

View File

@ -14,6 +14,7 @@ from . import objects
from . import playlist
from . import sql
from . import state
from . import track
class Collection(playlist.Playlist):
@ -26,6 +27,19 @@ class Collection(playlist.Playlist):
"JOIN libraries USING(libraryid) WHERE enabled=1")
return cur.fetchone()[0]
def get_track(self, n):
order = ', '.join(self.plist_state.sort)
row = sql.execute(f"SELECT * FROM tracks "
f"INNER JOIN artists USING(artistid) "
f"INNER JOIN albums USING(albumid) "
f"INNER JOIN discs USING(discid) "
f"INNER JOIN years USING(yearid) "
f"INNER JOIN libraries USING(libraryid) "
f"WHERE libraries.enabled=1 "
f"ORDER BY {order} LIMIT 1 OFFSET ?",
[ n ]).fetchone()
return track.Table.factory(row)
@GObject.Property
def name(self): return self._name