listenbrainz: Add an initial ListenBrainz object

I created the foundation of what I'll need for working with
ListenBrainz. This includes a ListenBrainz GObject, a threading.Thread
implementation that will do the actual work for communicating with
ListenBrainz, and what will become a priority queue to make sure we do
certain operations (such as setting the user's auth token) first.

These are mostly placeholder classes right now, and future patches will
expand on what they can do.

Implements: #69 ("Add ListenBrainz support")
Signed-off-by: Anna Schumaker <Anna@NoWheyCreamery.com>
This commit is contained in:
Anna Schumaker 2024-01-29 14:06:15 -05:00
parent efe2611422
commit 14c153733d
7 changed files with 99 additions and 0 deletions

View File

@ -20,6 +20,7 @@ other playlists run out of tracks.
* Python3
* dateutil
* gobject
* liblistenbrainz
* musicbrainzngs
* mutagen
* pyxdg

View File

@ -0,0 +1,19 @@
# Copyright 2024 (c) Anna Schumaker.
"""Our ListenBrainz custom GObject."""
from gi.repository import GObject
from . import thread
from . import task
class ListenBrainz(GObject.GObject):
"""Our main ListenBrainz GObject."""
def __init__(self):
"""Initialize the ListenBrainz GObject."""
super().__init__()
self._queue = task.Queue()
self._thread = thread.Thread()
def stop(self) -> None:
"""Stop the ListenBrainz thread."""
self._thread.stop()

View File

@ -0,0 +1,6 @@
# Copyright 2024 (c) Anna Schumaker.
"""Our ListenBrainz operation priority queue."""
class Queue:
"""A queue for prioritizing ListenBrainz operations."""

View File

@ -0,0 +1,7 @@
# Copyright 2024 (c) Anna Schumaker.
"""Our ListenBrainz client thread."""
from .. import thread
class Thread(thread.Thread):
"""Thread for submitting listens to ListenBrainz."""

View File

@ -0,0 +1,30 @@
# Copyright 2024 (c) Anna Schumaker.
"""Tests our custom ListenBrainz GObject."""
import emmental.listenbrainz
import unittest
from gi.repository import GObject
class TestListenBrainz(unittest.TestCase):
"""ListenBrainz GObject test case."""
def setUp(self):
"""Set up common variables."""
self.listenbrainz = emmental.listenbrainz.ListenBrainz()
def tearDown(self):
"""Clean up."""
self.listenbrainz.stop()
def test_init(self):
"""Test that the ListenBrainz GObject was set up properly."""
self.assertIsInstance(self.listenbrainz, GObject.GObject)
self.assertIsInstance(self.listenbrainz._queue,
emmental.listenbrainz.task.Queue)
self.assertIsInstance(self.listenbrainz._thread,
emmental.listenbrainz.thread.Thread)
def test_stop(self):
"""Test stopping the thread during shutdown."""
self.listenbrainz.stop()
self.assertFalse(self.listenbrainz._thread.is_alive())

View File

@ -0,0 +1,16 @@
# Copyright 2024 (c) Anna Schumaker.
"""Tests our ListenBrainz priority queue."""
import emmental.listenbrainz.task
import unittest
class TestTaskQueue(unittest.TestCase):
"""Test the ListenBrainz queue."""
def setUp(self):
"""Set up common variables."""
self.queue = emmental.listenbrainz.task.Queue()
def test_init(self):
"""Test that the queue was set up properly."""
self.assertIsNotNone(self.queue)

View File

@ -0,0 +1,20 @@
# Copyright 2024 (c) Anna Schumaker.
"""Tests our ListenBrainz client thread."""
import emmental.listenbrainz.thread
import unittest
class TestThread(unittest.TestCase):
"""ListenBrainz Thread test case."""
def setUp(self):
"""Set up common variables."""
self.thread = emmental.listenbrainz.thread.Thread()
def tearDown(self):
"""Clean up."""
self.thread.stop()
def test_init(self):
"""Test that the ListenBrainz thread was initialized properly."""
self.assertIsInstance(self.thread, emmental.thread.Thread)