listenbrainz: Submit the currently playing track to ListenBrainz
I do this by creating a new Listen class that is constructed from one of our db.track.Tracks to convert to something liblistenbrainz understands. From there, I watch for changes to the "now-playing" property and call out to the Thread to submit the track to ListenBrainz. Implements: #69 ("Add ListenBrainz support") Signed-off-by: Anna Schumaker <Anna@NoWheyCreamery.com>
This commit is contained in:
parent
84a832389f
commit
4c5d3c78c0
|
@ -2,6 +2,8 @@
|
|||
"""Our ListenBrainz custom GObject."""
|
||||
from gi.repository import GObject
|
||||
from gi.repository import GLib
|
||||
from .. import db
|
||||
from . import listen
|
||||
from . import thread
|
||||
from . import task
|
||||
|
||||
|
@ -11,6 +13,7 @@ class ListenBrainz(GObject.GObject):
|
|||
|
||||
user_token = GObject.Property(type=str)
|
||||
valid_token = GObject.Property(type=bool, default=True)
|
||||
now_playing = GObject.Property(type=db.tracks.Track)
|
||||
|
||||
def __init__(self):
|
||||
"""Initialize the ListenBrainz GObject."""
|
||||
|
@ -21,6 +24,10 @@ class ListenBrainz(GObject.GObject):
|
|||
self._idle_id = None
|
||||
|
||||
self.connect("notify::user-token", self.__notify_user_token)
|
||||
self.connect("notify::now-playing", self.__notify_now_playing)
|
||||
|
||||
def __check_connected(self) -> bool:
|
||||
return len(self.user_token) and self.valid_token
|
||||
|
||||
def __check_result(self) -> None:
|
||||
if (res := self._thread.get_result()) is not None:
|
||||
|
@ -30,6 +37,8 @@ class ListenBrainz(GObject.GObject):
|
|||
match op:
|
||||
case "clear-token":
|
||||
self._thread.clear_user_token()
|
||||
case "now-playing":
|
||||
self._thread.submit_now_playing(listen.Listen(*args))
|
||||
case "set-token":
|
||||
self._thread.set_user_token(*args)
|
||||
|
||||
|
@ -57,6 +66,15 @@ class ListenBrainz(GObject.GObject):
|
|||
case _: self._queue.push("set-token", self.user_token)
|
||||
self.__idle_start()
|
||||
|
||||
def __notify_now_playing(self, listenbrainz: GObject.GObject,
|
||||
param: GObject.ParamSpec) -> None:
|
||||
if self.now_playing is not None:
|
||||
self._queue.push("now-playing", self.now_playing)
|
||||
if self.__check_connected():
|
||||
self.__idle_start()
|
||||
else:
|
||||
self._queue.clear("now-playing")
|
||||
|
||||
def __source_stop(self, srcid: str) -> None:
|
||||
if (id := getattr(self, srcid)) is not None:
|
||||
GLib.source_remove(id)
|
||||
|
|
|
@ -0,0 +1,20 @@
|
|||
# Copyright 2024 (c) Anna Schumaker.
|
||||
"""Convert a db.track.Track to a liblistenbrainz.Listen."""
|
||||
import liblistenbrainz
|
||||
from .. import db
|
||||
from .. import gsetup
|
||||
|
||||
|
||||
class Listen(liblistenbrainz.Listen):
|
||||
"""A single ListenBrainz Listen."""
|
||||
|
||||
def __init__(self, track: db.tracks.Track):
|
||||
"""Initialize our Listen class."""
|
||||
album = track.get_medium().get_album()
|
||||
artists = [a.mbid for a in track.get_artists() if len(a.mbid) > 0]
|
||||
album_mbid = album.mbid if len(album.mbid) > 0 else None
|
||||
super().__init__(track.title, track.artist, release_name=album.name,
|
||||
artist_mbids=artists, release_group_mbid=album_mbid,
|
||||
tracknumber=track.number,
|
||||
additional_info={"media_player":
|
||||
f"emmental{gsetup.DEBUG_STR}"})
|
|
@ -8,17 +8,24 @@ class Queue:
|
|||
def __init__(self):
|
||||
"""Initialize the task Queue."""
|
||||
self._set_token = None
|
||||
self._now_playing = None
|
||||
|
||||
def clear(self, op: str) -> None:
|
||||
"""Clear a pending operation."""
|
||||
self._set_token = None
|
||||
match op:
|
||||
case "clear-token" | "set-token": self._set_token = None
|
||||
case "now-playing": self._now_playing = None
|
||||
|
||||
def push(self, op: str, *args) -> None:
|
||||
"""Push an operation onto the queue."""
|
||||
self._set_token = (op, *args)
|
||||
match op:
|
||||
case "clear-token" | "set-token": self._set_token = (op, *args)
|
||||
case "now-playing": self._now_playing = (op, *args)
|
||||
|
||||
def pop(self) -> tuple:
|
||||
"""Pop an operation off the queue."""
|
||||
res = self._set_token
|
||||
if (res := self._set_token) is not None:
|
||||
self._set_token = None
|
||||
elif (res := self._now_playing) is not None:
|
||||
self._now_playing = None
|
||||
return res
|
||||
|
|
|
@ -22,12 +22,21 @@ class Thread(thread.Thread):
|
|||
except liblistenbrainz.errors.InvalidAuthTokenException:
|
||||
self.set_result("set-token", token=token, valid=False)
|
||||
|
||||
def __submit_now_playing(self, listen: liblistenbrainz.Listen) -> None:
|
||||
try:
|
||||
self._client.submit_playing_now(listen)
|
||||
self.set_result("now-playing")
|
||||
except liblistenbrainz.errors.ListenBrainzAPIException:
|
||||
self.set_result("now-playing", valid=False)
|
||||
|
||||
def do_run_task(self, task: thread.Data) -> None:
|
||||
"""Call a specific listenbrainz operation."""
|
||||
match task.op:
|
||||
case "clear-token":
|
||||
self._client.set_auth_token(None, check_validity=False)
|
||||
self.set_result("clear-token")
|
||||
case "now-playing":
|
||||
self.__submit_now_playing(task.listen)
|
||||
case "set-token":
|
||||
self.__set_user_token(task.token)
|
||||
|
||||
|
@ -51,3 +60,9 @@ class Thread(thread.Thread):
|
|||
"""Schedule setting the user token."""
|
||||
self.__print("setting user token")
|
||||
self.set_task(op="set-token", token=token)
|
||||
|
||||
def submit_now_playing(self, listen: liblistenbrainz.Listen) -> None:
|
||||
"""Schedule setting the now-playing track."""
|
||||
self.__print(f"now playing '{listen.track_name}' " +
|
||||
f"by '{listen.artist_name}'")
|
||||
self.set_task(op="now-playing", listen=listen)
|
||||
|
|
|
@ -0,0 +1,46 @@
|
|||
# Copyright 2024 (c) Anna Schumaker.
|
||||
"""Test creating a liblistenbrainz.Listen from a Track."""
|
||||
import emmental.listenbrainz.listen
|
||||
import liblistenbrainz
|
||||
import pathlib
|
||||
import tests.util
|
||||
|
||||
|
||||
class TestListen(tests.util.TestCase):
|
||||
"""ListenBrainz Listen test case."""
|
||||
|
||||
def setUp(self):
|
||||
"""Set up common variables."""
|
||||
super().setUp()
|
||||
self.library = self.sql.libraries.create(pathlib.Path("/a/b"))
|
||||
self.artists = [self.sql.artists.create("Artist 1", mbid="mbid-ar1"),
|
||||
self.sql.artists.create("Artist 2"),
|
||||
self.sql.artists.create("Artist 3", mbid="mbid-ar3")]
|
||||
self.album = self.sql.albums.create("Test Album", "Test Artist",
|
||||
release="1988-06",
|
||||
mbid="mbid-release")
|
||||
self.medium = self.sql.media.create(self.album, "", number=1)
|
||||
self.year = self.sql.years.create(1988)
|
||||
self.track = self.sql.tracks.create(self.library,
|
||||
pathlib.Path("/a/b/c.ogg"),
|
||||
self.medium, self.year,
|
||||
title="Track 1", number=1,
|
||||
artist="Track Artist")
|
||||
|
||||
for artist in self.artists:
|
||||
artist.add_track(self.track)
|
||||
|
||||
self.listen = emmental.listenbrainz.listen.Listen(self.track)
|
||||
|
||||
def test_init(self):
|
||||
"""Test initializing our Listen instance."""
|
||||
self.assertIsInstance(self.listen, liblistenbrainz.Listen)
|
||||
self.assertEqual(self.listen.track_name, "Track 1")
|
||||
self.assertEqual(self.listen.artist_name, "Track Artist")
|
||||
self.assertEqual(self.listen.release_name, "Test Album")
|
||||
self.assertEqual(self.listen.release_group_mbid, "mbid-release")
|
||||
self.assertEqual(self.listen.tracknumber, 1)
|
||||
self.assertDictEqual(self.listen.additional_info,
|
||||
{"media_player": "emmental-debug"})
|
||||
self.assertListEqual(self.listen.artist_mbids,
|
||||
["mbid-ar1", "mbid-ar3"])
|
|
@ -2,6 +2,8 @@
|
|||
"""Tests our custom ListenBrainz GObject."""
|
||||
import emmental.listenbrainz
|
||||
import io
|
||||
import pathlib
|
||||
import tests.util
|
||||
import unittest
|
||||
from gi.repository import GObject
|
||||
from gi.repository import GLib
|
||||
|
@ -10,12 +12,24 @@ from gi.repository import GLib
|
|||
@unittest.mock.patch("sys.stdout", new_callable=io.StringIO)
|
||||
@unittest.mock.patch("gi.repository.GLib.source_remove")
|
||||
@unittest.mock.patch("gi.repository.GLib.idle_add", return_value=42)
|
||||
class TestListenBrainz(unittest.TestCase):
|
||||
class TestListenBrainz(tests.util.TestCase):
|
||||
"""ListenBrainz GObject test case."""
|
||||
|
||||
def setUp(self):
|
||||
"""Set up common variables."""
|
||||
super().setUp()
|
||||
self.listenbrainz = emmental.listenbrainz.ListenBrainz()
|
||||
self.library = self.sql.libraries.create(pathlib.Path("/a/b"))
|
||||
self.album = self.sql.albums.create("Test Album", "Test Artist",
|
||||
release="1988-06",
|
||||
mbid="mbid-release")
|
||||
self.medium = self.sql.media.create(self.album, "", number=1)
|
||||
self.year = self.sql.years.create(1988)
|
||||
self.track = self.sql.tracks.create(self.library,
|
||||
pathlib.Path("/a/b/c.ogg"),
|
||||
self.medium, self.year,
|
||||
title="Track 1", number=1,
|
||||
artist="Track Artist")
|
||||
|
||||
@unittest.mock.patch("gi.repository.GLib.source_remove")
|
||||
def tearDown(self, mock_source_remove: unittest.mock.Mock):
|
||||
|
@ -108,3 +122,60 @@ class TestListenBrainz(unittest.TestCase):
|
|||
self.assertEqual(idle_work(), GLib.SOURCE_REMOVE)
|
||||
self.assertTrue(self.listenbrainz.valid_token)
|
||||
self.assertIsNone(self.listenbrainz._idle_id)
|
||||
|
||||
def test_submit_now_playing(self, mock_idle_add: unittest.mock.Mock,
|
||||
mock_source_remove: unittest.mock.Mock,
|
||||
mock_stdout: io.StringIO):
|
||||
"""Test setting the now-playing property."""
|
||||
self.assertIsNone(self.listenbrainz.now_playing)
|
||||
|
||||
self.listenbrainz.user_token = "abcde"
|
||||
self.listenbrainz.valid_token = True
|
||||
self.listenbrainz._queue.pop()
|
||||
self.listenbrainz._ListenBrainz__source_stop("_idle_id")
|
||||
|
||||
self.listenbrainz.now_playing = self.track
|
||||
self.assertTupleEqual(self.listenbrainz._queue._now_playing,
|
||||
("now-playing", self.track))
|
||||
self.assertEqual(self.listenbrainz._idle_id, 42)
|
||||
|
||||
idle_work = self.listenbrainz._ListenBrainz__idle_work
|
||||
with unittest.mock.patch.object(self.listenbrainz._thread,
|
||||
"submit_now_playing") as mock_playing:
|
||||
self.assertEqual(idle_work(), GLib.SOURCE_CONTINUE)
|
||||
mock_playing.assert_called()
|
||||
self.assertIsInstance(mock_playing.call_args.args[0],
|
||||
emmental.listenbrainz.listen.Listen)
|
||||
|
||||
for valid in [True, False]:
|
||||
with self.subTest(valid=valid):
|
||||
self.listenbrainz._thread.set_result(op="now-playing",
|
||||
valid=valid)
|
||||
self.assertEqual(idle_work(), GLib.SOURCE_REMOVE)
|
||||
self.assertEqual(self.listenbrainz.valid_token, valid)
|
||||
|
||||
def test_submit_now_playing_later(self, mock_idle_add: unittest.mock.Mock,
|
||||
mock_source_remove: unittest.mock.Mock,
|
||||
mock_stdout: io.StringIO):
|
||||
"""Test the now-playing property when ListenBrainz is disconnected."""
|
||||
self.assertIsNone(self.listenbrainz.now_playing)
|
||||
|
||||
self.listenbrainz.now_playing = self.track
|
||||
self.assertTupleEqual(self.listenbrainz._queue._now_playing,
|
||||
("now-playing", self.track))
|
||||
self.assertIsNone(self.listenbrainz._idle_id)
|
||||
|
||||
self.listenbrainz.user_token = "abcde"
|
||||
self.listenbrainz.valid_token = False
|
||||
self.listenbrainz._queue.pop()
|
||||
self.listenbrainz._ListenBrainz__source_stop("_idle_id")
|
||||
self.listenbrainz.now_playing = self.track
|
||||
self.assertTupleEqual(self.listenbrainz._queue._now_playing,
|
||||
("now-playing", self.track))
|
||||
self.assertIsNone(self.listenbrainz._idle_id)
|
||||
|
||||
self.listenbrainz.valid_token = True
|
||||
self.listenbrainz._queue._now_playing = "abcde"
|
||||
self.listenbrainz.now_playing = None
|
||||
self.assertIsNone(self.listenbrainz._queue._now_playing)
|
||||
self.assertIsNone(self.listenbrainz._idle_id)
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
# Copyright 2024 (c) Anna Schumaker.
|
||||
"""Tests our ListenBrainz priority queue."""
|
||||
import emmental.listenbrainz.task
|
||||
import liblistenbrainz
|
||||
import unittest
|
||||
|
||||
|
||||
|
@ -41,3 +42,20 @@ class TestTaskQueue(unittest.TestCase):
|
|||
self.queue.push("clear-token")
|
||||
self.queue.clear("clear-token")
|
||||
self.assertIsNone(self.queue._set_token)
|
||||
|
||||
def test_push_now_playing(self):
|
||||
"""Test the push_now_playing() function."""
|
||||
self.assertIsNone(self.queue._now_playing)
|
||||
|
||||
listen = liblistenbrainz.Listen("Track Name", "Artist Name")
|
||||
self.queue.push("now-playing", listen)
|
||||
self.assertTupleEqual(self.queue._now_playing, ("now-playing", listen))
|
||||
|
||||
self.queue.push("set-token", "abcde")
|
||||
self.assertTupleEqual(self.queue.pop(), ("set-token", "abcde"))
|
||||
self.assertTupleEqual(self.queue.pop(), ("now-playing", listen))
|
||||
self.assertIsNone(self.queue._now_playing)
|
||||
|
||||
self.queue.push("now-playing", listen)
|
||||
self.queue.clear("now-playing")
|
||||
self.assertIsNone(self.queue._now_playing)
|
||||
|
|
|
@ -70,3 +70,37 @@ class TestThread(unittest.TestCase):
|
|||
self.assertEqual(mock_stdout.getvalue(),
|
||||
"listenbrainz: setting user token\n" +
|
||||
"listenbrainz: user token is invalid\n")
|
||||
|
||||
def test_submit_now_playing(self, mock_stdout: io.StringIO):
|
||||
"""Test submitting the now playing track."""
|
||||
listen = liblistenbrainz.Listen("Track Name", "Artist Name")
|
||||
with unittest.mock.patch.object(self.thread._client,
|
||||
"submit_playing_now") as mock_submit:
|
||||
self.thread.submit_now_playing(listen)
|
||||
self.assertFalse(self.thread.ready.is_set())
|
||||
self.assertEqual(self.thread._task, {"op": "now-playing",
|
||||
"listen": listen})
|
||||
self.assertEqual(mock_stdout.getvalue(),
|
||||
"listenbrainz: now playing 'Track Name' " +
|
||||
"by 'Artist Name'\n")
|
||||
|
||||
self.thread.ready.wait()
|
||||
mock_submit.assert_called_with(listen)
|
||||
self.assertEqual(self.thread.get_result(),
|
||||
{"op": "now-playing", "valid": True})
|
||||
|
||||
def test_submit_now_playing_exceptions(self, mock_stdout: io.StringIO):
|
||||
"""Test exception handling when submitting the now playing track."""
|
||||
listen = liblistenbrainz.Listen("Track Name", "Artist Name")
|
||||
with unittest.mock.patch.object(self.thread._client,
|
||||
"submit_playing_now") as mock_submit:
|
||||
mock_submit.side_effect = \
|
||||
liblistenbrainz.errors.ListenBrainzAPIException(401)
|
||||
self.thread.submit_now_playing(listen)
|
||||
self.thread.ready.wait()
|
||||
self.assertEqual(self.thread.get_result(),
|
||||
{"op": "now-playing", "valid": False})
|
||||
self.assertEqual(mock_stdout.getvalue(),
|
||||
"listenbrainz: now playing 'Track Name' " +
|
||||
"by 'Artist Name'\n" +
|
||||
"listenbrainz: user token is invalid\n")
|
||||
|
|
Loading…
Reference in New Issue