emmental/tests/db/test_tracks.py
Anna Schumaker dc8ccff311 db: Add current track accounting
Tracks now have start(), stop(), and restart() functions that can be
used by the application to update the laststarted, lastplayed,
playcount, active, and restarted properties.

The track Table implements their half of these functions in addition to
a mark_path_active() function so opening Emmental with a filepath can
update the current track before the database is loaded. The Table also
adjusts the necessary system playlists when tracks are marked as played.

Finally, the Table now has have-current-track and current-track
properties that can be wired up to the Now Playing card.

Signed-off-by: Anna Schumaker <Anna@NoWheyCreamery.com>
2023-05-10 14:42:58 -04:00

523 lines
24 KiB
Python

# Copyright 2022 (c) Anna Schumaker.
"""Tests our track Gio.ListModel."""
import datetime
import pathlib
import emmental.db.tracks
import tests.util
import unittest.mock
from gi.repository import Gio
from gi.repository import Gtk
class TestTrackObject(tests.util.TestCase):
"""Tests our track object."""
def setUp(self):
"""Set up common variables."""
super().setUp()
self.table = Gio.ListStore()
self.table.sql = self.sql
self.table.update = unittest.mock.Mock()
self.library = self.sql.libraries.create(pathlib.Path("/a/b"))
self.album = self.sql.albums.create("Test Album", "Test Artist",
release="1988-06")
self.medium = self.sql.media.create(self.album, "", number=1)
self.year = self.sql.years.create(1988)
self.track = emmental.db.tracks.Track(trackid=12345, table=self.table,
libraryid=self.library.libraryid,
mediumid=self.medium.mediumid,
year=self.year.year,
path=pathlib.Path("/a/b/c.ogg"))
def test_constants(self):
"""Test constant values."""
self.assertEqual(emmental.db.tracks.PLAYED_THRESHOLD, (2 / 3))
def test_init(self):
"""Test that the Track is set up properly."""
self.assertIsInstance(self.track, emmental.db.table.Row)
self.assertEqual(self.track.table, self.table)
self.assertEqual(self.track.trackid, 12345)
self.assertEqual(self.track.primary_key, 12345)
self.assertEqual(self.track.libraryid, self.library.libraryid)
self.assertEqual(self.track.mediumid, self.medium.mediumid)
self.assertEqual(self.track.year, self.year.year)
self.assertFalse(self.track.active)
self.assertFalse(self.track.favorite)
self.assertEqual(self.track.path, pathlib.Path("/a/b/c.ogg"))
self.assertEqual(self.track.mbid, "")
self.assertEqual(self.track.title, "")
self.assertEqual(self.track.artist, "")
self.assertEqual(self.track.number, 0)
self.assertEqual(self.track.length, 0.0)
self.assertEqual(self.track.mtime, 0.0)
self.assertEqual(self.track.playcount, 0)
self.assertIsNone(self.track.added)
self.assertIsNone(self.track.laststarted)
self.assertIsNone(self.track.lastplayed)
self.assertIsNone(self.track.restarted)
def test_get_library(self):
"""Test getting the Library associated with a Track."""
self.assertEqual(self.track.get_library(), self.library)
def test_get_medium(self):
"""Test getting a Medium playlist."""
self.assertEqual(self.track.get_medium(), self.medium)
def test_get_year(self):
"""Test getting a Year playlist."""
self.assertEqual(self.track.get_year(), self.year)
def test_restart(self):
"""Test the Track.restart() function."""
self.table.restart_track = unittest.mock.Mock()
self.track.restart()
self.table.restart_track.assert_called_with(self.track)
def test_start(self):
"""Test the Track.start() function."""
self.table.start_track = unittest.mock.Mock()
self.track.start()
self.table.start_track.assert_called_with(self.track)
def test_stop(self):
"""Test the Track.stop() function."""
self.table.stop_track = unittest.mock.Mock()
self.track.length = 3
self.track.stop(0)
self.table.stop_track.assert_called_with(self.track, False)
self.track.stop(2.5)
self.table.stop_track.assert_called_with(self.track, True)
def test_update_properties(self):
"""Test updating track properties."""
now = datetime.datetime.now()
self.track.update_properties(trackid=1, libraryid=1, active=True,
path=pathlib.Path("/a/b/c.ogg"),
playcount=1, laststarted=now,
lastplayed=now, restarted=now)
self.table.update.assert_not_called()
self.track.update_properties(mediumid=2, favorite=True, year=1985,
mbid="ab-cd-ef", title="New Title",
artist="New Artist", number=2,
length=12.345, mtime=67.890)
self.table.update.assert_has_calls(
[unittest.mock.call(self.track, "mediumid", 2),
unittest.mock.call(self.track, "favorite", True),
unittest.mock.call(self.track, "year", 1985),
unittest.mock.call(self.track, "mbid", "ab-cd-ef"),
unittest.mock.call(self.track, "title", "New Title"),
unittest.mock.call(self.track, "artist", "New Artist"),
unittest.mock.call(self.track, "number", 2),
unittest.mock.call(self.track, "length", 12.345),
unittest.mock.call(self.track, "mtime", 67.890)])
self.table.update.reset_mock()
self.track.update_properties(mediumid=2, favorite=True, year=1985,
mbid="ab-cd-ef", title="New Title",
artist="New Artist", number=2,
length=12.345, mtime=67.890)
self.table.update.assert_not_called()
class TestTrackTable(tests.util.TestCase):
"""Tests our track table."""
def setUp(self):
"""Set up common variables."""
super().setUp()
self.sql.playlists.load(now=True)
self.sql.playlists.favorites.add_track = unittest.mock.Mock()
self.sql.playlists.favorites.remove_track = unittest.mock.Mock()
self.sql.playlists.most_played.reload_tracks = unittest.mock.Mock()
self.sql.playlists.previous.add_track = unittest.mock.Mock()
self.sql.playlists.previous.remove_track = unittest.mock.Mock()
self.sql.playlists.queued.remove_track = unittest.mock.Mock()
self.sql.playlists.unplayed.remove_track = unittest.mock.Mock()
self.playlists = self.sql.playlists
self.library = self.sql.libraries.create(pathlib.Path("/a/b/"))
self.album = self.sql.albums.create("Test Album", "Album Artist",
release="2022-10")
self.medium = self.sql.media.create(self.album, "Test Medium",
number=1)
self.year = self.sql.years.create(1988)
self.tracks = self.sql.tracks
def test_track_filter(self):
"""Test the tracks.Filter object."""
filter = emmental.db.tracks.Filter()
self.assertEqual(filter.get_strictness(), Gtk.FilterMatch.SOME)
filter.keys = {1, 2, 3}
self.assertEqual(filter.get_strictness(), Gtk.FilterMatch.SOME)
filter.keys = set()
self.assertEqual(filter.get_strictness(), Gtk.FilterMatch.NONE)
def test_init(self):
"""Test that the Track table is initialized properly."""
self.assertIsInstance(self.tracks, emmental.db.table.Table)
self.assertIsInstance(self.tracks.get_filter(),
emmental.db.tracks.Filter)
self.assertIsNone(self.tracks.get_model())
def test_construct(self):
"""Test constructing a new Track."""
now = datetime.datetime.now()
track = self.tracks.construct(trackid=1, year=1988,
libraryid=self.library.libraryid,
mediumid=self.medium.mediumid,
path=pathlib.Path("/a/b/c.ogg"),
mbid="ab-cd-ef", title="Title", number=1,
length=1.0, artist="Artist", mtime=1.0,
playcount=1, lastplayed=now)
self.assertIsInstance(track, emmental.db.tracks.Track)
self.assertEqual(track.table, self.tracks)
self.assertEqual(track.trackid, 1)
self.assertEqual(track.libraryid, self.library.libraryid)
self.assertEqual(track.mediumid, self.medium.mediumid)
self.assertEqual(track.year, 1988)
self.assertEqual(track.path, pathlib.Path("/a/b/c.ogg"))
self.assertEqual(track.mbid, "ab-cd-ef")
self.assertEqual(track.title, "Title")
self.assertEqual(track.number, 1)
self.assertEqual(track.length, 1.0)
self.assertEqual(track.artist, "Artist")
self.assertEqual(track.mtime, 1.0)
self.assertEqual(track.playcount, 1)
self.assertEqual(track.lastplayed, now)
self.assertFalse(track.active)
self.assertFalse(track.favorite)
self.assertIsNone(self.tracks.current_track)
def test_create(self):
"""Test creating a new Track."""
track = self.tracks.create(self.library, pathlib.Path("/a/b/c.ogg"),
self.medium, self.year)
self.assertIsInstance(track, emmental.db.tracks.Track)
self.assertEqual(track.libraryid, self.library.libraryid)
self.assertEqual(track.mediumid, self.medium.mediumid)
self.assertEqual(track.year, 1988)
self.assertEqual(track.path, pathlib.Path("/a/b/c.ogg"))
self.assertEqual(track.added, datetime.datetime.utcnow().date())
track2 = self.tracks.create(self.library, pathlib.Path("/a/b/d.ogg"),
self.medium, self.year, title="Test Track",
number=1, length=1.23, artist="Artist",
mbid="ab-cd-ef", mtime=4.56)
self.assertEqual(track2.trackid, 2)
self.assertEqual(track2.libraryid, self.library.libraryid)
self.assertEqual(track2.mediumid, self.medium.mediumid)
self.assertEqual(track2.path, pathlib.Path("/a/b/d.ogg"))
self.assertEqual(track2.title, "Test Track")
self.assertEqual(track2.number, 1)
self.assertEqual(track2.length, 1.23)
self.assertEqual(track2.artist, "Artist")
self.assertEqual(track2.mbid, "ab-cd-ef")
self.assertEqual(track2.mtime, 4.56)
track3 = self.tracks.create(self.library, pathlib.Path("/a/b/c.ogg"),
self.medium, self.year)
self.assertIsNone(track3)
cur = self.sql("SELECT COUNT(*) FROM tracks")
self.assertEqual(cur.fetchone()["COUNT(*)"], 2)
def test_delete(self):
"""Test deleting a Track."""
track = self.tracks.create(self.library, pathlib.Path("/a/b/c.ogg"),
self.medium, self.year)
self.assertTrue(track.delete())
self.assertIsNone(self.tracks.index(track))
cur = self.sql("SELECT COUNT(path) FROM tracks")
self.assertEqual(cur.fetchone()["COUNT(path)"], 0)
self.assertEqual(len(self.tracks), 0)
self.assertFalse(track.delete())
def test_filter(self):
"""Test filtering the Track table."""
self.tracks.create(self.library, pathlib.Path("/a/b/1.ogg"),
self.medium, self.year,
title="Title 1", artist="Test Artist")
self.tracks.create(self.library, pathlib.Path("/a/b/2.ogg"),
self.medium, self.year,
title="Title 2", artist="Test Artist")
self.tracks.filter("*1", now=True)
self.assertSetEqual(self.tracks.get_filter().keys, {1})
self.tracks.filter("*artist", now=True)
self.assertSetEqual(self.tracks.get_filter().keys, {1, 2})
self.tracks.filter("*medium", now=True)
self.assertSetEqual(self.tracks.get_filter().keys, {1, 2})
self.tracks.filter("*album", now=True)
self.assertSetEqual(self.tracks.get_filter().keys, {1, 2})
self.tracks.filter("*album artist", now=True)
self.assertSetEqual(self.tracks.get_filter().keys, {1, 2})
self.tracks.filter("2022-*", now=True)
self.assertSetEqual(self.tracks.get_filter().keys, {1, 2})
def test_load(self):
"""Test loading tracks from the database."""
now = datetime.datetime.now()
self.tracks.create(self.library, pathlib.Path("/a/b/1.ogg"),
self.medium, self.year)
self.tracks.create(self.library, pathlib.Path("/a/b/2.ogg"),
self.medium, self.year, title="Track 2",
number=2, length=2, artist="Test Artist",
mbid="ab-cd-ef", mtime=123.45)
self.sql("""UPDATE tracks SET active=TRUE, favorite=TRUE, playcount=3,
lastplayed=?, laststarted=? WHERE trackid=2""", now, now)
table2 = emmental.db.tracks.Table(sql=self.sql)
self.assertEqual(len(table2), 0)
table2.load(now=True)
self.assertEqual(len(table2.store), 2)
self.assertEqual(table2.current_track, table2.store[1])
for i in [0, 1]:
with self.subTest(i=i):
self.assertEqual(table2.store[i].trackid, i + 1)
self.assertEqual(table2.store[i].libraryid,
self.library.libraryid)
self.assertEqual(table2.store[i].mediumid,
self.medium.mediumid)
self.assertEqual(table2.store[i].year, self.year.year)
self.assertEqual(table2.store[i].active, bool(i))
self.assertEqual(table2.store[i].favorite, bool(i))
self.assertEqual(table2.store[i].path,
pathlib.Path(f"/a/b/{i+1}.ogg"))
self.assertEqual(table2.store[i].mbid, "ab-cd-ef" if i else "")
self.assertEqual(table2.store[i].title, "Track 2" if i else "")
self.assertEqual(table2.store[i].artist,
"Test Artist" if i else "")
self.assertEqual(table2.store[i].number, 2 if i else 0)
self.assertEqual(table2.store[i].length, 2 if i else 0)
self.assertEqual(table2.store[i].mtime, 123.45 if i else 0)
self.assertEqual(table2.store[i].playcount, 3 if i else 0)
self.assertEqual(table2.store[i].laststarted,
now if i else None)
self.assertEqual(table2.store[i].lastplayed,
now if i else None)
self.assertIsNone(table2.store[i].restarted)
def test_lookup(self):
"""Test looking up tracks in the database."""
path1 = pathlib.Path("/a/b/1.ogg")
path2 = pathlib.Path("/a/b/2.ogg")
track1 = self.tracks.create(self.library, path1,
self.medium, self.year)
track2 = self.tracks.create(self.library, path2,
self.medium, self.year, mbid="ab-cd-ef")
library2 = self.sql.libraries.create(pathlib.Path("/a/b/d"))
self.assertEqual(self.tracks.lookup(self.library, path=path1), track1)
self.assertEqual(self.tracks.lookup(path=path1), track1)
self.assertEqual(self.tracks.lookup(path=path2), track2)
self.assertIsNone(self.tracks.lookup(path="/no/such/track"))
self.assertIsNone(self.tracks.lookup(library2, path=path1))
self.assertEqual(self.tracks.lookup(self.library, mbid="ab-cd-ef"),
track2)
self.assertEqual(self.tracks.lookup(mbid="ab-cd-ef"), track2)
self.assertIsNone(self.tracks.lookup(mbid="gh-ij-kl"))
with self.assertRaises(KeyError) as error:
self.tracks.lookup(self.library)
self.assertEqual(error.value,
"Either 'path' or 'mbid' are required")
def test_map_sort_order(self):
"""Test getting a lookup table for Track sort keys."""
tracks = [self.tracks.create(self.library,
pathlib.Path(f"/a/b/{n}.ogg"),
self.medium, self.year, number=n)
for n in range(10)]
self.assertDictEqual(self.tracks.map_sort_order(""),
{t.trackid: t.trackid - 1 for t in tracks})
self.assertDictEqual(self.tracks.map_sort_order("number DESC"),
{t.trackid: 10 - t.trackid for t in tracks})
def test_update(self):
"""Test updating tracks in the database."""
track = self.tracks.create(self.library, pathlib.Path("/a/b/1.ogg"),
self.medium, self.year, length=10)
medium2 = self.sql.media.create(self.album, "", number=2)
y2022 = self.sql.years.create(2022)
track.update_properties(mediumid=medium2.mediumid, year=y2022.year,
favorite=True, mbid="ab-cd-ef",
title="New Title", artist="New Artist",
number=1, length=42, mtime=123.45)
self.playlists.favorites.add_track.assert_called_with(track)
cur = self.sql("""SELECT mediumid, year, favorite, mbid, title,
artist, number, length, mtime
FROM tracks WHERE trackid = ?""", track.trackid)
row = cur.fetchone()
self.assertEqual(row["mediumid"], medium2.mediumid)
self.assertEqual(row["year"], 2022)
self.assertTrue(row["favorite"])
self.assertEqual(row["mbid"], "ab-cd-ef")
self.assertEqual(row["title"], "New Title")
self.assertEqual(row["artist"], "New Artist")
self.assertEqual(row["number"], 1)
self.assertEqual(row["length"], 42)
self.assertEqual(row["mtime"], 123.45)
track.update_properties(favorite=False)
self.playlists.favorites.remove_track.assert_called_with(track)
track2 = self.tracks.create(self.library, pathlib.Path("/a/b/2.ogg"),
self.medium, self.year, length=10)
track2.active = True
row = self.sql("SELECT active FROM tracks WHERE trackid=?",
track.trackid).fetchone()
self.assertFalse(row["active"])
def test_mark_path_active(self):
"""Test marking a path as active."""
self.tracks.create(self.library, pathlib.Path("/a/b/1.ogg"),
self.medium, self.year)
self.tracks.mark_path_active(pathlib.Path("/a/b/1.ogg"))
cur = self.sql("SELECT trackid FROM tracks WHERE active=TRUE")
self.assertEqual(cur.fetchone()["trackid"], 1)
self.tracks.mark_path_active(pathlib.Path("/a/b/4.ogg"))
cur = self.sql("SELECT trackid FROM tracks WHERE active=TRUE")
self.assertIsNone(cur.fetchone())
def test_restart_track(self):
"""Test marking that a Track has restarted."""
track = self.tracks.create(self.library, pathlib.Path("/a/b/1.ogg"),
self.medium, self.year)
track.restart()
self.assertTrue(track.active)
self.assertGreater(datetime.datetime.utcnow(), track.restarted)
self.assertEqual(self.tracks.current_track, track)
self.playlists.previous.remove_track.assert_not_called()
def test_start_track(self):
"""Test marking that a Track has started playback."""
track = self.tracks.create(self.library, pathlib.Path("/a/b/1.ogg"),
self.medium, self.year)
track.start()
row = self.sql("SELECT laststarted FROM tracks WHERE trackid=?",
track.trackid).fetchone()
self.assertTrue(track.active)
self.assertIsNotNone(track.laststarted)
self.assertEqual(track.laststarted, row["laststarted"])
self.assertEqual(self.tracks.current_track, track)
self.playlists.previous.remove_track.assert_called_with(track)
self.playlists.previous.add_track.assert_called_with(track)
def test_stop_started_track(self):
"""Test marking that a Track has stopped playback."""
track = self.tracks.create(self.library, pathlib.Path("/a/b/1.ogg"),
self.medium, self.year, length=10)
track.start()
track.stop(3)
row = self.sql("SELECT lastplayed FROM tracks WHERE trackid=?",
track.trackid).fetchone()
self.assertFalse(track.active)
self.assertEqual(track.playcount, 0)
self.assertIsNone(row["lastplayed"])
self.assertIsNone(track.lastplayed)
self.assertIsNone(self.tracks.current_track)
self.playlists.most_played.reload_tracks.assert_not_called()
self.playlists.queued.remove_track.assert_not_called()
self.playlists.unplayed.remove_track.assert_not_called()
track.start()
track.stop(8)
row = self.sql("""SELECT lastplayed, playcount FROM tracks
WHERE trackid=?""", track.trackid).fetchone()
self.assertEqual(row["playcount"], 1)
self.assertEqual(track.playcount, 1)
self.assertEqual(row["lastplayed"], track.laststarted)
self.assertEqual(track.lastplayed, track.laststarted)
self.playlists.most_played.reload_tracks.assert_called()
self.playlists.queued.remove_track.assert_called_with(track)
self.playlists.unplayed.remove_track.assert_called_with(track)
def test_stop_restarted_track(self):
"""Test marking that a restarted Track has stopped playback."""
track = self.tracks.create(self.library, pathlib.Path("/a/b/1.ogg"),
self.medium, self.year, length=10)
track.restart()
track.stop(3)
row = self.sql("""SELECT lastplayed, laststarted FROM tracks
WHERE trackid=?""", track.trackid).fetchone()
self.assertFalse(track.active)
self.assertEqual(track.playcount, 0)
self.assertIsNone(row["lastplayed"])
self.assertIsNone(track.lastplayed)
self.assertIsNone(row["laststarted"])
self.assertIsNone(track.laststarted)
self.assertIsNone(track.restarted)
self.assertIsNone(self.tracks.current_track)
self.playlists.most_played.reload_tracks.assert_not_called()
self.playlists.queued.remove_track.assert_not_called()
self.playlists.unplayed.remove_track.assert_not_called()
track.restart()
restarted = track.restarted
track.stop(8)
row = self.sql("""SELECT lastplayed, laststarted, playcount FROM tracks
WHERE trackid=?""", track.trackid).fetchone()
self.assertEqual(row["playcount"], 1)
self.assertEqual(track.playcount, 1)
self.assertEqual(row["lastplayed"], restarted)
self.assertEqual(track.lastplayed, restarted)
self.assertEqual(row["laststarted"], restarted)
self.assertEqual(track.laststarted, restarted)
self.playlists.most_played.reload_tracks.assert_called_with(idle=True)
self.playlists.queued.remove_track.assert_called_with(track)
self.playlists.unplayed.remove_track.assert_called_with(track)
def test_current_track(self):
"""Test the current-track and have-current-track properties."""
self.assertIsNone(self.tracks.current_track)
self.assertFalse(self.tracks.have_current_track)
track = self.tracks.construct(trackid=2, active=True)
self.assertEqual(self.tracks.current_track, track)
self.assertTrue(self.tracks.have_current_track)
self.playlists.previous.add_track.assert_called_with(track)
track2 = self.tracks.construct(trackid=2, active=True, favorite=True)
self.assertEqual(self.tracks.current_track, track2)
self.assertTrue(self.tracks.have_current_track)
self.playlists.previous.add_track.assert_called_with(track2)
self.playlists.previous.add_track.reset_mock()
self.tracks.current_track = None
self.assertFalse(self.tracks.have_current_track)
self.playlists.previous.add_track.assert_not_called()