emmental/emmental/listenbrainz/__init__.py

65 lines
1.9 KiB
Python

# Copyright 2024 (c) Anna Schumaker.
"""Our ListenBrainz custom GObject."""
from gi.repository import GObject
from gi.repository import GLib
from . import thread
from . import task
class ListenBrainz(GObject.GObject):
    """GObject that feeds queued ListenBrainz tasks to the thread.

    Setting the ``user-token`` property queues a "set-token" task and
    schedules a GLib idle callback.  Each time the callback runs and the
    thread's ``ready.is_set()`` is true, it first copies the ``valid`` field
    of any result the thread reports into ``valid-token`` and then hands the
    next queued task to the thread.
    """

    user_token = GObject.Property(type=str)
    valid_token = GObject.Property(type=bool, default=True)

    def __init__(self):
        """Create the task queue and thread, then watch ``user-token``."""
        super().__init__()
        self._queue = task.Queue()
        self._thread = thread.Thread()
        # GLib source id of the idle callback, or None when none is scheduled.
        self._idle_id = None
        self.connect("notify::user-token", self.__notify_user_token)

    def __check_result(self) -> None:
        """Copy the ``valid`` field of the thread's result, if there is one."""
        result = self._thread.get_result()
        if result is None:
            return
        self.valid_token = result.valid

    def __parse_task(self, op: str, *args) -> bool:
        """Hand one queued task to the thread and keep the idle source alive.

        Operations other than "set-token" are silently ignored.
        """
        if op == "set-token":
            self._thread.set_user_token(*args)
        return GLib.SOURCE_CONTINUE

    def __idle_work(self) -> bool:
        """Idle callback: collect a result, then dispatch the next task.

        Returns GLib.SOURCE_CONTINUE while the thread is not ready or a task
        was dispatched, and GLib.SOURCE_REMOVE once the queue pops None.
        """
        if not self._thread.ready.is_set():
            # Not ready yet: stay scheduled and check again on the next idle.
            return GLib.SOURCE_CONTINUE

        self.__check_result()
        pending = self._queue.pop()
        if pending is None:
            # Returning SOURCE_REMOVE drops the source, so forget its id too.
            self._idle_id = None
            return GLib.SOURCE_REMOVE
        return self.__parse_task(*pending)

    def __idle_start(self) -> None:
        """Schedule the idle callback unless one is already scheduled."""
        if self._idle_id is not None:
            return
        self._idle_id = GLib.idle_add(self.__idle_work)

    def __notify_user_token(self, listenbrainz: GObject.GObject,
                            param: GObject.ParamSpec) -> None:
        """Queue a "set-token" task carrying the new ``user-token`` value."""
        self._queue.push("set-token", self.user_token)
        self.__idle_start()

    def __source_stop(self, srcid: str) -> None:
        """Remove the GLib source whose id is stored in attribute *srcid*.

        Does nothing when the attribute is None; otherwise the attribute is
        reset to None after the source has been removed.
        """
        source = getattr(self, srcid)
        if source is None:
            return
        GLib.source_remove(source)
        setattr(self, srcid, None)

    def stop(self) -> None:
        """Cancel any scheduled idle callback and stop the thread."""
        self.__source_stop("_idle_id")
        self._thread.stop()