diff --git a/emmental/db/tracks.py b/emmental/db/tracks.py index 1dc4a5e..60f6cbd 100644 --- a/emmental/db/tracks.py +++ b/emmental/db/tracks.py @@ -1,5 +1,6 @@ # Copyright 2022 (c) Anna Schumaker. """A custom Gio.ListModel for working with tracks.""" +import datetime import pathlib import sqlite3 from gi.repository import GObject @@ -55,6 +56,18 @@ class Track(table.Row): """Get the Year associated with this Track.""" return self.table.sql.years.rows.get(self.year) + def restart(self) -> None: + """Mark that a previously started track has been started again.""" + self.table.restart_track(self) + + def start(self) -> None: + """Mark that this track has started playback.""" + self.table.start_track(self) + + def stop(self, play_time: float) -> None: + """Mark that this track has stopped playback.""" + self.table.stop_track(self, play_time / self.length > PLAYED_THRESHOLD) + def update_properties(self, **kwargs) -> None: """Update one or more of this Track's properties.""" for (property, newval) in kwargs.items(): @@ -80,14 +93,27 @@ class Filter(table.Filter): class Table(table.Table): """A ListStore tailored for storing Track objects.""" + have_current_track = GObject.Property(type=bool, default=False) + current_track = GObject.Property(type=Track) + def __init__(self, sql: GObject.TYPE_PYOBJECT): """Initialize a Track Table.""" super().__init__(sql, filter=Filter()) self.set_model(None) + self.connect("notify::current-track", self.__notify_current_track) + + def __notify_current_track(self, table: table.Table, param) -> None: + if self.current_track is not None: + self.have_current_track = True + self.sql.playlists.previous.add_track(self.current_track) + else: + self.have_current_track = False def do_construct(self, **kwargs) -> Track: """Construct a new Track instance.""" - return Track(**kwargs) + if (track := Track(**kwargs)).active: + self.current_track = track + return track def do_sql_delete(self, track: Track) -> sqlite3.Cursor: """Delete a Track.""" @@ -152,3 +178,54 @@ class Table(table.Table): cur = self.sql(f"""SELECT trackid FROM track_info_view ORDER BY {ordering}""") return {row["trackid"]: i for (i, row) in enumerate(cur.fetchall())} + + def mark_path_active(self, path: pathlib.Path) -> None: + """Mark a specific track as active in the database..""" + if self.sql("UPDATE tracks SET active=TRUE WHERE path=?", + path).rowcount == 0: + self.sql("UPDATE tracks SET active=FALSE WHERE active=TRUE") + + def restart_track(self, track: Track) -> None: + """Mark that a Track has been restarted.""" + track.active = True + track.restarted = datetime.datetime.utcnow() + self.current_track = track + + def start_track(self, track: Track) -> None: + """Mark that a Track has been started.""" + self.sql.playlists.previous.remove_track(track) + + cur = self.sql("""UPDATE tracks SET active=TRUE, laststarted=? + WHERE trackid=? RETURNING laststarted""", + datetime.datetime.utcnow(), track.trackid) + track.active = True + track.laststarted = cur.fetchone()["laststarted"] + self.current_track = track + + def stop_track(self, track: Track, played: bool) -> None: + """Mark that a Track has been stopped.""" + args = [("active=?", False)] + + if played: + if track.restarted is not None: + track.laststarted = track.restarted + args.append(("laststarted=?", track.restarted)) + args.append(("lastplayed=?", track.laststarted)) + args.append(("playcount=?", track.playcount + 1)) + + (fields, vals) = tuple(zip(*args)) + update = ", ".join(fields) + row = self.sql(f"""UPDATE tracks SET {update} WHERE trackid=? + RETURNING lastplayed, playcount""", + *vals, track.trackid).fetchone() + + track.active = False + track.playcount = row["playcount"] + track.lastplayed = row["lastplayed"] + track.restarted = None + self.current_track = None + + if played: + self.sql.playlists.most_played.reload_tracks(idle=True) + self.sql.playlists.queued.remove_track(track) + self.sql.playlists.unplayed.remove_track(track) diff --git a/tests/db/test_tracks.py b/tests/db/test_tracks.py index 0243dad..9f7572e 100644 --- a/tests/db/test_tracks.py +++ b/tests/db/test_tracks.py @@ -74,6 +74,28 @@ class TestTrackObject(tests.util.TestCase): """Test getting a Year playlist.""" self.assertEqual(self.track.get_year(), self.year) + def test_restart(self): + """Test the Track.restart() function.""" + self.table.restart_track = unittest.mock.Mock() + self.track.restart() + self.table.restart_track.assert_called_with(self.track) + + def test_start(self): + """Test the Track.start() function.""" + self.table.start_track = unittest.mock.Mock() + self.track.start() + self.table.start_track.assert_called_with(self.track) + + def test_stop(self): + """Test the Track.stop() function.""" + self.table.stop_track = unittest.mock.Mock() + self.track.length = 3 + + self.track.stop(0) + self.table.stop_track.assert_called_with(self.track, False) + self.track.stop(2.5) + self.table.stop_track.assert_called_with(self.track, True) + def test_update_properties(self): """Test updating track properties.""" now = datetime.datetime.now() @@ -115,7 +137,12 @@ class TestTrackTable(tests.util.TestCase): self.sql.playlists.load(now=True) self.sql.playlists.favorites.add_track = unittest.mock.Mock() self.sql.playlists.favorites.remove_track = unittest.mock.Mock() + self.sql.playlists.most_played.reload_tracks = unittest.mock.Mock() self.sql.playlists.previous.add_track = unittest.mock.Mock() + self.sql.playlists.previous.remove_track = unittest.mock.Mock() + self.sql.playlists.queued.remove_track = unittest.mock.Mock() + self.sql.playlists.unplayed.remove_track = unittest.mock.Mock() + self.playlists = self.sql.playlists self.library = self.sql.libraries.create(pathlib.Path("/a/b/")) self.album = self.sql.albums.create("Test Album", "Album Artist", @@ -169,6 +196,7 @@ class TestTrackTable(tests.util.TestCase): self.assertEqual(track.lastplayed, now) self.assertFalse(track.active) self.assertFalse(track.favorite) + self.assertIsNone(self.tracks.current_track) def test_create(self): """Test creating a new Track.""" @@ -256,6 +284,7 @@ class TestTrackTable(tests.util.TestCase): table2.load(now=True) self.assertEqual(len(table2.store), 2) + self.assertEqual(table2.current_track, table2.store[1]) for i in [0, 1]: with self.subTest(i=i): @@ -335,7 +364,7 @@ class TestTrackTable(tests.util.TestCase): favorite=True, mbid="ab-cd-ef", title="New Title", artist="New Artist", number=1, length=42, mtime=123.45) - self.sql.playlists.favorites.add_track.assert_called_with(track) + self.playlists.favorites.add_track.assert_called_with(track) cur = self.sql("""SELECT mediumid, year, favorite, mbid, title, artist, number, length, mtime @@ -352,7 +381,7 @@ class TestTrackTable(tests.util.TestCase): self.assertEqual(row["mtime"], 123.45) track.update_properties(favorite=False) - self.sql.playlists.favorites.remove_track.assert_called_with(track) + self.playlists.favorites.remove_track.assert_called_with(track) track2 = self.tracks.create(self.library, pathlib.Path("/a/b/2.ogg"), self.medium, self.year, length=10) @@ -360,3 +389,134 @@ class TestTrackTable(tests.util.TestCase): row = self.sql("SELECT active FROM tracks WHERE trackid=?", track.trackid).fetchone() self.assertFalse(row["active"]) + + def test_mark_path_active(self): + """Test marking a path as active.""" + self.tracks.create(self.library, pathlib.Path("/a/b/1.ogg"), + self.medium, self.year) + + self.tracks.mark_path_active(pathlib.Path("/a/b/1.ogg")) + cur = self.sql("SELECT trackid FROM tracks WHERE active=TRUE") + self.assertEqual(cur.fetchone()["trackid"], 1) + + self.tracks.mark_path_active(pathlib.Path("/a/b/4.ogg")) + cur = self.sql("SELECT trackid FROM tracks WHERE active=TRUE") + self.assertIsNone(cur.fetchone()) + + def test_restart_track(self): + """Test marking that a Track has restarted.""" + track = self.tracks.create(self.library, pathlib.Path("/a/b/1.ogg"), + self.medium, self.year) + + track.restart() + self.assertTrue(track.active) + self.assertGreater(datetime.datetime.utcnow(), track.restarted) + self.assertEqual(self.tracks.current_track, track) + + self.playlists.previous.remove_track.assert_not_called() + + def test_start_track(self): + """Test marking that a Track has started playback.""" + track = self.tracks.create(self.library, pathlib.Path("/a/b/1.ogg"), + self.medium, self.year) + + track.start() + row = self.sql("SELECT laststarted FROM tracks WHERE trackid=?", + track.trackid).fetchone() + self.assertTrue(track.active) + self.assertIsNotNone(track.laststarted) + self.assertEqual(track.laststarted, row["laststarted"]) + self.assertEqual(self.tracks.current_track, track) + + self.playlists.previous.remove_track.assert_called_with(track) + self.playlists.previous.add_track.assert_called_with(track) + + def test_stop_started_track(self): + """Test marking that a Track has stopped playback.""" + track = self.tracks.create(self.library, pathlib.Path("/a/b/1.ogg"), + self.medium, self.year, length=10) + + track.start() + track.stop(3) + row = self.sql("SELECT lastplayed FROM tracks WHERE trackid=?", + track.trackid).fetchone() + self.assertFalse(track.active) + self.assertEqual(track.playcount, 0) + self.assertIsNone(row["lastplayed"]) + self.assertIsNone(track.lastplayed) + self.assertIsNone(self.tracks.current_track) + + self.playlists.most_played.reload_tracks.assert_not_called() + self.playlists.queued.remove_track.assert_not_called() + self.playlists.unplayed.remove_track.assert_not_called() + + track.start() + track.stop(8) + row = self.sql("""SELECT lastplayed, playcount FROM tracks + WHERE trackid=?""", track.trackid).fetchone() + self.assertEqual(row["playcount"], 1) + self.assertEqual(track.playcount, 1) + self.assertEqual(row["lastplayed"], track.laststarted) + self.assertEqual(track.lastplayed, track.laststarted) + + self.playlists.most_played.reload_tracks.assert_called() + self.playlists.queued.remove_track.assert_called_with(track) + self.playlists.unplayed.remove_track.assert_called_with(track) + + def test_stop_restarted_track(self): + """Test marking that a restarted Track has stopped playback.""" + track = self.tracks.create(self.library, pathlib.Path("/a/b/1.ogg"), + self.medium, self.year, length=10) + + track.restart() + track.stop(3) + row = self.sql("""SELECT lastplayed, laststarted FROM tracks + WHERE trackid=?""", track.trackid).fetchone() + self.assertFalse(track.active) + self.assertEqual(track.playcount, 0) + self.assertIsNone(row["lastplayed"]) + self.assertIsNone(track.lastplayed) + self.assertIsNone(row["laststarted"]) + self.assertIsNone(track.laststarted) + self.assertIsNone(track.restarted) + self.assertIsNone(self.tracks.current_track) + + self.playlists.most_played.reload_tracks.assert_not_called() + self.playlists.queued.remove_track.assert_not_called() + self.playlists.unplayed.remove_track.assert_not_called() + + track.restart() + restarted = track.restarted + track.stop(8) + row = self.sql("""SELECT lastplayed, laststarted, playcount FROM tracks + WHERE trackid=?""", track.trackid).fetchone() + self.assertEqual(row["playcount"], 1) + self.assertEqual(track.playcount, 1) + self.assertEqual(row["lastplayed"], restarted) + self.assertEqual(track.lastplayed, restarted) + self.assertEqual(row["laststarted"], restarted) + self.assertEqual(track.laststarted, restarted) + + self.playlists.most_played.reload_tracks.assert_called_with(idle=True) + self.playlists.queued.remove_track.assert_called_with(track) + self.playlists.unplayed.remove_track.assert_called_with(track) + + def test_current_track(self): + """Test the current-track and have-current-track properties.""" + self.assertIsNone(self.tracks.current_track) + self.assertFalse(self.tracks.have_current_track) + + track = self.tracks.construct(trackid=2, active=True) + self.assertEqual(self.tracks.current_track, track) + self.assertTrue(self.tracks.have_current_track) + self.playlists.previous.add_track.assert_called_with(track) + + track2 = self.tracks.construct(trackid=2, active=True, favorite=True) + self.assertEqual(self.tracks.current_track, track2) + self.assertTrue(self.tracks.have_current_track) + self.playlists.previous.add_track.assert_called_with(track2) + + self.playlists.previous.add_track.reset_mock() + self.tracks.current_track = None + self.assertFalse(self.tracks.have_current_track) + self.playlists.previous.add_track.assert_not_called()