# Copyright 2024 (c) Anna Schumaker. """Our ListenBrainz custom GObject.""" from gi.repository import GObject from gi.repository import GLib from .. import db from . import listen from . import thread from . import task class ListenBrainz(GObject.GObject): """Our main ListenBrainz GObject.""" sql = GObject.Property(type=db.Connection) user_token = GObject.Property(type=str) valid_token = GObject.Property(type=bool, default=True) now_playing = GObject.Property(type=db.tracks.Track) def __init__(self, sql: db.Connection): """Initialize the ListenBrainz GObject.""" super().__init__(sql=sql) self._queue = task.Queue() self._thread = thread.Thread() self._idle_id = None self.connect("notify::user-token", self.__notify_user_token) self.connect("notify::now-playing", self.__notify_now_playing) def __check_connected(self) -> bool: return len(self.user_token) and self.valid_token def __check_result(self) -> None: if (res := self._thread.get_result()) is not None: self.valid_token = res.valid if res.op == "submit-listens" and self.valid_token: listens = [lsn.listenid for lsn in res.listens] self.sql.tracks.delete_listens(listens) def __parse_task(self, op: str, *args) -> bool: match op: case "clear-token": self._thread.clear_user_token() case "now-playing": self._thread.submit_now_playing(listen.Listen(*args)) case "set-token": self._thread.set_user_token(*args) case "submit-listens": listens = self.sql.tracks.get_n_listens(50) if len(listens) == 0: self._idle_id = None return GLib.SOURCE_REMOVE self._thread.submit_listens([listen.Listen(trk, listenid=id, listened_at=ts) for (id, trk, ts) in listens]) return GLib.SOURCE_CONTINUE def __idle_work(self) -> bool: if self._thread.ready.is_set(): self.__check_result() return self.__parse_task(*self._queue.pop()) return GLib.SOURCE_CONTINUE def __idle_start(self) -> None: if self._idle_id is None: self._idle_id = GLib.idle_add(self.__idle_work) def __notify_user_token(self, listenbrainz: GObject.GObject, param: GObject.ParamSpec) -> None: match self.user_token: case "": self._queue.push("clear-token") case _: self._queue.push("set-token", self.user_token) self.__idle_start() def __notify_now_playing(self, listenbrainz: GObject.GObject, param: GObject.ParamSpec) -> None: if self.now_playing is not None: self._queue.push("now-playing", self.now_playing) if self.__check_connected(): self.__idle_start() else: self._queue.clear("now-playing") def __source_stop(self, srcid: str) -> None: if (id := getattr(self, srcid)) is not None: GLib.source_remove(id) setattr(self, srcid, None) def stop(self) -> None: """Stop the ListenBrainz thread.""" self.__source_stop("_idle_id") self._thread.stop() def submit_listens(self, *args) -> None: """Submit recent listens to ListenBrainz.""" if self.__check_connected(): self.__idle_start()