From dc8ccff311204111d632cc6e2150e3900d533ad0 Mon Sep 17 00:00:00 2001 From: Anna Schumaker Date: Fri, 23 Sep 2022 09:16:54 -0400 Subject: [PATCH] db: Add current track accounting Tracks now have start(), stop(), and restart() functions that can be used by the application to update the laststarted, lastplayed, playcount, active, and restarted properties. The track Table implements their half of these functions in addition to a mark_path_active() function so opening Emmental with a filepath can update the current track before the database is loaded. The Table also adjusts the necessary system playlists when tracks are marked as played. Finally, the Table now has have-current-track and current-track properties that can be wired up to the Now Playing card. Signed-off-by: Anna Schumaker --- emmental/db/tracks.py | 79 ++++++++++++++++++- tests/db/test_tracks.py | 164 +++++++++++++++++++++++++++++++++++++++- 2 files changed, 240 insertions(+), 3 deletions(-) diff --git a/emmental/db/tracks.py b/emmental/db/tracks.py index 1dc4a5e..60f6cbd 100644 --- a/emmental/db/tracks.py +++ b/emmental/db/tracks.py @@ -1,5 +1,6 @@ # Copyright 2022 (c) Anna Schumaker. """A custom Gio.ListModel for working with tracks.""" +import datetime import pathlib import sqlite3 from gi.repository import GObject @@ -55,6 +56,18 @@ class Track(table.Row): """Get the Year associated with this Track.""" return self.table.sql.years.rows.get(self.year) + def restart(self) -> None: + """Mark that a previously started track has been started again.""" + self.table.restart_track(self) + + def start(self) -> None: + """Mark that this track has started playback.""" + self.table.start_track(self) + + def stop(self, play_time: float) -> None: + """Mark that this track has stopped playback.""" + self.table.stop_track(self, play_time / self.length > PLAYED_THRESHOLD) + def update_properties(self, **kwargs) -> None: """Update one or more of this Track's properties.""" for (property, newval) in kwargs.items(): @@ -80,14 +93,27 @@ class Filter(table.Filter): class Table(table.Table): """A ListStore tailored for storing Track objects.""" + have_current_track = GObject.Property(type=bool, default=False) + current_track = GObject.Property(type=Track) + def __init__(self, sql: GObject.TYPE_PYOBJECT): """Initialize a Track Table.""" super().__init__(sql, filter=Filter()) self.set_model(None) + self.connect("notify::current-track", self.__notify_current_track) + + def __notify_current_track(self, table: table.Table, param) -> None: + if self.current_track is not None: + self.have_current_track = True + self.sql.playlists.previous.add_track(self.current_track) + else: + self.have_current_track = False def do_construct(self, **kwargs) -> Track: """Construct a new Track instance.""" - return Track(**kwargs) + if (track := Track(**kwargs)).active: + self.current_track = track + return track def do_sql_delete(self, track: Track) -> sqlite3.Cursor: """Delete a Track.""" @@ -152,3 +178,54 @@ class Table(table.Table): cur = self.sql(f"""SELECT trackid FROM track_info_view ORDER BY {ordering}""") return {row["trackid"]: i for (i, row) in enumerate(cur.fetchall())} + + def mark_path_active(self, path: pathlib.Path) -> None: + """Mark a specific track as active in the database..""" + if self.sql("UPDATE tracks SET active=TRUE WHERE path=?", + path).rowcount == 0: + self.sql("UPDATE tracks SET active=FALSE WHERE active=TRUE") + + def restart_track(self, track: Track) -> None: + """Mark that a Track has been restarted.""" + track.active = True + track.restarted = datetime.datetime.utcnow() + self.current_track = track + + def start_track(self, track: Track) -> None: + """Mark that a Track has been started.""" + self.sql.playlists.previous.remove_track(track) + + cur = self.sql("""UPDATE tracks SET active=TRUE, laststarted=? + WHERE trackid=? RETURNING laststarted""", + datetime.datetime.utcnow(), track.trackid) + track.active = True + track.laststarted = cur.fetchone()["laststarted"] + self.current_track = track + + def stop_track(self, track: Track, played: bool) -> None: + """Mark that a Track has been stopped.""" + args = [("active=?", False)] + + if played: + if track.restarted is not None: + track.laststarted = track.restarted + args.append(("laststarted=?", track.restarted)) + args.append(("lastplayed=?", track.laststarted)) + args.append(("playcount=?", track.playcount + 1)) + + (fields, vals) = tuple(zip(*args)) + update = ", ".join(fields) + row = self.sql(f"""UPDATE tracks SET {update} WHERE trackid=? + RETURNING lastplayed, playcount""", + *vals, track.trackid).fetchone() + + track.active = False + track.playcount = row["playcount"] + track.lastplayed = row["lastplayed"] + track.restarted = None + self.current_track = None + + if played: + self.sql.playlists.most_played.reload_tracks(idle=True) + self.sql.playlists.queued.remove_track(track) + self.sql.playlists.unplayed.remove_track(track) diff --git a/tests/db/test_tracks.py b/tests/db/test_tracks.py index 0243dad..9f7572e 100644 --- a/tests/db/test_tracks.py +++ b/tests/db/test_tracks.py @@ -74,6 +74,28 @@ class TestTrackObject(tests.util.TestCase): """Test getting a Year playlist.""" self.assertEqual(self.track.get_year(), self.year) + def test_restart(self): + """Test the Track.restart() function.""" + self.table.restart_track = unittest.mock.Mock() + self.track.restart() + self.table.restart_track.assert_called_with(self.track) + + def test_start(self): + """Test the Track.start() function.""" + self.table.start_track = unittest.mock.Mock() + self.track.start() + self.table.start_track.assert_called_with(self.track) + + def test_stop(self): + """Test the Track.stop() function.""" + self.table.stop_track = unittest.mock.Mock() + self.track.length = 3 + + self.track.stop(0) + self.table.stop_track.assert_called_with(self.track, False) + self.track.stop(2.5) + self.table.stop_track.assert_called_with(self.track, True) + def test_update_properties(self): """Test updating track properties.""" now = datetime.datetime.now() @@ -115,7 +137,12 @@ class TestTrackTable(tests.util.TestCase): self.sql.playlists.load(now=True) self.sql.playlists.favorites.add_track = unittest.mock.Mock() self.sql.playlists.favorites.remove_track = unittest.mock.Mock() + self.sql.playlists.most_played.reload_tracks = unittest.mock.Mock() self.sql.playlists.previous.add_track = unittest.mock.Mock() + self.sql.playlists.previous.remove_track = unittest.mock.Mock() + self.sql.playlists.queued.remove_track = unittest.mock.Mock() + self.sql.playlists.unplayed.remove_track = unittest.mock.Mock() + self.playlists = self.sql.playlists self.library = self.sql.libraries.create(pathlib.Path("/a/b/")) self.album = self.sql.albums.create("Test Album", "Album Artist", @@ -169,6 +196,7 @@ class TestTrackTable(tests.util.TestCase): self.assertEqual(track.lastplayed, now) self.assertFalse(track.active) self.assertFalse(track.favorite) + self.assertIsNone(self.tracks.current_track) def test_create(self): """Test creating a new Track.""" @@ -256,6 +284,7 @@ class TestTrackTable(tests.util.TestCase): table2.load(now=True) self.assertEqual(len(table2.store), 2) + self.assertEqual(table2.current_track, table2.store[1]) for i in [0, 1]: with self.subTest(i=i): @@ -335,7 +364,7 @@ class TestTrackTable(tests.util.TestCase): favorite=True, mbid="ab-cd-ef", title="New Title", artist="New Artist", number=1, length=42, mtime=123.45) - self.sql.playlists.favorites.add_track.assert_called_with(track) + self.playlists.favorites.add_track.assert_called_with(track) cur = self.sql("""SELECT mediumid, year, favorite, mbid, title, artist, number, length, mtime @@ -352,7 +381,7 @@ class TestTrackTable(tests.util.TestCase): self.assertEqual(row["mtime"], 123.45) track.update_properties(favorite=False) - self.sql.playlists.favorites.remove_track.assert_called_with(track) + self.playlists.favorites.remove_track.assert_called_with(track) track2 = self.tracks.create(self.library, pathlib.Path("/a/b/2.ogg"), self.medium, self.year, length=10) @@ -360,3 +389,134 @@ class TestTrackTable(tests.util.TestCase): row = self.sql("SELECT active FROM tracks WHERE trackid=?", track.trackid).fetchone() self.assertFalse(row["active"]) + + def test_mark_path_active(self): + """Test marking a path as active.""" + self.tracks.create(self.library, pathlib.Path("/a/b/1.ogg"), + self.medium, self.year) + + self.tracks.mark_path_active(pathlib.Path("/a/b/1.ogg")) + cur = self.sql("SELECT trackid FROM tracks WHERE active=TRUE") + self.assertEqual(cur.fetchone()["trackid"], 1) + + self.tracks.mark_path_active(pathlib.Path("/a/b/4.ogg")) + cur = self.sql("SELECT trackid FROM tracks WHERE active=TRUE") + self.assertIsNone(cur.fetchone()) + + def test_restart_track(self): + """Test marking that a Track has restarted.""" + track = self.tracks.create(self.library, pathlib.Path("/a/b/1.ogg"), + self.medium, self.year) + + track.restart() + self.assertTrue(track.active) + self.assertGreater(datetime.datetime.utcnow(), track.restarted) + self.assertEqual(self.tracks.current_track, track) + + self.playlists.previous.remove_track.assert_not_called() + + def test_start_track(self): + """Test marking that a Track has started playback.""" + track = self.tracks.create(self.library, pathlib.Path("/a/b/1.ogg"), + self.medium, self.year) + + track.start() + row = self.sql("SELECT laststarted FROM tracks WHERE trackid=?", + track.trackid).fetchone() + self.assertTrue(track.active) + self.assertIsNotNone(track.laststarted) + self.assertEqual(track.laststarted, row["laststarted"]) + self.assertEqual(self.tracks.current_track, track) + + self.playlists.previous.remove_track.assert_called_with(track) + self.playlists.previous.add_track.assert_called_with(track) + + def test_stop_started_track(self): + """Test marking that a Track has stopped playback.""" + track = self.tracks.create(self.library, pathlib.Path("/a/b/1.ogg"), + self.medium, self.year, length=10) + + track.start() + track.stop(3) + row = self.sql("SELECT lastplayed FROM tracks WHERE trackid=?", + track.trackid).fetchone() + self.assertFalse(track.active) + self.assertEqual(track.playcount, 0) + self.assertIsNone(row["lastplayed"]) + self.assertIsNone(track.lastplayed) + self.assertIsNone(self.tracks.current_track) + + self.playlists.most_played.reload_tracks.assert_not_called() + self.playlists.queued.remove_track.assert_not_called() + self.playlists.unplayed.remove_track.assert_not_called() + + track.start() + track.stop(8) + row = self.sql("""SELECT lastplayed, playcount FROM tracks + WHERE trackid=?""", track.trackid).fetchone() + self.assertEqual(row["playcount"], 1) + self.assertEqual(track.playcount, 1) + self.assertEqual(row["lastplayed"], track.laststarted) + self.assertEqual(track.lastplayed, track.laststarted) + + self.playlists.most_played.reload_tracks.assert_called() + self.playlists.queued.remove_track.assert_called_with(track) + self.playlists.unplayed.remove_track.assert_called_with(track) + + def test_stop_restarted_track(self): + """Test marking that a restarted Track has stopped playback.""" + track = self.tracks.create(self.library, pathlib.Path("/a/b/1.ogg"), + self.medium, self.year, length=10) + + track.restart() + track.stop(3) + row = self.sql("""SELECT lastplayed, laststarted FROM tracks + WHERE trackid=?""", track.trackid).fetchone() + self.assertFalse(track.active) + self.assertEqual(track.playcount, 0) + self.assertIsNone(row["lastplayed"]) + self.assertIsNone(track.lastplayed) + self.assertIsNone(row["laststarted"]) + self.assertIsNone(track.laststarted) + self.assertIsNone(track.restarted) + self.assertIsNone(self.tracks.current_track) + + self.playlists.most_played.reload_tracks.assert_not_called() + self.playlists.queued.remove_track.assert_not_called() + self.playlists.unplayed.remove_track.assert_not_called() + + track.restart() + restarted = track.restarted + track.stop(8) + row = self.sql("""SELECT lastplayed, laststarted, playcount FROM tracks + WHERE trackid=?""", track.trackid).fetchone() + self.assertEqual(row["playcount"], 1) + self.assertEqual(track.playcount, 1) + self.assertEqual(row["lastplayed"], restarted) + self.assertEqual(track.lastplayed, restarted) + self.assertEqual(row["laststarted"], restarted) + self.assertEqual(track.laststarted, restarted) + + self.playlists.most_played.reload_tracks.assert_called_with(idle=True) + self.playlists.queued.remove_track.assert_called_with(track) + self.playlists.unplayed.remove_track.assert_called_with(track) + + def test_current_track(self): + """Test the current-track and have-current-track properties.""" + self.assertIsNone(self.tracks.current_track) + self.assertFalse(self.tracks.have_current_track) + + track = self.tracks.construct(trackid=2, active=True) + self.assertEqual(self.tracks.current_track, track) + self.assertTrue(self.tracks.have_current_track) + self.playlists.previous.add_track.assert_called_with(track) + + track2 = self.tracks.construct(trackid=2, active=True, favorite=True) + self.assertEqual(self.tracks.current_track, track2) + self.assertTrue(self.tracks.have_current_track) + self.playlists.previous.add_track.assert_called_with(track2) + + self.playlists.previous.add_track.reset_mock() + self.tracks.current_track = None + self.assertFalse(self.tracks.have_current_track) + self.playlists.previous.add_track.assert_not_called()