# Copyright 2024 (c) Anna Schumaker.
"""Our ListenBrainz client thread."""
import liblistenbrainz
import requests
from .. import thread


class Thread(thread.Thread):
    """Worker thread that submits listens to ListenBrainz.

    Public methods schedule a task with `set_task()`; `do_run_task()` then
    performs the matching ListenBrainz call and records the outcome through
    `set_result()`, always carrying `valid` and `offline` flags.
    """

    def __init__(self):
        """Initialize the base thread, then create the ListenBrainz client."""
        super().__init__()
        self._client = liblistenbrainz.client.ListenBrainz()

    def __print(self, text: str) -> None:
        """Print `text` prefixed with "listenbrainz: "."""
        print(f"listenbrainz: {text}")

    def __set_user_token(self, token: str) -> None:
        """Give `token` to the client and record a "set-token" result.

        The result is flagged `valid=False` when the client rejects the
        token, or `offline=True` when the connection fails.
        """
        try:
            self._client.set_auth_token(token)
            self.set_result("set-token", token=token)
        except liblistenbrainz.errors.InvalidAuthTokenException:
            self.set_result("set-token", token=token, valid=False)
        except requests.exceptions.ConnectionError:
            self.set_result("set-token", token=token, offline=True)

    def __submit_now_playing(self, listen: liblistenbrainz.Listen) -> None:
        """Send `listen` as the playing-now track and record the result.

        The result is flagged `valid=False` on a ListenBrainz API error, or
        `offline=True` when the connection fails.
        """
        try:
            self._client.submit_playing_now(listen)
            self.set_result("now-playing")
        except liblistenbrainz.errors.ListenBrainzAPIException:
            self.set_result("now-playing", valid=False)
        except requests.exceptions.ConnectionError:
            self.set_result("now-playing", offline=True)

    def __submit_listens(self, listens: list[liblistenbrainz.Listen]) -> None:
        """Send `listens` to ListenBrainz and record the result.

        Exactly one listen goes through the single-listen call; any other
        count goes through the multiple-listens call. The listens are passed
        back in the result in every case, with `valid=False` on a
        ListenBrainz API error or `offline=True` when the connection fails.
        """
        try:
            if len(listens) != 1:
                self._client.submit_multiple_listens(listens)
            else:
                self._client.submit_single_listen(listens[0])
            self.set_result("submit-listens", listens=listens)
        except liblistenbrainz.errors.ListenBrainzAPIException:
            self.set_result("submit-listens", listens=listens, valid=False)
        except requests.exceptions.ConnectionError:
            self.set_result("submit-listens", listens=listens, offline=True)

    def do_run_task(self, task: thread.Data) -> None:
        """Run the ListenBrainz operation named by `task.op`.

        Operations other than the four handled here are ignored.
        """
        op = task.op
        if op == "clear-token":
            # Skip the validity check: there is no token left to validate.
            self._client.set_auth_token(None, check_validity=False)
            self.set_result("clear-token")
        elif op == "now-playing":
            self.__submit_now_playing(task.listen)
        elif op == "set-token":
            self.__set_user_token(task.token)
        elif op == "submit-listens":
            self.__submit_listens(task.listens)

    def clear_user_token(self) -> None:
        """Schedule a task that clears the user token."""
        self.__print("clearing user token")
        self.set_task(op="clear-token")

    def get_result(self, **kwargs) -> thread.Data:
        """Fetch the result of a listenbrainz task, reporting any problems.

        Returns whatever the base class returns, including None. For a
        non-None result, a message is printed when `valid` is false and
        another when `offline` is true.
        """
        result = super().get_result(**kwargs)
        if result is None:
            return result

        # NOTE(review): the submit operations also set valid=False on API
        # errors, so this message is not only printed for token failures.
        if not result.valid:
            self.__print("user token is invalid")
        if result.offline:
            self.__print("offline")
        return result

    def set_result(self, op: str, *, valid: bool = True,
                   offline: bool = False, **kwargs) -> None:
        """Store a result that always carries `op`, `valid` and `offline`."""
        super().set_result(op=op, valid=valid, offline=offline, **kwargs)

    def set_user_token(self, token: str) -> None:
        """Schedule a task that sets the user token to `token`."""
        self.__print("setting user token")
        self.set_task(op="set-token", token=token)

    def submit_now_playing(self, listen: liblistenbrainz.Listen) -> None:
        """Schedule a task that sets `listen` as the now-playing track."""
        message = f"now playing '{listen.track_name}' by '{listen.artist_name}'"
        self.__print(message)
        self.set_task(op="now-playing", listen=listen)

    def submit_listens(self, listens: list[liblistenbrainz.Listen]) -> None:
        """Schedule a task that submits `listens` to listenbrainz."""
        count = len(listens)
        suffix = "" if count == 1 else "s"
        self.__print(f"submitting {count} listen{suffix}")
        self.set_task(op="submit-listens", listens=listens)