Compare commits

...

26 Commits

Author SHA1 Message Date
Anna Schumaker dbc60e1c5f emmental: Add the ListenBrainz GObject to the Application
And wire it up.

Signed-off-by: Anna Schumaker <Anna@NoWheyCreamery.com>
2024-03-20 13:35:18 -04:00
Anna Schumaker 6779535cf1 listenbrainz: Early startup handling
We can't craft a Listen object from a Track before the database is
almost entirely loaded, and attempting to do so will cause the
ListenBrainz thread to crash. So let's just defer any ListenBrainz
operations until the database has marked itself as "loaded" so we know
everything will work.

Signed-off-by: Anna Schumaker <Anna@NoWheyCreamery.com>
2024-03-20 13:35:18 -04:00
Anna Schumaker 2ae5fd0969 listenbrainz: Offline handling
If we get a connection error from any listenbrainz operation, then we need
to set up an occasional timer to retry connecting to listenbrainz to see
if the connection has been restored.

Implements: #69 ("Add ListenBrainz support")
Signed-off-by: Anna Schumaker <Anna@NoWheyCreamery.com>
2024-03-20 13:35:18 -04:00
Anna Schumaker b1490fd447 listenbrainz: Submit Listens to ListenBrainz
I query the database for up to 50 tracks to submit at once. If there is
only one track to submit then I use the submit_single_listen() function
as intended by ListenBrainz.

Implements: #69 ("Add ListenBrainz support")
Signed-off-by: Anna Schumaker <Anna@NoWheyCreamery.com>
2024-03-20 13:35:18 -04:00
Anna Schumaker 4c5d3c78c0 listenbrainz: Submit the currently playing track to ListenBrainz
I do this by creating a new Listen class that is constructed from one of
our db.track.Tracks to convert to something liblistenbrainz understands.
From there, I watch for changes to the "now-playing" property and call
out to the Thread to submit the track to ListenBrainz.

Implements: #69 ("Add ListenBrainz support")
Signed-off-by: Anna Schumaker <Anna@NoWheyCreamery.com>
2024-03-20 13:35:18 -04:00
Anna Schumaker 84a832389f listenbrainz: Clear the user API token
If the user clears out their token in the settings UI, then we need to
clear it out in our listenbrainz client object as well.

Implements: #69 ("Add ListenBrainz support")
Signed-off-by: Anna Schumaker <Anna@NoWheyCreamery.com>
2024-03-20 13:35:18 -04:00
Anna Schumaker 924f65fddd listenbrainz: Set the user API token
I added a "user-token" property to the ListenBrainz object, and watch for
changes to query the liblistenbrainz client. I also set up an idle
callback so we can set the "valid-token" property so the user can know
if there is a problem.

Implements: #69 ("Add ListenBrainz support")
Signed-off-by: Anna Schumaker <Anna@NoWheyCreamery.com>
2024-03-20 13:35:15 -04:00
Anna Schumaker 14c153733d listenbrainz: Add an initial ListenBrainz object
I created the foundation of what I'll need for working with
ListenBrainz. This includes a ListenBrainz GObject, a threading.Thread
implementation that will do the actual work for communicating with
ListenBrainz, and what will become a priority queue to make sure we do
certain operations (such as setting the user's auth token) first.

These are mostly placeholder classes right now, and future patches will
expand on what they can do.

Implements: #69 ("Add ListenBrainz support")
Signed-off-by: Anna Schumaker <Anna@NoWheyCreamery.com>
2024-03-20 13:35:13 -04:00
Anna Schumaker efe2611422 header: Add a PasswordEntry for inputting the ListenBrainz token
The user can fill this out to connect to their listenbrainz account and
submit listens. I add a listenbrainz logo icon based on their icon from
the website. I also create a symbolic version that I end up using in the
popover menu.

Signed-off-by: Anna Schumaker <Anna@NoWheyCreamery.com>
2024-03-18 11:17:31 -04:00
Anna Schumaker c49a23b046 header: Create a Settings Adw.ActionRow
This contains all the steps needed to open the settings editor window. I
move it into the "Menu" popover list since it's not a common action, so
it can be hidden from the main UI.

Signed-off-by: Anna Schumaker <Anna@NoWheyCreamery.com>
2024-03-18 11:17:31 -04:00
Anna Schumaker a944af7f3e header: Convert the Open button to an Adw.ActionRow
And put it in the new Menu button popover list. I don't expect this to
be a common action, so the extra button press is acceptable.

Signed-off-by: Anna Schumaker <Anna@NoWheyCreamery.com>
2024-03-18 11:17:31 -04:00
Anna Schumaker c5867badae header: Add a menu button
This will be expanded to contain the open file button, the settings
dialog, and eventually a listenbrainz configuration option.

Signed-off-by: Anna Schumaker <Anna@NoWheyCreamery.com>
2024-03-18 11:17:31 -04:00
Anna Schumaker e85bdcc7f4 header: Rename volume header objects
I'm going to add a second popover button to the header, so I need to
clarify which button these objects are being attached to.

Signed-off-by: Anna Schumaker <Anna@NoWheyCreamery.com>
2024-03-18 11:17:31 -04:00
Anna Schumaker c7dca6164e db: Create a listenbrainz_queue table in the database
I bump the user_version to 3 at the same time. This table will be used
to hold listenbrainz listens that have not yet been submitted to the
listenbrainz server. I also give the Track table functions to get and
delete listens from this table as needed by the listenbrainz thread.

Signed-off-by: Anna Schumaker <Anna@NoWheyCreamery.com>
2024-03-18 11:17:30 -04:00
Anna Schumaker eada937b7a db: Give the tracks table a track-played signal
I'm going to need this in ListenBrainz so we can submit the played
Track.

Signed-off-by: Anna Schumaker <Anna@NoWheyCreamery.com>
2024-03-13 11:30:47 -04:00
Anna Schumaker d373c33283 db: Convert the tagger to the new Thread class
This lets us do a lot of the basic Thread operations through common
code, allowing us to focus on tagging in this file instead of basic
Thread controls.

Signed-off-by: Anna Schumaker <Anna@NoWheyCreamery.com>
2024-03-13 11:30:47 -04:00
Anna Schumaker 1db187dba5 thread: Create a reusable Thread class
I found that I'm rewriting some of the same features every time I need
to spin up a Thread for something. This is a reusable Thread that can be
inherited for specific work.

Signed-off-by: Anna Schumaker <Anna@NoWheyCreamery.com>
2024-03-13 11:30:47 -04:00
Anna Schumaker 0d100ec752 thread: Create a generic Data class
This class is desigend to make it easier to pass data to and from a
running Thread. This was inspired by the types.SimpleNamespace object so
we can set generic values and use them as class members.

Signed-off-by: Anna Schumaker <Anna@NoWheyCreamery.com>
2024-03-13 11:30:47 -04:00
Anna Schumaker c4e827bc5a sidebar: Use the new database.loaded property
Rather than doing the work ourselves to calculate if the database has
been loaded, use the new property to notify us.

Signed-off-by: Anna Schumaker <Anna@NoWheyCreamery.com>
2024-03-13 11:29:54 -04:00
Anna Schumaker a4e0968ef4 db: Give the database a 'loaded' property
This can be checked or connected to so other parts of the application
can easily know if all database tables have been loaded or not.

Signed-off-by: Anna Schumaker <Anna@NoWheyCreamery.com>
2024-03-13 11:20:15 -04:00
Anna Schumaker 58a1df1d1d db: Test track timestamps using datetime.datetime.utcnow()
I haven't run tests in the evening in a long time, so I never noticed
these failures due to sqlite returning utc timestamps when we expect
localtime.

Signed-off-by: Anna Schumaker <Anna@NoWheyCreamery.com>
2024-01-29 14:38:24 -05:00
Anna Schumaker 3c25dc2c7f tracklist: Change the "shuffle disabled" icon based on icon theme
The Breeze "media-playlist-consecutive" icon looks terrible, and we want
to use "media-playlist-normal" to get the same look as with the Adwaita
icon theme. Unfortunately, Adwaita doesn't have the
"media-playlist-normal" icon. So I created a function to ask the icon
theme if it has a specific icon, and modify the shuffle button to change
the inactive icon when toggled to keep up with icon theme changes.

Signed-off-by: Anna Schumaker <Anna@NoWheyCreamery.com>
2024-01-29 14:38:24 -05:00
Anna Schumaker c37ae94a5d emmental: Icon Updates
These are various icon changes that I noticed after using emmental with
KDE & the Breeze icon theme for a while.

- Replace the go-jump icon with arrow4-down-symbolic
- Replace the view-list-ordered icon with list-compact

Signed-off-by: Anna Schumaker <Anna@NoWheyCreamery.com>
2024-01-29 14:38:24 -05:00
Anna Schumaker e6a219017d sidebar: Use a symbolic icon for the library section
I was relying on the icon theme to fallback to symbolic icons when I
initially wrote it. Turns out, some icon themes do provide a color icon
for this, so I specifically ask for the symoblic icon for consistency.

Signed-off-by: Anna Schumaker <Anna@NoWheyCreamery.com>
2024-01-29 14:38:24 -05:00
Anna Schumaker 3c15515faf texture: Clear the existing texture cache before testing
This test started failing after updating to pytest 8.0. I fix it by
clearing the cache so the test can begin with a clear slate.

Signed-off-by: Anna Schumaker <Anna@NoWheyCreamery.com>
2024-01-29 14:38:24 -05:00
Anna Schumaker 6c6ebf3676 submodules: Update the git url of the mpris spec
Freedesktop.org uses gitlab now.

Signed-off-by: Anna Schumaker <Anna@NoWheyCreamery.com>
2024-01-20 10:47:55 -05:00
47 changed files with 1899 additions and 258 deletions

2
.gitmodules vendored
View File

@ -3,4 +3,4 @@
url = ssh://aur@aur.archlinux.org/emmental.git
[submodule "mpris-spec"]
path = mpris-spec
url = https://github.com/freedesktop/mpris-spec.git
url = https://gitlab.freedesktop.org/mpris/mpris-spec.git

View File

@ -20,6 +20,7 @@ other playlists run out of tracks.
* Python3
* dateutil
* gobject
* liblistenbrainz
* musicbrainzngs
* mutagen
* pyxdg

View File

@ -7,6 +7,7 @@ from . import action
from . import audio
from . import db
from . import header
from . import listenbrainz
from . import mpris2
from . import nowplaying
from . import options
@ -34,6 +35,7 @@ class Application(Adw.Application):
factory = GObject.Property(type=playlist.Factory)
mpris = GObject.Property(type=mpris2.Connection)
player = GObject.Property(type=audio.Player)
lbrainz = GObject.Property(type=listenbrainz.ListenBrainz)
win = GObject.Property(type=window.Window)
autopause = GObject.Property(type=int, default=-1, minimum=-1, maximum=99)
@ -135,11 +137,14 @@ class Application(Adw.Application):
hdr = header.Header(sql=self.db, title=VERSION_STRING)
for prop in ["bg-enabled", "bg-volume", "volume"]:
hdr.bind_property(prop, self.player, prop)
hdr.bind_property("listenbrainz-token", self.lbrainz, "user-token")
for (setting, property) in [("audio.volume", "volume"),
("audio.background.enabled", "bg-enabled"),
("audio.background.volume", "bg-volume"),
("audio.replaygain.enabled", "rg-enabled"),
("audio.replaygain.mode", "rg-mode")]:
("audio.replaygain.mode", "rg-mode"),
("listenbrainz.token",
"listenbrainz_token")]:
self.db.settings.bind_setting(setting, hdr, property)
self.__add_accelerators(hdr.accelerators)
@ -253,6 +258,15 @@ class Application(Adw.Application):
self.mpris.player.connect("SetPosition", self.__set_position)
self.mpris.player.connect("Stop", self.player.stop)
def connect_listenbrainz(self) -> None:
"""Connect the listenbrainz client."""
self.db.tracks.bind_property("current-track",
self.lbrainz, "now-playing")
self.lbrainz.bind_property("valid-token", self.win.header,
"listenbrainz-token-valid")
self.db.tracks.connect("track-played", self.lbrainz.submit_listens)
def connect_playlist_factory(self) -> None:
"""Connect the playlist factory properties."""
self.db.playlists.bind_property("previous",
@ -279,6 +293,7 @@ class Application(Adw.Application):
Adw.Application.do_startup(self)
self.db = db.Connection()
self.mpris = mpris2.Connection()
self.lbrainz = listenbrainz.ListenBrainz(self.db)
self.factory = playlist.Factory(self.db)
self.player = audio.Player()
@ -291,6 +306,7 @@ class Application(Adw.Application):
self.win = self.build_window()
self.add_window(self.win)
self.connect_mpris2()
self.connect_listenbrainz()
self.connect_playlist_factory()
self.connect_player()
@ -316,6 +332,9 @@ class Application(Adw.Application):
if self.win is not None:
self.win.close()
self.win = None
if self.lbrainz is not None:
self.lbrainz.stop()
self.lbrainz = None
if self.mpris is not None:
self.mpris.disconnect()
self.mpris = None

View File

@ -20,12 +20,14 @@ from . import years
SQL_V1_SCRIPT = pathlib.Path(__file__).parent / "emmental.sql"
SQL_V2_SCRIPT = pathlib.Path(__file__).parent / "upgrade-v2.sql"
SQL_V3_SCRIPT = pathlib.Path(__file__).parent / "upgrade-v3.sql"
class Connection(connection.Connection):
"""Connect to the database."""
active_playlist = GObject.Property(type=playlist.Playlist)
loaded = GObject.Property(type=bool, default=False)
def __init__(self):
"""Initialize a sqlite connection."""
@ -44,15 +46,25 @@ class Connection(connection.Connection):
self.tracks = tracks.Table(self)
def __check_loaded(self) -> None:
for tbl in list(self.playlist_tables()) + [self.tracks]:
if tbl.loaded is False:
return
self.loaded = True
def __check_version(self) -> None:
user_version = self("PRAGMA user_version").fetchone()["user_version"]
match user_version:
case 0:
self.executescript(SQL_V1_SCRIPT)
self.executescript(SQL_V2_SCRIPT)
self.executescript(SQL_V3_SCRIPT)
case 1:
self.executescript(SQL_V2_SCRIPT)
case 2: pass
self.executescript(SQL_V3_SCRIPT)
case 2:
self.executescript(SQL_V3_SCRIPT)
case 3: pass
case _:
raise Exception(f"Unsupported data version: {user_version}")
@ -99,3 +111,4 @@ class Connection(connection.Connection):
def table_loaded(self, tbl: table.Table) -> None:
"""Signal that a table has been loaded."""
tbl.loaded = True
self.__check_loaded()

View File

@ -55,10 +55,11 @@ class Library(playlist.Playlist):
def __tag_track(self, path: pathlib.Path) -> bool:
if self.tagger.ready.is_set():
(file, tags) = self.tagger.get_result(self.table.sql, self)
if file is None:
result = self.tagger.get_result(db=self.table.sql, library=self)
if result is None:
track = self.table.sql.tracks.lookup(self, path=path)
self.tagger.tag_file(path, track.mtime if track else None)
mtime = track.mtime if track else None
self.tagger.tag_file(path, mtime=mtime)
else:
return True
return False

View File

@ -3,9 +3,9 @@
import emmental.audio.tagger
import musicbrainzngs
import pathlib
import threading
from gi.repository import GObject
from .. import audio
from .. import thread
from . import albums
from . import artists
from . import connection
@ -178,24 +178,12 @@ class Tags:
return year if year else self.db.years.create(raw_year)
class Thread(threading.Thread):
class Thread(thread.Thread):
"""A thread for tagging files without blocking the UI."""
def __init__(self):
"""Initialize the Tagger Thread."""
super().__init__()
self.ready = threading.Event()
self._connection = None
self._condition = threading.Condition()
self._file = None
self._mtime = None
self._tags = None
self.start()
def __close_connection(self) -> None:
if self._connection:
self._connection.close()
self._connection = None
def __get_connection(self) -> connection.Connection:
@ -213,55 +201,31 @@ class Thread(threading.Thread):
mb_res = musicbrainzngs.get_artist_by_id(artist.mbid)
artist.name = mb_res["artist"]["name"]
def get_result(self, db: GObject.TYPE_PYOBJECT,
library: playlist.Playlist) \
-> tuple[pathlib.Path | None, Tags | None]:
def do_get_result(self, result: thread.Data, db: GObject.TYPE_PYOBJECT,
library: playlist.Playlist) -> tuple:
"""Return the resulting Tags structure."""
with self._condition:
if not self.ready.is_set():
return (None, None)
tags = None if result.tags is None else Tags(db, result.tags, library)
return (result.path, tags)
tags = Tags(db, self._tags, library) if self._tags else None
res = (self._file, tags)
self._file = None
self._tags = None
return res
def run(self) -> None:
"""Sleep until we have work to do."""
with self._condition:
self.ready.set()
while self._condition.wait():
if self._file is None:
break
tags = emmental.audio.tagger.tag_file(self._file, self._mtime)
if tags is not None:
for artist in tags.artists:
self.__check_artist(artist)
self._tags = tags
self.ready.set()
self.__close_connection()
def stop(self) -> None:
"""Stop the thread."""
with self._condition:
self._file = None
self._mtime = None
self._condition.notify()
self.join()
def tag_file(self, file: pathlib.Path, mtime: float | None) -> None:
def do_run_task(self, task: thread.Data) -> None:
"""Tag a file."""
with self._condition:
self.ready.clear()
self._file = file
self._mtime = mtime
self._tags = None
self._condition.notify()
tags = emmental.audio.tagger.tag_file(task.path, task.mtime)
if tags is not None:
for artist in tags.artists:
self.__check_artist(artist)
self.set_result(path=task.path, tags=tags)
def do_stop(self) -> None:
"""Close the connection before stopping."""
if self._connection:
self._connection.close()
self._connection = None
def tag_file(self, path: pathlib.Path,
*, mtime: float | None = None) -> None:
"""Tag a file."""
self.set_task(path=path, mtime=mtime)
def untag_track(db: GObject.TYPE_PYOBJECT, track: tracks.Track) -> None:

View File

@ -200,6 +200,12 @@ class Table(table.Table):
return self.sql(f"UPDATE tracks SET {column}=? WHERE trackid=?",
newval, track.trackid)
def delete_listens(self, listenids: list[int]) -> None:
"""Delete the listens indicated by the provided listenids."""
self.sql.executemany("""DELETE FROM listenbrainz_queue
WHERE listenid=?""",
*[(id,) for id in listenids])
def get_artists(self, track: Track) -> list[table.Row]:
"""Get the set of Artists for a specific Track."""
rows = self.sql("""SELECT artistid FROM artist_tracks_view
@ -212,6 +218,14 @@ class Table(table.Table):
WHERE trackid=?""", track.trackid).fetchall()
return [self.sql.genres.rows.get(row["genreid"]) for row in rows]
def get_n_listens(self, n: int) -> list[tuple]:
"""Get the n most recent listens from the listenbrainz queue."""
cur = self.sql("""SELECT listenid, trackid, timestamp
FROM listenbrainz_queue ORDER BY timestamp DESC
LIMIT ?""", n)
return [(row["listenid"], self.rows[row["trackid"]], row["timestamp"])
for row in cur.fetchall()]
def map_sort_order(self, ordering: str) -> dict[int, int]:
"""Get a lookup table for Track sort keys."""
ordering = ordering if len(ordering) > 0 else "trackid"
@ -270,9 +284,17 @@ class Table(table.Table):
self.sql.playlists.most_played.reload_tracks(idle=True)
self.sql.playlists.queued.remove_track(track)
self.sql.playlists.unplayed.remove_track(track)
self.emit("track-played", track)
self.sql.commit()
@GObject.Signal(arg_types=(Track,))
def track_played(self, track: Track) -> None:
"""Signal that a Track was played."""
if track is not None:
self.sql("""INSERT INTO listenbrainz_queue (trackid, timestamp)
VALUES (?, ?)""", track.trackid, track.lastplayed)
class TrackidSet(GObject.GObject):
"""Manage a set of Track IDs."""

View File

@ -0,0 +1,25 @@
/* Copyright 2024 (c) Anna Schumaker */
PRAGMA user_version = 3;
/*
* The `listenbrainz_queue` table is used to store recently played tracks
* before submitting them to ListenBrainz. This gives us some form of offline
* recovery, since anything in this table needs to be submitted the next time
* we can successfully connect. As a bonus, I prepopulate this table using
* the last played data from tracks that have already been played when this
* table is created.
*/
CREATE TABLE listenbrainz_queue (
listenid INTEGER PRIMARY KEY,
trackid INTEGER REFERENCES tracks (trackid)
ON DELETE CASCADE
ON UPDATE CASCADE,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
INSERT INTO listenbrainz_queue (trackid, timestamp)
SELECT trackid, lastplayed FROM tracks
WHERE lastplayed IS NOT NULL;

View File

@ -43,6 +43,13 @@ def add_style():
CSS_PROVIDER, CSS_PRIORITY)
def has_icon(icon_name: str):
"""Check if the icon theme has a specific icon."""
display = gi.repository.Gdk.Display.get_default()
theme = gi.repository.Gtk.IconTheme.get_for_display(display)
return theme.has_icon(icon_name)
def __version_string(subsystem, major, minor, micro):
return f"{subsystem} {major}.{minor}.{micro}"

View File

@ -9,6 +9,7 @@ from ..action import ActionEntry
from .. import db
from .. import buttons
from .. import gsetup
from . import listenbrainz
from . import open
from . import replaygain
from . import volume
@ -34,6 +35,7 @@ class Header(Gtk.HeaderBar):
sql = GObject.Property(type=db.Connection)
title = GObject.Property(type=str)
subtitle = GObject.Property(type=str)
listenbrainz_token = GObject.Property(type=str)
show_sidebar = GObject.Property(type=bool, default=False)
bg_enabled = GObject.Property(type=bool, default=False)
bg_volume = GObject.Property(type=float, default=0.5)
@ -44,11 +46,27 @@ class Header(Gtk.HeaderBar):
def __init__(self, sql: db.Connection, title: str):
"""Initialize the HeaderBar."""
super().__init__(title=title, subtitle=SUBTITLE, sql=sql)
icon = "sidebar-show-symbolic"
self._show_sidebar = Gtk.ToggleButton(icon_name=icon, has_frame=False)
self._open = open.Button()
self._title = Adw.WindowTitle(title=self.title, subtitle=self.subtitle,
tooltip_text=gsetup.env_string())
icon = "sidebar-show-symbolic"
self._show_sidebar = Gtk.ToggleButton(icon_name=icon, has_frame=False)
self._open = open.OpenRow()
self._listenbrainz = listenbrainz.ListenBrainzRow()
self._menu_box = Gtk.ListBox(selection_mode=Gtk.SelectionMode.NONE)
self._menu_box.add_css_class("boxed-list")
self._menu_box.append(self._open)
self._menu_box.append(self._listenbrainz)
if __debug__:
self._settings = settings.Row(sql)
self._menu_box.append(self._settings)
icon = "open-menu-symbolic"
self._menu_button = buttons.PopoverButton(popover_child=self._menu_box,
icon_name=icon)
self._volume = volume.VolumeRow()
self._volume_icon = Gtk.Image(icon_name=_volume_icon(self.volume))
self._background = volume.BackgroundRow()
@ -59,18 +77,19 @@ class Header(Gtk.HeaderBar):
self._icons.append(self._volume_icon)
self._icons.append(self._background_icon)
self._box = Gtk.ListBox(selection_mode=Gtk.SelectionMode.NONE)
self._box.add_css_class("boxed-list")
self._box.append(self._volume)
self._box.append(self._background)
self._box.append(self._replaygain)
self._vol_box = Gtk.ListBox(selection_mode=Gtk.SelectionMode.NONE)
self._vol_box.add_css_class("boxed-list")
self._vol_box.append(self._volume)
self._vol_box.append(self._background)
self._vol_box.append(self._replaygain)
self._button = buttons.PopoverButton(popover_child=self._box,
child=self._icons,
has_frame=False, margin_end=6)
self._vol_button = buttons.PopoverButton(popover_child=self._vol_box,
child=self._icons,
has_frame=False, margin_end=6)
self.bind_property("title", self._title, "title")
self.bind_property("subtitle", self._title, "subtitle")
self.bind_property("listenbrainz-token", self._listenbrainz, "text")
self.bind_property("show-sidebar", self._show_sidebar, "active",
GObject.BindingFlags.BIDIRECTIONAL)
self.bind_property("bg-enabled", self._background, "enabled",
@ -85,18 +104,14 @@ class Header(Gtk.HeaderBar):
GObject.BindingFlags.BIDIRECTIONAL)
self.pack_start(self._show_sidebar)
self.pack_start(self._open)
if __debug__:
self._window = settings.Window(sql)
self._settings = Gtk.Button(icon_name="settings-symbolic",
tooltip_text="open settings editor")
self._settings.connect("clicked", self.__run_settings)
self.pack_start(self._settings)
self.pack_start(self._menu_button)
self.pack_end(self._button)
self.pack_end(self._vol_button)
self.set_title_widget(self._title)
self._menu_button.props.popover.connect("closed", self.__menu_closed)
self._open.connect("track-requested", self.__track_requested)
self._listenbrainz.connect("apply", self.__listenbrainz_apply)
self.connect("notify", self.__notify)
def __run_settings(self, button: Gtk.Button) -> None:
@ -119,12 +134,35 @@ class Header(Gtk.HeaderBar):
status = (f"volume: {round(self.volume * 100)}%\n"
f"background listening: {bg_status}\n"
f"normalizing: {rg_status}")
self._button.set_tooltip_text(status)
self._vol_button.set_tooltip_text(status)
def __track_requested(self, button: open.Button,
def __listenbrainz_apply(self, entry: Adw.PasswordEntryRow) -> None:
self.listenbrainz_token = entry.get_text()
self._menu_button.popdown()
def __menu_closed(self, popover: Gtk.Popover) -> None:
self._listenbrainz.props.text = self.listenbrainz_token
def __track_requested(self, button: open.OpenRow,
path: pathlib.Path) -> None:
self.emit("track-requested", path)
@GObject.Property(type=bool, default=True)
def listenbrainz_token_valid(self) -> bool:
"""Check if we think the listenbrainz token is valid."""
return not self._listenbrainz.has_css_class("warning")
@listenbrainz_token_valid.setter
def listenbrainz_token_valid(self, valid: bool) -> None:
if valid:
self._menu_button.remove_css_class("warning")
self._listenbrainz.remove_css_class("warning")
else:
win = self.get_ancestor(Gtk.Window)
win.post_toast("listenbrainz: user token is invalid")
self._menu_button.add_css_class("warning")
self._listenbrainz.add_css_class("warning")
@property
def accelerators(self) -> list[ActionEntry]:
"""Get a list of accelerators for the Header."""

View File

@ -0,0 +1,14 @@
# Copyright 2024 (c) Anna Schumaker.
"""A custom Adw.PasswordEntryRow to set the user token."""
from gi.repository import Gtk
from gi.repository import Adw
def ListenBrainzRow() -> Adw.PasswordEntryRow:
"""Create a new PasswordEntryRow for entering the user token."""
row = Adw.PasswordEntryRow(title="ListenBrainz User Token",
show_apply_button=True)
row.prefix = Gtk.Image(icon_name="listenbrainz-logo-symbolic")
row.add_prefix(row.prefix)
return row

View File

@ -1,19 +1,21 @@
# Copyright 2023 (c) Anna Schumaker.
"""A custom Button that opens a FileDialog to select a file for playback."""
"""A custom Adw.ActionRow to select a file for playback."""
import pathlib
from gi.repository import GObject
from gi.repository import GLib
from gi.repository import Gio
from gi.repository import Gtk
from gi.repository import Adw
class Button(Gtk.Button):
"""Our pre-configured open button."""
class OpenRow(Adw.ActionRow):
"""Our pre-configured open Adw.ActionRow."""
def __init__(self):
"""Initialize our open button."""
super().__init__(icon_name="document-open-symbolic",
tooltip_text="open a file for playback")
"""Initialize our open ActionRow."""
super().__init__(activatable=True, title="Open File",
subtitle="Select a file for playback")
self._prefix = Gtk.Image(icon_name="document-open-symbolic")
self._filters = Gio.ListStore()
self._filter = Gtk.FileFilter(name="Audio Files",
mime_types=["inode/directory",
@ -23,6 +25,9 @@ class Button(Gtk.Button):
self._filters.append(self._filter)
self.connect("activated", self.__on_activated)
self.add_prefix(self._prefix)
def __async_ready(self, dialog: Gtk.FileDialog, task: Gio.Task) -> None:
try:
file = dialog.open_finish(task)
@ -30,8 +35,9 @@ class Button(Gtk.Button):
except GLib.Error:
pass
def do_clicked(self) -> None:
"""Handle a click event."""
def __on_activated(self, row: Adw.ActionRow) -> None:
"""Handle activating an OpenRow."""
self.get_ancestor(Gtk.Popover).popdown()
self._dialog.open(self.get_ancestor(Gtk.Window), None,
self.__async_ready)

View File

@ -64,3 +64,21 @@ class Window(Adw.Window):
def __filter(self, entry: entry.Filter) -> None:
self._selection.get_model().filter(entry.get_query())
class Row(Adw.ActionRow):
"""An Adw.ActionRow for opening the Settings Window."""
def __init__(self, sql: db.Connection):
"""Initialize our settings ActionRow."""
super().__init__(activatable=True, title="Edit Settings",
subtitle="Open the settings editor (debug only)")
self._prefix = Gtk.Image(icon_name="settings-symbolic")
self._window = Window(sql)
self.connect("activated", self.__on_activated)
self.add_prefix(self._prefix)
def __on_activated(self, row: Adw.ActionRow) -> None:
self.get_ancestor(Gtk.Popover).popdown()
self._window.present()

View File

@ -0,0 +1,114 @@
# Copyright 2024 (c) Anna Schumaker.
"""Our ListenBrainz custom GObject."""
from gi.repository import GObject
from gi.repository import GLib
from .. import db
from . import listen
from . import thread
from . import task
class ListenBrainz(GObject.GObject):
"""Our main ListenBrainz GObject."""
sql = GObject.Property(type=db.Connection)
offline = GObject.Property(type=bool, default=True)
user_token = GObject.Property(type=str)
valid_token = GObject.Property(type=bool, default=True)
now_playing = GObject.Property(type=db.tracks.Track)
def __init__(self, sql: db.Connection):
"""Initialize the ListenBrainz GObject."""
super().__init__(sql=sql)
self._queue = task.Queue()
self._thread = thread.Thread()
self._idle_id = None
self._timeout_id = None
self.connect("notify::offline", self.__notify_offline)
self.connect("notify::user-token", self.__notify_user_token)
self.connect("notify::now-playing", self.__notify_now_playing)
def __check_connected(self) -> bool:
return len(self.user_token) and self.valid_token and not self.offline
def __check_online(self) -> None:
self.notify("user-token")
def __check_result(self) -> None:
if (res := self._thread.get_result()) is not None:
self.valid_token = res.valid
self.offline = res.offline
if res.op == "submit-listens" and self.valid_token \
and not self.offline:
listens = [lsn.listenid for lsn in res.listens]
self.sql.tracks.delete_listens(listens)
def __parse_task(self, op: str, *args) -> bool:
match op:
case "clear-token":
self._thread.clear_user_token()
case "now-playing":
self._thread.submit_now_playing(listen.Listen(*args))
case "set-token":
self._thread.set_user_token(*args)
case "submit-listens":
listens = self.sql.tracks.get_n_listens(50)
if len(listens) == 0:
self._idle_id = None
return GLib.SOURCE_REMOVE
self._thread.submit_listens([listen.Listen(trk, listenid=id,
listened_at=ts)
for (id, trk, ts) in listens])
return GLib.SOURCE_CONTINUE
def __idle_work(self) -> bool:
if self.sql.loaded and self._thread.ready.is_set():
self.__check_result()
return self.__parse_task(*self._queue.pop())
return GLib.SOURCE_CONTINUE
def __idle_start(self) -> None:
if self._idle_id is None:
self._idle_id = GLib.idle_add(self.__idle_work)
def __notify_offline(self, listenbrainz: GObject.GObject,
param: GObject.ParamSpec) -> None:
if self.offline and self._timeout_id is None:
self._timeout_id = GLib.timeout_add_seconds(300,
self.__check_online)
elif not self.offline and self._timeout_id is not None:
self.__source_stop("_timeout_id")
def __notify_user_token(self, listenbrainz: GObject.GObject,
param: GObject.ParamSpec) -> None:
match self.user_token:
case "": self._queue.push("clear-token")
case _: self._queue.push("set-token", self.user_token)
self.__idle_start()
def __notify_now_playing(self, listenbrainz: GObject.GObject,
param: GObject.ParamSpec) -> None:
if self.now_playing is not None:
self._queue.push("now-playing", self.now_playing)
if self.__check_connected():
self.__idle_start()
else:
self._queue.clear("now-playing")
def __source_stop(self, srcid: str) -> None:
if (id := getattr(self, srcid)) is not None:
GLib.source_remove(id)
setattr(self, srcid, None)
def stop(self) -> None:
"""Stop the ListenBrainz thread."""
self.__source_stop("_idle_id")
self.__source_stop("_timeout_id")
self._thread.stop()
def submit_listens(self, *args) -> None:
"""Submit recent listens to ListenBrainz."""
if self.__check_connected():
self.__idle_start()

View File

@ -0,0 +1,28 @@
# Copyright 2024 (c) Anna Schumaker.
"""Convert a db.track.Track to a liblistenbrainz.Listen."""
import datetime
import dateutil.tz
import liblistenbrainz
from .. import db
from .. import gsetup
class Listen(liblistenbrainz.Listen):
"""A single ListenBrainz Listen."""
def __init__(self, track: db.tracks.Track, *, listenid: int = None,
listened_at: datetime.datetime = None):
"""Initialize our Listen class."""
album = track.get_medium().get_album()
artists = [a.mbid for a in track.get_artists() if len(a.mbid) > 0]
album_mbid = album.mbid if len(album.mbid) > 0 else None
super().__init__(track.title, track.artist, release_name=album.name,
artist_mbids=artists, release_group_mbid=album_mbid,
tracknumber=track.number,
additional_info={"media_player":
f"emmental{gsetup.DEBUG_STR}"})
self.listenid = listenid
if listened_at is not None:
when = listened_at.replace(tzinfo=dateutil.tz.tzutc())
self.listened_at = when.astimezone().timestamp()

View File

@ -0,0 +1,31 @@
# Copyright 2024 (c) Anna Schumaker.
"""Our ListenBrainz operation priority queue."""
class Queue:
"""A queue for prioritizing ListenBrainz operations."""
def __init__(self):
"""Initialize the task Queue."""
self._set_token = None
self._now_playing = None
def clear(self, op: str) -> None:
"""Clear a pending operation."""
match op:
case "clear-token" | "set-token": self._set_token = None
case "now-playing": self._now_playing = None
def push(self, op: str, *args) -> None:
"""Push an operation onto the queue."""
match op:
case "clear-token" | "set-token": self._set_token = (op, *args)
case "now-playing": self._now_playing = (op, *args)
def pop(self) -> tuple:
"""Pop an operation off the queue."""
if (res := self._set_token) is not None:
self._set_token = None
elif (res := self._now_playing) is not None:
self._now_playing = None
return res if res is not None else ("submit-listens",)

View File

@ -0,0 +1,96 @@
# Copyright 2024 (c) Anna Schumaker.
"""Our ListenBrainz client thread."""
import liblistenbrainz
import requests
from .. import thread
class Thread(thread.Thread):
"""Thread for submitting listens to ListenBrainz."""
def __init__(self):
"""Initialize the ListenBrainz Thread object."""
super().__init__()
self._client = liblistenbrainz.client.ListenBrainz()
def __print(self, text: str) -> None:
print(f"listenbrainz: {text}")
def __set_user_token(self, token: str) -> None:
try:
self._client.set_auth_token(token)
self.set_result("set-token", token=token)
except liblistenbrainz.errors.InvalidAuthTokenException:
self.set_result("set-token", token=token, valid=False)
except requests.exceptions.ConnectionError:
self.set_result("set-token", token=token, offline=True)
def __submit_now_playing(self, listen: liblistenbrainz.Listen) -> None:
try:
self._client.submit_playing_now(listen)
self.set_result("now-playing")
except liblistenbrainz.errors.ListenBrainzAPIException:
self.set_result("now-playing", valid=False)
except requests.exceptions.ConnectionError:
self.set_result("now-playing", offline=True)
def __submit_listens(self, listens: list[liblistenbrainz.Listen]) -> None:
try:
if len(listens) == 1:
self._client.submit_single_listen(listens[0])
else:
self._client.submit_multiple_listens(listens)
self.set_result("submit-listens", listens=listens)
except liblistenbrainz.errors.ListenBrainzAPIException:
self.set_result("submit-listens", listens=listens, valid=False)
except requests.exceptions.ConnectionError:
self.set_result("submit-listens", listens=listens, offline=True)
def do_run_task(self, task: thread.Data) -> None:
"""Call a specific listenbrainz operation."""
match task.op:
case "clear-token":
self._client.set_auth_token(None, check_validity=False)
self.set_result("clear-token")
case "now-playing":
self.__submit_now_playing(task.listen)
case "set-token":
self.__set_user_token(task.token)
case "submit-listens":
self.__submit_listens(task.listens)
def clear_user_token(self) -> None:
"""Schedule clearing the user token."""
self.__print("clearing user token")
self.set_task(op="clear-token")
def get_result(self, **kwargs) -> thread.Data:
"""Get the result of a listenbrainz task."""
if (res := super().get_result(**kwargs)) is not None:
if not res.valid:
self.__print("user token is invalid")
if res.offline:
self.__print("offline")
return res
def set_result(self, op: str, *, valid: bool = True,
offline: bool = False, **kwargs) -> None:
"""Set the Thread result with a standard format for all ops."""
super().set_result(op=op, valid=valid, offline=offline, **kwargs)
def set_user_token(self, token: str) -> None:
"""Schedule setting the user token."""
self.__print("setting user token")
self.set_task(op="set-token", token=token)
def submit_now_playing(self, listen: liblistenbrainz.Listen) -> None:
"""Schedule setting the now-playing track."""
self.__print(f"now playing '{listen.track_name}' " +
f"by '{listen.artist_name}'")
self.set_task(op="now-playing", listen=listen)
def submit_listens(self, listens: list[liblistenbrainz.Listen]) -> None:
"""Submit listens to listenbrainz."""
num = len(listens)
self.__print(f"submitting {num} listen{'s' if num != 1 else ''}")
self.set_task(op="submit-listens", listens=listens)

View File

@ -46,10 +46,10 @@ class Card(Gtk.Box):
large_icon=True,
has_frame=False, sensitive=False,
valign=Gtk.Align.CENTER)
self._jump = buttons.Button(icon_name="go-jump", has_frame=False,
self._jump = buttons.Button(icon_name="arrow4-down-symbolic",
tooltip_text="scroll to current track",
large_icon=True, sensitive=False,
valign=Gtk.Align.CENTER)
has_frame=False, valign=Gtk.Align.CENTER)
self._seeker = seeker.Scale(sensitive=False)
self.bind_property("artwork", self._artwork, "filepath")

View File

@ -27,7 +27,7 @@ class Card(Gtk.Box):
sensitive=False, **kwargs)
self._header = Gtk.CenterBox()
self._filter = entry.Filter("playlists", hexpand=True)
self._jump = Gtk.Button(icon_name="go-jump-symbolic",
self._jump = Gtk.Button(icon_name="arrow4-down-symbolic",
tooltip_text="scroll to current playlist")
self._playlists = playlist.Section(self.sql.playlists)
self._artists = artist.Section(self.sql.artists, self.sql.albums)
@ -52,7 +52,7 @@ class Card(Gtk.Box):
self._filter.connect("search-changed", self.__search_changed)
self._jump.connect("clicked", self.__jump_to_playlist)
self.sql.connect("table-loaded", self.__table_loaded)
self.sql.connect("notify::loaded", self.__database_loaded)
self._header.add_css_class("toolbar")
self.add_css_class("card")
@ -63,13 +63,12 @@ class Card(Gtk.Box):
def __search_changed(self, entry: entry.Filter) -> None:
self.sql.filter(entry.get_query())
def __table_loaded(self, sql: db.Connection, table: db.table.Table):
if self.get_sensitive() is False:
if False not in {tbl.loaded for tbl in sql.playlist_tables()}:
self.set_sensitive(True)
self.select_playlist(sql.active_playlist, 150)
if len(sql.libraries) == 0:
self._libraries.extra_widget.emit("clicked")
def __database_loaded(self, sql: db.Connection, param: GObject.ParamSpec):
self.set_sensitive(sql.loaded)
if sql.loaded is True:
self.select_playlist(sql.active_playlist, 150)
if len(sql.libraries) == 0:
self._libraries.extra_widget.emit("clicked")
def __select_playlist(self, playlist: db.playlist.Playlist) -> bool:
if playlist is not None:

View File

@ -39,7 +39,7 @@ class Section(section.Section):
def __init__(self, table=db.libraries.Table):
"""Initialize our library path section."""
super().__init__(table, LibraryRow, icon_name="library-music",
super().__init__(table, LibraryRow, icon_name="library-music-symbolic",
title="Library Paths", subtitle="0 library paths")
self.extra_widget = Gtk.Button(icon_name="folder-new", has_frame=False,
tooltip_text="add new library path")

94
emmental/thread.py Normal file
View File

@ -0,0 +1,94 @@
# Copyright 2024 (c) Anna Schumaker.
"""A Thread class designed to easily sync up with the main thread."""
import threading
class Data:
"""A class for holding generic fields inspired by SimpleNamespace."""
def __init__(self, values_dict: dict = {}, **kwargs):
"""Initialize our Data class."""
self.__dict__.update(values_dict | kwargs)
def __eq__(self, rhs: any) -> bool:
"""Compare two Data classes."""
if isinstance(rhs, Data):
return self.__dict__ == rhs.__dict__
elif isinstance(rhs, dict):
return self.__dict__ == rhs
return False
def __repr__(self) -> str:
"""Get a string representation of the Data."""
items = (f"{k}={v!r}" for k, v in self.__dict__.items())
return f"{type(self).__name__}({', '.join(items)})"
class Thread(threading.Thread):
"""A worker Thread class that is easy to sync up with the main thread."""
def __init__(self):
"""Initialize our worker Thread object."""
super().__init__()
self.ready = threading.Event()
self._condition = threading.Condition()
self._task = None
self._result = None
self.start()
def do_get_result(self, result: Data, **kwargs) -> Data:
"""Get the result of the task."""
return self._result
def do_run_task(self, task: Data) -> None:
"""Run the task."""
self.set_result()
def do_stop(self) -> None:
"""Extra work when stopping the thread."""
def get_result(self, **kwargs) -> Data:
"""Get the result of the current task."""
with self._condition:
if not self.ready.is_set() or self._result is None:
return None
res = self.do_get_result(self._result, **kwargs)
self._result = None
return res
def run(self) -> None:
"""Wait for a task to run."""
with self._condition:
self.ready.set()
while self._condition.wait():
if self._task is None:
self.do_stop()
break
self.do_run_task(self._task)
def set_result(self, **kwargs: dict) -> None:
"""Set the result of the task."""
self._result = Data(kwargs)
self.ready.set()
def __set_task(self, task: Data | None) -> None:
"""Set the task to be run by the thread."""
with self._condition:
self.ready.clear()
self._task = task
self._result = None
self._condition.notify()
def set_task(self, **kwargs: dict) -> None:
"""Set the task to be run by the thread."""
self.__set_task(Data(kwargs))
def stop(self) -> None:
"""Stop the thread."""
self.__set_task(None)
self.join()

View File

@ -5,6 +5,7 @@ from gi.repository import Gio
from gi.repository import Gtk
from . import sorter
from .. import buttons
from .. import gsetup
class VisibleRow(Gtk.ListBoxRow):
@ -107,7 +108,7 @@ class ShuffleButton(buttons.ImageToggle):
"""Initialize a Shuffle Button."""
super().__init__(active_icon_name="media-playlist-shuffle",
active_tooltip_text="shuffle: enabled",
inactive_icon_name="media-playlist-consecutive",
inactive_icon_name=self.get_inactive_icon(),
inactive_tooltip_text="shuffle: disabled",
large_icon=False, icon_opacity=0.5,
has_frame=False, **kwargs)
@ -115,6 +116,13 @@ class ShuffleButton(buttons.ImageToggle):
def do_toggled(self):
"""Adjust opacity when active state toggles."""
self.icon_opacity = 1.0 if self.active else 0.5
self.inactive_icon_name = self.get_inactive_icon()
def get_inactive_icon(self) -> str:
"""Return the inactive icon name."""
if gsetup.has_icon("media-playlist-normal"):
return "media-playlist-normal"
return "media-playlist-consecutive"
class SortRow(Gtk.ListBoxRow):
@ -180,7 +188,7 @@ class SortButton(buttons.PopoverButton):
"""Initialize the Sort button."""
super().__init__(has_frame=False, model=sorter.SortOrderModel(),
tooltip_text="configure playlist sort order",
icon_name="view-list-ordered-symbolic", **kwargs)
icon_name="list-compact-symbolic", **kwargs)
self.popover_child = Gtk.ListBox(selection_mode=Gtk.SelectionMode.NONE)
self.popover_child.bind_model(self.model, self.__create_func)
self.popover_child.connect("row-activated", self.__row_activated)

View File

@ -0,0 +1,2 @@
<?xml version="1.0" encoding="UTF-8"?>
<svg xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" height="16px" viewBox="0 0 16 16" width="16px"><filter id="a" height="100%" width="100%" x="0%" y="0%"><feColorMatrix color-interpolation-filters="sRGB" values="0 0 0 0 1 0 0 0 0 1 0 0 0 0 1 0 0 0 1 0"/></filter><mask id="b"><g filter="url(#a)"><path d="m -1.6 -1.6 h 19.2 v 19.2 h -19.2 z" fill-opacity="0.5"/></g></mask><clipPath id="c"><path d="m 0 0 h 1600 v 1200 h -1600 z"/></clipPath><mask id="d"><g filter="url(#a)"><path d="m -1.6 -1.6 h 19.2 v 19.2 h -19.2 z" fill-opacity="0.7"/></g></mask><clipPath id="e"><path d="m 0 0 h 1600 v 1200 h -1600 z"/></clipPath><mask id="f"><g filter="url(#a)"><path d="m -1.6 -1.6 h 19.2 v 19.2 h -19.2 z" fill-opacity="0.35"/></g></mask><clipPath id="g"><path d="m 0 0 h 1600 v 1200 h -1600 z"/></clipPath><g mask="url(#b)"><g clip-path="url(#c)" transform="matrix(1 0 0 1 -920 -120)"><path d="m 550 182 c -0.351562 0.003906 -0.695312 0.101562 -1 0.28125 v 3.4375 c 0.304688 0.179688 0.648438 0.277344 1 0.28125 c 1.105469 0 2 -0.894531 2 -2 s -0.894531 -2 -2 -2 z m 0 5 c -0.339844 0 -0.679688 0.058594 -1 0.175781 v 6.824219 h 4 v -4 c 0 -1.65625 -1.34375 -3 -3 -3 z m 0 0"/></g></g><g mask="url(#d)"><g clip-path="url(#e)" transform="matrix(1 0 0 1 -920 -120)"><path d="m 569 182 v 4 c 1.105469 0 2 -0.894531 2 -2 s -0.894531 -2 -2 -2 z m 0 5 v 7 h 3 v -4 c 0 -1.65625 -1.34375 -3 -3 -3 z m 0 0"/></g></g><g mask="url(#f)"><g clip-path="url(#g)" transform="matrix(1 0 0 1 -920 -120)"><path d="m 573 182.269531 v 3.449219 c 0.613281 -0.355469 0.996094 -1.007812 1 -1.71875 c 0 -0.714844 -0.382812 -1.375 -1 -1.730469 z m 0 4.90625 v 6.824219 h 2 v -4 c 0 -1.269531 -0.800781 -2.402344 -2 -2.824219 z m 0 0"/></g></g><path d="m 7.984375 1 c -0.550781 0 -1 0.449219 -1 1 v 8.585938 l -2.292969 -2.292969 c -0.1875 -0.1875 -0.441406 -0.292969 -0.707031 -0.292969 s -0.519531 0.105469 -0.707031 0.292969 c -0.390625 0.390625 -0.390625 1.023437 0 1.414062 l 4 4 c 0.390625 0.390625 1.023437 0.390625 1.414062 0 l 4 -4 c 0.390625 -0.390625 0.390625 -1.023437 0 -1.414062 s -1.023437 -0.390625 -1.414062 0 l -2.292969 2.292969 v -8.585938 c 0 -0.550781 -0.445313 -1 -1 -1 z m 0 0"/></svg>

After

Width:  |  Height:  |  Size: 2.2 KiB

View File

@ -0,0 +1,2 @@
<?xml version="1.0" encoding="UTF-8"?>
<svg xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" height="16px" viewBox="0 0 16 16" width="16px"><filter id="a" height="100%" width="100%" x="0%" y="0%"><feColorMatrix color-interpolation-filters="sRGB" values="0 0 0 0 1 0 0 0 0 1 0 0 0 0 1 0 0 0 1 0"/></filter><mask id="b"><g filter="url(#a)"><path d="m -1.6 -1.6 h 19.2 v 19.2 h -19.2 z" fill-opacity="0.5"/></g></mask><clipPath id="c"><path d="m 0 0 h 1600 v 1200 h -1600 z"/></clipPath><mask id="d"><g filter="url(#a)"><path d="m -1.6 -1.6 h 19.2 v 19.2 h -19.2 z" fill-opacity="0.7"/></g></mask><clipPath id="e"><path d="m 0 0 h 1600 v 1200 h -1600 z"/></clipPath><mask id="f"><g filter="url(#a)"><path d="m -1.6 -1.6 h 19.2 v 19.2 h -19.2 z" fill-opacity="0.35"/></g></mask><clipPath id="g"><path d="m 0 0 h 1600 v 1200 h -1600 z"/></clipPath><path d="m 2.386719 3 c -0.214844 0 -0.386719 0.167969 -0.386719 0.378906 v 1.242188 c 0 0.210937 0.171875 0.378906 0.386719 0.378906 h 1.230469 c 0.210937 0 0.382812 -0.167969 0.382812 -0.378906 v -1.242188 c 0 -0.210937 -0.171875 -0.378906 -0.382812 -0.378906 z m 3.613281 0 v 2 h 8 v -2 z m -3.613281 4 c -0.214844 0 -0.386719 0.167969 -0.386719 0.378906 v 1.242188 c 0 0.210937 0.171875 0.378906 0.386719 0.378906 h 1.230469 c 0.210937 0 0.382812 -0.167969 0.382812 -0.378906 v -1.242188 c 0 -0.210937 -0.171875 -0.378906 -0.382812 -0.378906 z m 3.613281 0 v 2 h 8 v -2 z m -3.613281 4 c -0.214844 0 -0.386719 0.167969 -0.386719 0.378906 v 1.242188 c 0 0.210937 0.171875 0.378906 0.386719 0.378906 h 1.230469 c 0.210937 0 0.382812 -0.167969 0.382812 -0.378906 v -1.242188 c 0 -0.210937 -0.171875 -0.378906 -0.382812 -0.378906 z m 3.613281 0 v 2 h 8 v -2 z m 0 0" fill="#222222"/><g mask="url(#b)"><g clip-path="url(#c)" transform="matrix(1 0 0 1 -660 -864)"><path d="m 550 182 c -0.351562 0.003906 -0.695312 0.101562 -1 0.28125 v 3.4375 c 0.304688 0.179688 0.648438 0.277344 1 0.28125 c 1.105469 0 2 -0.894531 2 -2 s -0.894531 -2 -2 -2 z m 0 5 c -0.339844 0 -0.679688 0.058594 -1 0.175781 v 6.824219 h 4 v -4 c 0 -1.65625 -1.34375 -3 -3 -3 z m 0 0"/></g></g><g mask="url(#d)"><g clip-path="url(#e)" transform="matrix(1 0 0 1 -660 -864)"><path d="m 569 182 v 4 c 1.105469 0 2 -0.894531 2 -2 s -0.894531 -2 -2 -2 z m 0 5 v 7 h 3 v -4 c 0 -1.65625 -1.34375 -3 -3 -3 z m 0 0"/></g></g><g mask="url(#f)"><g clip-path="url(#g)" transform="matrix(1 0 0 1 -660 -864)"><path d="m 573 182.269531 v 3.449219 c 0.613281 -0.355469 0.996094 -1.007812 1 -1.71875 c 0 -0.714844 -0.382812 -1.375 -1 -1.730469 z m 0 4.90625 v 6.824219 h 2 v -4 c 0 -1.269531 -0.800781 -2.402344 -2 -2.824219 z m 0 0"/></g></g></svg>

After

Width:  |  Height:  |  Size: 2.6 KiB

View File

@ -0,0 +1,49 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<svg
id="a"
viewBox="0 0 16 16"
version="1.1"
sodipodi:docname="listenbrainz-logo-symbolic.svg"
width="16"
height="16"
inkscape:version="1.3.2 (091e20ef0f, 2023-11-25, custom)"
xmlns:inkscape="http://www.inkscape.org/namespaces/inkscape"
xmlns:sodipodi="http://sodipodi.sourceforge.net/DTD/sodipodi-0.dtd"
xmlns="http://www.w3.org/2000/svg"
xmlns:svg="http://www.w3.org/2000/svg">
<sodipodi:namedview
id="namedview2"
pagecolor="#505050"
bordercolor="#eeeeee"
borderopacity="1"
inkscape:showpageshadow="0"
inkscape:pageopacity="0"
inkscape:pagecheckerboard="0"
inkscape:deskcolor="#d1d1d1"
inkscape:zoom="52.917468"
inkscape:cx="4.2330068"
inkscape:cy="7.9652337"
inkscape:window-width="1920"
inkscape:window-height="1016"
inkscape:window-x="0"
inkscape:window-y="0"
inkscape:window-maximized="1"
inkscape:current-layer="a" />
<defs
id="defs1">
<style
id="style1">.b{fill:#353070;}.c{fill:#eb743b;}</style>
</defs>
<polygon
class="b"
points="13,29 13,1 1,8 1,22 "
id="polygon1"
transform="matrix(0.5,0,0,0.5,1,0.5)"
style="fill:#222222;fill-opacity:1" />
<polygon
class="c"
points="14,29 14,1 26,8 26,22 "
id="polygon2"
transform="matrix(0.399792,0,0,0.42127119,3.5057644,1.6847072)"
style="fill:none;stroke:#222222;stroke-width:3.01583;stroke-dasharray:none;stroke-opacity:1" />
</svg>

After

Width:  |  Height:  |  Size: 1.5 KiB

View File

@ -0,0 +1,47 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<svg
id="a"
viewBox="0 0 16 16"
version="1.1"
sodipodi:docname="listenbrainz-logo.svg"
width="16"
height="16"
inkscape:version="1.3.2 (091e20ef0f, 2023-11-25, custom)"
xmlns:inkscape="http://www.inkscape.org/namespaces/inkscape"
xmlns:sodipodi="http://sodipodi.sourceforge.net/DTD/sodipodi-0.dtd"
xmlns="http://www.w3.org/2000/svg"
xmlns:svg="http://www.w3.org/2000/svg">
<sodipodi:namedview
id="namedview2"
pagecolor="#505050"
bordercolor="#eeeeee"
borderopacity="1"
inkscape:showpageshadow="0"
inkscape:pageopacity="0"
inkscape:pagecheckerboard="0"
inkscape:deskcolor="#d1d1d1"
inkscape:zoom="45.119402"
inkscape:cx="3.9007609"
inkscape:cy="8.3445255"
inkscape:window-width="1920"
inkscape:window-height="1016"
inkscape:window-x="0"
inkscape:window-y="0"
inkscape:window-maximized="1"
inkscape:current-layer="a" />
<defs
id="defs1">
<style
id="style1">.b{fill:#353070;}.c{fill:#eb743b;}</style>
</defs>
<polygon
class="b"
points="1,22 13,29 13,1 1,8 "
id="polygon1"
transform="matrix(0.5,0,0,0.5,1.25,0.5)" />
<polygon
class="c"
points="26,8 26,22 14,29 14,1 "
id="polygon2"
transform="matrix(0.5,0,0,0.5,1.25,0.5)" />
</svg>

After

Width:  |  Height:  |  Size: 1.3 KiB

View File

@ -14,6 +14,7 @@ class TestConnection(tests.util.TestCase):
dir = pathlib.Path(emmental.db.__file__).parent
self.assertEqual(emmental.db.SQL_V1_SCRIPT, dir / "emmental.sql")
self.assertEqual(emmental.db.SQL_V2_SCRIPT, dir / "upgrade-v2.sql")
self.assertEqual(emmental.db.SQL_V3_SCRIPT, dir / "upgrade-v3.sql")
def test_connection(self):
"""Check that the connection manager is initialized properly."""
@ -22,16 +23,16 @@ class TestConnection(tests.util.TestCase):
def test_version(self):
"""Test checking the database schema version."""
cur = self.sql("PRAGMA user_version")
self.assertEqual(cur.fetchone()["user_version"], 2)
self.assertEqual(cur.fetchone()["user_version"], 3)
def test_version_too_new(self):
"""Test failing when the database version is too new."""
self.sql._Connection__check_version()
self.sql("PRAGMA user_version = 3")
self.sql("PRAGMA user_version = 4")
with self.assertRaises(Exception) as e:
self.sql._Connection__check_version()
self.assertEqual(str(e.exception), "Unsupported data version: 3")
self.assertEqual(str(e.exception), "Unsupported data version: 4")
def test_close(self):
"""Check closing the connection."""
@ -72,22 +73,34 @@ class TestConnection(tests.util.TestCase):
def test_load(self):
"""Check that calling load() loads the tables."""
idle_tables = [tbl for tbl in self.sql.playlist_tables()] + \
[self.sql.tracks]
plist_tables = list(self.sql.playlist_tables())
all_tables = [self.sql.settings] + plist_tables + [self.sql.tracks]
table_loaded = unittest.mock.Mock()
self.sql.connect("table-loaded", table_loaded)
self.assertFalse(self.sql.loaded)
notify_loaded = unittest.mock.Mock()
self.sql.connect("notify::loaded", notify_loaded)
self.sql.load()
self.assertTrue(self.sql.settings.loaded)
for tbl in idle_tables:
notify_loaded.assert_not_called()
for tbl in all_tables[1:]:
self.assertFalse(tbl.loaded)
for tbl in idle_tables:
for tbl in plist_tables:
tbl.queue.complete()
self.assertTrue(tbl.loaded)
self.assertFalse(self.sql.loaded)
notify_loaded.assert_not_called()
calls = [unittest.mock.call(self.sql, tbl)
for tbl in [self.sql.settings] + idle_tables]
self.sql.tracks.queue.complete()
self.assertTrue(self.sql.tracks.loaded)
self.assertTrue(self.sql.loaded)
notify_loaded.assert_called()
calls = [unittest.mock.call(self.sql, tbl) for tbl in all_tables]
table_loaded.assert_has_calls(calls)
def test_filter(self):

View File

@ -182,23 +182,23 @@ class TestLibraryObject(tests.util.TestCase):
tagger.tag_file.assert_not_called()
tagger.ready.is_set.return_value = True
tagger.get_result.return_value = (None, None)
tagger.get_result.return_value = None
self.assertFalse(self.library._Library__tag_track(track))
tagger.get_result.assert_called_with(self.sql, self.library)
tagger.tag_file.assert_called_with(track, None)
tagger.get_result.assert_called_with(db=self.sql, library=self.library)
tagger.tag_file.assert_called_with(track, mtime=None)
self.sql.tracks.lookup = unittest.mock.Mock()
self.sql.tracks.lookup.return_value.mtime = 12345
self.assertFalse(self.library._Library__tag_track(track))
tagger.get_result.assert_called_with(self.sql, self.library)
tagger.tag_file.assert_called_with(track, 12345)
tagger.get_result.assert_called_with(db=self.sql, library=self.library)
tagger.tag_file.assert_called_with(track, mtime=12345)
tagger.reset_mock()
tagger.ready.is_set.return_value = True
tagger.get_result.return_value = (track, tags)
tagger.get_result.return_value = {"path": track, "tags": tags}
self.assertTrue(self.library._Library__tag_track(track))
tagger.tag_file.assert_not_called()
tagger.get_result.assert_called_with(self.sql, self.library)
tagger.get_result.assert_called_with(db=self.sql, library=self.library)
@unittest.mock.patch("emmental.db.tagger.untag_track")
def test_scan_check_trackid(self, mock_untag: unittest.mock.Mock()):

View File

@ -1,9 +1,9 @@
# Copyright 2022 (c) Anna Schumaker
"""Tests our Mutagen wrapper."""
import pathlib
import threading
import unittest.mock
import emmental.db.tagger
import emmental.thread
import tests.util
@ -276,8 +276,8 @@ class TestTaggerThread(tests.util.TestCase):
def test_init(self, mock_file: unittest.mock.Mock):
"""Test that the tagger thread was initialized properly."""
self.assertIsInstance(self.tagger, threading.Thread)
self.assertIsInstance(self.tagger._condition, threading.Condition)
self.assertIsInstance(self.tagger, emmental.thread.Thread)
self.assertIsNone(self.tagger._connection)
self.assertTrue(self.tagger.is_alive())
def test_stop(self, mock_file: unittest.mock.Mock):
@ -285,74 +285,49 @@ class TestTaggerThread(tests.util.TestCase):
mock_connection = unittest.mock.Mock()
mock_connection.close = unittest.mock.Mock()
self.tagger._file = "abcde"
self.tagger._mtime = 12345
self.tagger._connection = mock_connection
self.tagger.stop()
with unittest.mock.patch.object(self.tagger._condition, "notify",
wraps=self.tagger._condition.notify) \
as mock_notify:
self.tagger.stop()
self.assertIsNone(self.tagger._file)
self.assertIsNone(self.tagger._mtime)
mock_notify.assert_called()
self.assertFalse(self.tagger.is_alive())
self.assertIsNone(self.tagger._connection)
mock_connection.close.assert_called()
def test_tag_file(self, mock_file: unittest.mock.Mock):
"""Test asking the thread to tag a file."""
path = pathlib.Path("/a/b/c.ogg")
self.assertIsInstance(self.tagger.ready, threading.Event)
self.assertIsNone(self.tagger._file)
self.assertIsNone(self.tagger._tags)
self.assertIsNone(self.tagger._mtime)
self.assertTrue(self.tagger.ready.is_set())
mock_file.return_value = None
self.tagger.ready.set()
self.tagger._tags = 12345
self.tagger.tag_file(path, None)
self.assertFalse(self.tagger.ready.is_set())
self.assertEqual(self.tagger._file, path)
self.assertIsNone(self.tagger._mtime)
self.assertIsNone(self.tagger._tags)
self.tagger.tag_file(path, mtime=None)
self.assertEqual(self.tagger._task, {"path": path, "mtime": None})
self.tagger.ready.wait()
self.assertIsNone(self.tagger._tags)
mock_file.assert_called_with(pathlib.Path("/a/b/c.ogg"), None)
mock_file.return_value = self.make_tags(dict())
self.tagger.tag_file(path, 12345)
self.assertEqual(self.tagger._mtime, 12345)
self.tagger.tag_file(path, mtime=12345)
self.assertEqual(self.tagger._task, {"path": path, "mtime": 12345})
self.tagger.ready.wait()
self.assertIsNotNone(self.tagger._tags)
mock_file.assert_called_with(self.tagger._file, 12345)
mock_file.assert_called_with(path, 12345)
def test_get_result(self, mock_file: unittest.mock.Mock):
"""Test creating a Tags structure after tagging."""
mock_file.return_value = None
self.tagger.tag_file(pathlib.Path("/a/b/c.ogg"), None)
self.assertTupleEqual(self.tagger.get_result(self.sql, self.library),
(None, None))
self.assertIsNone(self.tagger.get_result(db=self.sql,
library=self.library))
track_path = pathlib.Path("/a/b/c.ogg")
self.tagger.tag_file(track_path, mtime=None)
self.tagger.ready.wait()
self.assertTupleEqual(self.tagger.get_result(self.sql, self.library),
(pathlib.Path("/a/b/c.ogg"), None))
self.assertIsNone(self.tagger._file)
self.assertTupleEqual(self.tagger.get_result(db=self.sql,
library=self.library),
(track_path, None))
mock_file.return_value = self.make_tags(dict())
self.tagger.tag_file(pathlib.Path("/a/b/c.ogg"), None)
self.tagger.tag_file(track_path, mtime=None)
self.tagger.ready.wait()
(file, tags) = self.tagger.get_result(self.sql, self.library)
self.assertEqual(file, pathlib.Path("/a/b/c.ogg"))
self.assertIsInstance(tags, emmental.db.tagger.Tags)
self.assertIsNone(self.tagger._file)
self.assertIsNone(self.tagger._mtime)
self.assertIsNone(self.tagger._tags)
res = self.tagger.get_result(db=self.sql, library=self.library)
self.assertTupleEqual(res, (track_path, res[1]))
@unittest.mock.patch("emmental.db.connection.Connection.__call__")
@unittest.mock.patch("musicbrainzngs.get_artist_by_id")
@ -370,7 +345,7 @@ class TestTaggerThread(tests.util.TestCase):
mock_cursor.fetchone = unittest.mock.Mock(return_value=None)
mock_connection.return_value = mock_cursor
self.tagger.tag_file(pathlib.Path("/a/b/c.ogg"), None)
self.tagger.tag_file(pathlib.Path("/a/b/c.ogg"), mtime=None)
self.tagger.ready.wait()
self.assertEqual(audio_tags.artists[0].name, "Some Artist")
self.assertEqual(audio_tags.artists[1].name, "Some Artist")
@ -394,7 +369,7 @@ class TestTaggerThread(tests.util.TestCase):
self.assertIsNone(self.tagger._connection)
self.tagger.tag_file(pathlib.Path("/a/b/c.ogg"), None)
self.tagger.tag_file(pathlib.Path("/a/b/c.ogg"), mtime=None)
self.tagger.ready.wait()
self.assertIsInstance(self.tagger._connection,
emmental.db.connection.Connection)

View File

@ -247,8 +247,8 @@ class TestTrackTable(tests.util.TestCase):
def test_create_restore(self):
"""Test restoring saved track data."""
now = datetime.datetime.now()
today = datetime.date.today()
now = datetime.datetime.utcnow()
today = now.date()
yesterday = today - datetime.timedelta(days=1)
self.sql("""INSERT INTO saved_track_data
(mbid, favorite, playcount,
@ -292,6 +292,20 @@ class TestTrackTable(tests.util.TestCase):
self.assertFalse(track.delete())
def test_delete_listens(self):
"""Test deleting listens from the listenbrainz_queue."""
track1 = self.tracks.create(self.library, pathlib.Path("/a/b/1.ogg"),
self.medium, self.year, length=10)
track2 = self.tracks.create(self.library, pathlib.Path("/a/b/2.ogg"),
self.medium, self.year, length=10)
for track in [track1, track2]:
track.start()
track.stop(9)
self.tracks.delete_listens([1, 2])
self.assertListEqual(self.tracks.get_n_listens(5), [])
def test_delete_save(self):
"""Test saving track data when a track is deleted."""
now = datetime.datetime.now()
@ -314,7 +328,7 @@ class TestTrackTable(tests.util.TestCase):
self.assertEqual(rows[0]["laststarted"], now)
self.assertEqual(rows[0]["lastplayed"], now)
self.assertEqual(rows[0]["playcount"], 42)
self.assertEqual(rows[0]["added"], datetime.date.today())
self.assertEqual(rows[0]["added"], datetime.datetime.utcnow().date())
def test_filter(self):
"""Test filtering the Track table."""
@ -485,6 +499,40 @@ class TestTrackTable(tests.util.TestCase):
self.assertListEqual(self.tracks.get_genres(track),
[genre1, genre2])
def test_get_n_listens(self):
"""Test the get_n_listens() function."""
track1 = self.tracks.create(self.library, pathlib.Path("/a/b/1.ogg"),
self.medium, self.year, length=10)
track2 = self.tracks.create(self.library, pathlib.Path("/a/b/2.ogg"),
self.medium, self.year, length=12)
self.assertListEqual(self.tracks.get_n_listens(2), [])
track1.start()
track1.stop(8)
ts1 = track1.lastplayed
self.assertListEqual(self.tracks.get_n_listens(2),
[(1, track1, ts1)])
track2.start()
track2.stop(11)
ts2 = track2.lastplayed
self.assertListEqual(self.tracks.get_n_listens(2),
[(2, track2, ts2),
(1, track1, ts1)])
track1.start()
track1.stop(9)
ts3 = track1.lastplayed
self.assertListEqual(self.tracks.get_n_listens(2),
[(3, track1, ts3),
(2, track2, ts2)])
self.assertListEqual(self.tracks.get_n_listens(4),
[(3, track1, ts3),
(2, track2, ts2),
(1, track1, ts1)])
def test_mark_path_active(self):
"""Test marking a path as active."""
self.tracks.create(self.library, pathlib.Path("/a/b/1.ogg"),
@ -534,6 +582,8 @@ class TestTrackTable(tests.util.TestCase):
"""Test marking that a Track has stopped playback."""
track = self.tracks.create(self.library, pathlib.Path("/a/b/1.ogg"),
self.medium, self.year, length=10)
track_played = unittest.mock.Mock()
self.tracks.connect("track-played", track_played)
track.start()
with unittest.mock.patch.object(self.sql, "commit",
@ -549,9 +599,13 @@ class TestTrackTable(tests.util.TestCase):
self.assertIsNone(track.lastplayed)
self.assertIsNone(self.tracks.current_track)
cur = self.sql("SELECT trackid, timestamp FROM listenbrainz_queue")
self.assertListEqual(cur.fetchall(), [])
self.playlists.most_played.reload_tracks.assert_not_called()
self.playlists.queued.remove_track.assert_not_called()
self.playlists.unplayed.remove_track.assert_not_called()
track_played.assert_not_called()
track.start()
with unittest.mock.patch.object(self.sql, "commit",
@ -566,14 +620,22 @@ class TestTrackTable(tests.util.TestCase):
self.assertEqual(row["lastplayed"], track.laststarted)
self.assertEqual(track.lastplayed, track.laststarted)
cur = self.sql("SELECT trackid, timestamp FROM listenbrainz_queue")
row = cur.fetchall()[0]
self.assertEqual(row["trackid"], track.trackid)
self.assertEqual(row["timestamp"], track.lastplayed)
self.playlists.most_played.reload_tracks.assert_called()
self.playlists.queued.remove_track.assert_called_with(track)
self.playlists.unplayed.remove_track.assert_called_with(track)
track_played.assert_called_with(self.tracks, track)
def test_stop_restarted_track(self):
"""Test marking that a restarted Track has stopped playback."""
track = self.tracks.create(self.library, pathlib.Path("/a/b/1.ogg"),
self.medium, self.year, length=10)
track_played = unittest.mock.Mock()
self.tracks.connect("track-played", track_played)
track.restart()
track.stop(3)
@ -588,9 +650,13 @@ class TestTrackTable(tests.util.TestCase):
self.assertIsNone(track.restarted)
self.assertIsNone(self.tracks.current_track)
cur = self.sql("SELECT trackid, timestamp FROM listenbrainz_queue")
self.assertListEqual(cur.fetchall(), [])
self.playlists.most_played.reload_tracks.assert_not_called()
self.playlists.queued.remove_track.assert_not_called()
self.playlists.unplayed.remove_track.assert_not_called()
track_played.assert_not_called()
track.restart()
restarted = track.restarted
@ -604,9 +670,15 @@ class TestTrackTable(tests.util.TestCase):
self.assertEqual(row["laststarted"], restarted)
self.assertEqual(track.laststarted, restarted)
cur = self.sql("SELECT trackid, timestamp FROM listenbrainz_queue")
row = cur.fetchall()[0]
self.assertEqual(row["trackid"], track.trackid)
self.assertEqual(row["timestamp"], track.lastplayed)
self.playlists.most_played.reload_tracks.assert_called_with(idle=True)
self.playlists.queued.remove_track.assert_called_with(track)
self.playlists.unplayed.remove_track.assert_called_with(track)
track_played.assert_called_with(self.tracks, track)
def test_current_track(self):
"""Test the current-track and have-current-track properties."""

View File

@ -52,30 +52,87 @@ class TestHeader(tests.util.TestCase):
self.assertFalse(self.header.show_sidebar)
def test_open(self):
"""Check that the Open button works as expected."""
self.assertIsInstance(self.header._open, emmental.header.open.Button)
"""Check that the Open ActionRow works as expected."""
self.assertIsInstance(self.header._open, emmental.header.open.OpenRow)
self.assertEqual(self.header._menu_box.get_row_at_index(0),
self.header._open)
signal = unittest.mock.Mock()
self.header.connect("track-requested", signal)
self.header._open.emit("track-requested", pathlib.Path("/a/b/c/1.ogg"))
signal.assert_called_with(self.header, pathlib.Path("/a/b/c/1.ogg"))
def test_listenbrainz(self):
"""Check that the ListenBrainzRow is set up correctly."""
self.assertIsInstance(self.header._listenbrainz, Adw.PasswordEntryRow)
self.assertEqual(self.header._menu_box.get_row_at_index(1),
self.header._listenbrainz)
self.assertEqual(self.header.listenbrainz_token, "")
self.assertEqual(self.header._listenbrainz.props.text, "")
self.header.listenbrainz_token = "abcde"
self.assertEqual(self.header._listenbrainz.props.text, "abcde")
with unittest.mock.patch.object(self.header._menu_button,
"popdown") as mock_popdown:
self.header._listenbrainz.props.text = "fghij"
self.header._listenbrainz.emit("apply")
self.assertEqual(self.header.listenbrainz_token, "fghij")
mock_popdown.assert_called()
self.header._listenbrainz.props.text = "abcde"
self.header._menu_button.get_popover().emit("closed")
self.assertEqual(self.header._listenbrainz.props.text, "fghij")
def test_listenbrainz_token_valid(self):
"""Test the listenbrainz-token-valid property."""
win = Gtk.Window(titlebar=self.header)
win.post_toast = unittest.mock.Mock()
self.assertTrue(self.header.listenbrainz_token_valid)
self.header.listenbrainz_token_valid = False
self.assertTrue(self.header._menu_button.has_css_class("warning"))
self.assertTrue(self.header._listenbrainz.has_css_class("warning"))
self.assertFalse(self.header.listenbrainz_token_valid)
win.post_toast.assert_called_with(
"listenbrainz: user token is invalid")
win.post_toast.reset_mock()
self.header.listenbrainz_token_valid = True
self.assertFalse(self.header._menu_button.has_css_class("warning"))
self.assertFalse(self.header._listenbrainz.has_css_class("warning"))
self.assertTrue(self.header.listenbrainz_token_valid)
win.post_toast.assert_not_called()
def test_settings(self):
"""Check that the Settings window is set up correctly."""
self.assertIsInstance(self.header._settings, Gtk.Button)
self.assertIsInstance(self.header._window,
emmental.header.settings.Window)
"""Check that the SettingsRow is set up correctly."""
self.assertIsInstance(self.header._settings,
emmental.header.settings.Row)
self.assertEqual(self.header._menu_box.get_row_at_index(2),
self.header._settings)
self.assertEqual(self.header.sql, self.sql)
self.assertEqual(self.header._settings.get_icon_name(),
"settings-symbolic")
self.assertEqual(self.header._settings.get_tooltip_text(),
"open settings editor")
def test_menu_button(self):
"""Check that the menu popover button is set up properly."""
self.assertIsInstance(self.header._menu_button,
emmental.buttons.PopoverButton)
self.assertIsNotNone(self.header._menu_button.props.parent)
with unittest.mock.patch.object(self.header._window,
"present") as mock_present:
self.header._settings.emit("clicked")
mock_present.assert_called()
self.assertEqual(self.header._menu_button.props.icon_name,
"open-menu-symbolic")
self.assertEqual(self.header._menu_button.popover_child,
self.header._menu_box)
def test_menu_popover_child(self):
"""Check that the menu popover button child was set up correctly."""
self.assertIsInstance(self.header._menu_box, Gtk.ListBox)
self.assertEqual(self.header._menu_box.get_selection_mode(),
Gtk.SelectionMode.NONE)
self.assertTrue(self.header._menu_box.has_css_class("boxed-list"))
self.assertEqual(self.header._menu_box.get_row_at_index(0),
self.header._open)
def test_volume_icons(self):
"""Check that the volume icons box is set up properly."""
@ -110,7 +167,7 @@ class TestHeader(tests.util.TestCase):
self.assertEqual(self.header._volume.volume, vol)
self.assertEqual(self.header._volume_icon.get_icon_name(),
f"audio-volume-{icon}-symbolic")
self.assertEqual(self.header._button.get_tooltip_text(),
self.assertEqual(self.header._vol_button.get_tooltip_text(),
f"volume: {i*10}%\n"
"background listening: off\nnormalizing: off")
@ -128,19 +185,19 @@ class TestHeader(tests.util.TestCase):
self.assertTrue(self.header._background.enabled)
self.assertEqual(self.header._background_icon.get_icon_name(),
"sound-wave-alt")
self.assertEqual(self.header._button.get_tooltip_text(),
self.assertEqual(self.header._vol_button.get_tooltip_text(),
"volume: 100%\nbackground listening: 50%\n"
"normalizing: off")
self.header.bg_volume = 0.75
self.assertEqual(self.header._background.volume, 0.75)
self.assertEqual(self.header._button.get_tooltip_text(),
self.assertEqual(self.header._vol_button.get_tooltip_text(),
"volume: 100%\nbackground listening: 75%\n"
"normalizing: off")
self.header._background.volume = 0.25
self.assertEqual(self.header.bg_volume, 0.25)
self.assertEqual(self.header._button.get_tooltip_text(),
self.assertEqual(self.header._vol_button.get_tooltip_text(),
"volume: 100%\nbackground listening: 25%\n"
"normalizing: off")
@ -160,7 +217,7 @@ class TestHeader(tests.util.TestCase):
self.header.rg_mode = "track"
self.assertTrue(self.header._replaygain.enabled)
self.assertEqual(self.header._replaygain.mode, "track")
self.assertEqual(self.header._button.get_tooltip_text(),
self.assertEqual(self.header._vol_button.get_tooltip_text(),
"volume: 100%\nbackground listening: off\n"
"normalizing: track mode")
@ -168,32 +225,34 @@ class TestHeader(tests.util.TestCase):
self.header._replaygain.mode = "album"
self.assertFalse(self.header.rg_enabled)
self.assertEqual(self.header.rg_mode, "album")
self.assertEqual(self.header._button.get_tooltip_text(),
self.assertEqual(self.header._vol_button.get_tooltip_text(),
"volume: 100%\nbackground listening: off\n"
"normalizing: off")
def test_popover_button(self):
"""Check that the menu popover button was set up correctly."""
self.assertIsInstance(self.header._button,
def test_volume_popover_button(self):
"""Check that the volume popover button was set up correctly."""
self.assertIsInstance(self.header._vol_button,
emmental.buttons.PopoverButton)
self.assertEqual(self.header._button.popover_child, self.header._box)
self.assertEqual(self.header._vol_button.popover_child,
self.header._vol_box)
self.assertEqual(self.header._button.get_child(), self.header._icons)
self.assertEqual(self.header._button.get_margin_end(), 6)
self.assertFalse(self.header._button.get_has_frame())
self.assertEqual(self.header._vol_button.get_child(),
self.header._icons)
self.assertEqual(self.header._vol_button.get_margin_end(), 6)
self.assertFalse(self.header._vol_button.get_has_frame())
def test_popover_child(self):
"""Check that the menu popover button child was set up correctly."""
self.assertIsInstance(self.header._box, Gtk.ListBox)
self.assertEqual(self.header._box.get_selection_mode(),
def test_volume_popover_child(self):
"""Check that the volume popover button child was set up correctly."""
self.assertIsInstance(self.header._vol_box, Gtk.ListBox)
self.assertEqual(self.header._vol_box.get_selection_mode(),
Gtk.SelectionMode.NONE)
self.assertTrue(self.header._box.has_css_class("boxed-list"))
self.assertTrue(self.header._vol_box.has_css_class("boxed-list"))
self.assertEqual(self.header._box.get_row_at_index(0),
self.assertEqual(self.header._vol_box.get_row_at_index(0),
self.header._volume)
self.assertEqual(self.header._box.get_row_at_index(1),
self.assertEqual(self.header._vol_box.get_row_at_index(1),
self.header._background)
self.assertEqual(self.header._box.get_row_at_index(2),
self.assertEqual(self.header._vol_box.get_row_at_index(2),
self.header._replaygain)
def test_accelerators(self):

View File

@ -0,0 +1,25 @@
# Copyright 2024 (c) Anna Schumaker.
"""Tests our Listenbrainz User Token entry."""
import emmental.header.listenbrainz
import unittest
from gi.repository import Gtk
from gi.repository import Adw
class TestListenbrainzRow(unittest.TestCase):
"""Test the ListenBrainzRow."""
def setUp(self):
"""Set up common variables."""
self.row = emmental.header.listenbrainz.ListenBrainzRow()
def test_init(self):
"""Test that the ListenBrainzRow was set up properly."""
self.assertIsInstance(self.row, Adw.PasswordEntryRow)
self.assertIsInstance(self.row.prefix, Gtk.Image)
self.assertEqual(self.row.props.title, "ListenBrainz User Token")
self.assertTrue(self.row.props.show_apply_button)
self.assertEqual(self.row.prefix.props.icon_name,
"listenbrainz-logo-symbolic")

View File

@ -1,58 +1,69 @@
# Copyright 2023 (c) Anna Schumaker.
"""Tests our Open button."""
"""Tests our Open Adw.ActionRow."""
import emmental.header.open
import pathlib
import unittest
from gi.repository import Gio
from gi.repository import Gtk
from gi.repository import Adw
class TestButton(unittest.TestCase):
"""Test the Open button."""
class TestOpenRow(unittest.TestCase):
"""Test the Open row."""
def setUp(self):
"""Set up common variables."""
self.button = emmental.header.open.Button()
self.row = emmental.header.open.OpenRow()
def test_button(self):
"""Check that the button was set up properly."""
self.assertIsInstance(self.button, Gtk.Button)
self.assertEqual(self.button.get_icon_name(), "document-open-symbolic")
self.assertEqual(self.button.get_tooltip_text(),
"open a file for playback")
def test_action_row(self):
"""Check that the action row was set up properly."""
self.assertIsInstance(self.row, Adw.ActionRow)
self.assertIsInstance(self.row._prefix, Gtk.Image)
self.assertEqual(self.row.props.title, "Open File")
self.assertEqual(self.row.props.subtitle, "Select a file for playback")
self.assertTrue(self.row.props.activatable)
self.assertEqual(self.row._prefix.props.icon_name,
"document-open-symbolic")
def test_filter(self):
"""Check that the file filter is set up properly."""
self.assertIsInstance(self.button._filter, Gtk.FileFilter)
self.assertIsInstance(self.button._filters, Gio.ListStore)
self.assertIsInstance(self.row._filter, Gtk.FileFilter)
self.assertIsInstance(self.row._filters, Gio.ListStore)
self.assertEqual(self.button._filter.get_name(), "Audio Files")
self.assertEqual(self.button._filters[0], self.button._filter)
self.assertEqual(self.row._filter.get_name(), "Audio Files")
self.assertEqual(self.row._filters[0], self.row._filter)
def test_dialog(self):
"""Check that the file dialog is set up properly."""
self.assertIsInstance(self.button._dialog, Gtk.FileDialog)
self.assertEqual(self.button._dialog.get_title(), "Pick a Track")
self.assertEqual(self.button._dialog.get_filters(),
self.button._filters)
self.assertTrue(self.button._dialog.get_modal())
self.assertIsInstance(self.row._dialog, Gtk.FileDialog)
self.assertEqual(self.row._dialog.get_title(), "Pick a Track")
self.assertEqual(self.row._dialog.get_filters(),
self.row._filters)
self.assertTrue(self.row._dialog.get_modal())
def test_clicked(self):
"""Test clicking on the button."""
with unittest.mock.patch.object(self.button._dialog,
"open") as mock_open:
self.button.emit("clicked")
mock_open.assert_called_with(None, None,
self.button._Button__async_ready)
def test_activate(self):
"""Test activating an OpenRow."""
listbox = Gtk.ListBox()
popover = Gtk.Popover(child=listbox)
listbox.append(self.row)
with unittest.mock.patch.object(self.button._dialog,
with unittest.mock.patch.object(popover, "popdown") as mock_popdown:
with unittest.mock.patch.object(self.row._dialog,
"open") as mock_open:
self.row.emit("activated")
mock_popdown.assert_called()
mock_open.assert_called_with(None, None,
self.row._OpenRow__async_ready)
with unittest.mock.patch.object(self.row._dialog,
"open_finish") as mock_finish:
task = Gio.Task()
signal = unittest.mock.Mock()
mock_finish.return_value = Gio.File.new_for_path("/a/b/c/1.ogg")
self.button.connect("track-requested", signal)
self.row.connect("track-requested", signal)
self.button._Button__async_ready(self.button._dialog, task)
self.row._OpenRow__async_ready(self.row._dialog, task)
mock_finish.assert_called_with(task)
signal.assert_called_with(self.button,
pathlib.Path("/a/b/c/1.ogg"))
signal.assert_called_with(self.row, pathlib.Path("/a/b/c/1.ogg"))

View File

@ -141,3 +141,39 @@ class TestWindow(tests.util.TestCase):
emmental.header.settings.ValueRow)
self.assertEqual(columns[1].get_title(), "Value")
self.assertEqual(columns[1].get_fixed_width(), 100)
class TestSettingsRow(tests.util.TestCase):
"""Test the SettingsRow."""
def setUp(self):
"""Set up common variables."""
super().setUp()
self.row = emmental.header.settings.Row(sql=self.sql)
def test_init(self):
"""Test that the SettingsRow was set up properly."""
self.assertIsInstance(self.row, Adw.ActionRow)
self.assertIsInstance(self.row._prefix, Gtk.Image)
self.assertIsInstance(self.row._window,
emmental.header.settings.Window)
self.assertEqual(self.row.props.title, "Edit Settings")
self.assertEqual(self.row.props.subtitle,
"Open the settings editor (debug only)")
self.assertTrue(self.row.props.activatable)
self.assertEqual(self.row._prefix.props.icon_name, "settings-symbolic")
def test_activate(self):
"""Test activating a SettingsRow."""
listbox = Gtk.ListBox()
popover = Gtk.Popover(child=listbox)
listbox.append(self.row)
with unittest.mock.patch.object(popover, "popdown") as mock_popdown:
with unittest.mock.patch.object(self.row._window,
"present") as mock_present:
self.row.emit("activated")
mock_popdown.assert_called()
mock_present.assert_called()

View File

@ -0,0 +1,58 @@
# Copyright 2024 (c) Anna Schumaker.
"""Test creating a liblistenbrainz.Listen from a Track."""
import datetime
import dateutil.tz
import emmental.listenbrainz.listen
import liblistenbrainz
import pathlib
import tests.util
class TestListen(tests.util.TestCase):
"""ListenBrainz Listen test case."""
def setUp(self):
"""Set up common variables."""
super().setUp()
self.library = self.sql.libraries.create(pathlib.Path("/a/b"))
self.artists = [self.sql.artists.create("Artist 1", mbid="mbid-ar1"),
self.sql.artists.create("Artist 2"),
self.sql.artists.create("Artist 3", mbid="mbid-ar3")]
self.album = self.sql.albums.create("Test Album", "Test Artist",
release="1988-06",
mbid="mbid-release")
self.medium = self.sql.media.create(self.album, "", number=1)
self.year = self.sql.years.create(1988)
self.track = self.sql.tracks.create(self.library,
pathlib.Path("/a/b/c.ogg"),
self.medium, self.year,
title="Track 1", number=1,
artist="Track Artist")
for artist in self.artists:
artist.add_track(self.track)
self.listen = emmental.listenbrainz.listen.Listen(self.track)
def test_init(self):
"""Test initializing our Listen instance."""
self.assertIsInstance(self.listen, liblistenbrainz.Listen)
self.assertEqual(self.listen.track_name, "Track 1")
self.assertEqual(self.listen.artist_name, "Track Artist")
self.assertEqual(self.listen.release_name, "Test Album")
self.assertEqual(self.listen.release_group_mbid, "mbid-release")
self.assertEqual(self.listen.tracknumber, 1)
self.assertDictEqual(self.listen.additional_info,
{"media_player": "emmental-debug"})
self.assertListEqual(self.listen.artist_mbids,
["mbid-ar1", "mbid-ar3"])
self.assertIsNone(self.listen.listened_at)
self.assertIsNone(self.listen.listenid)
utc_now = datetime.datetime.utcnow()
local_now = utc_now.replace(tzinfo=dateutil.tz.tzutc()).astimezone()
listen = emmental.listenbrainz.listen.Listen(self.track,
listenid=1234,
listened_at=utc_now)
self.assertEqual(listen.listenid, 1234)
self.assertEqual(listen.listened_at, local_now.timestamp())

View File

@ -0,0 +1,315 @@
# Copyright 2024 (c) Anna Schumaker.
"""Tests our custom ListenBrainz GObject."""
import datetime
import emmental.listenbrainz
import io
import pathlib
import tests.util
import unittest
from gi.repository import GObject
from gi.repository import GLib
@unittest.mock.patch("sys.stdout", new_callable=io.StringIO)
@unittest.mock.patch("gi.repository.GLib.source_remove")
@unittest.mock.patch("gi.repository.GLib.idle_add", return_value=42)
class TestListenBrainz(tests.util.TestCase):
"""ListenBrainz GObject test case."""
def setUp(self):
"""Set up common variables."""
super().setUp()
self.listenbrainz = emmental.listenbrainz.ListenBrainz(self.sql)
self.library = self.sql.libraries.create(pathlib.Path("/a/b"))
self.album = self.sql.albums.create("Test Album", "Test Artist",
release="1988-06",
mbid="mbid-release")
self.medium = self.sql.media.create(self.album, "", number=1)
self.year = self.sql.years.create(1988)
self.track = self.sql.tracks.create(self.library,
pathlib.Path("/a/b/c.ogg"),
self.medium, self.year,
title="Track 1", number=1,
artist="Track Artist", length=10)
@unittest.mock.patch("gi.repository.GLib.source_remove")
def tearDown(self, mock_source_remove: unittest.mock.Mock):
"""Clean up."""
self.listenbrainz.stop()
def test_init(self, mock_idle_add: unittest.mock.Mock,
mock_source_remove: unittest.mock.Mock,
mock_stdout: io.StringIO):
"""Test that the ListenBrainz GObject was set up properly."""
self.assertIsInstance(self.listenbrainz, GObject.GObject)
self.assertIsInstance(self.listenbrainz._queue,
emmental.listenbrainz.task.Queue)
self.assertIsInstance(self.listenbrainz._thread,
emmental.listenbrainz.thread.Thread)
self.assertIsNone(self.listenbrainz._idle_id)
self.assertEqual(self.listenbrainz.sql, self.sql)
self.assertIsNone(self.listenbrainz._timeout_id)
def test_early_idle_work(self, mock_idle_add: unittest.mock.Mock,
mock_source_remove: unittest.mock.Mock,
mock_stdout: io.StringIO):
"""Test __idle_work() before the database has finished loading."""
with unittest.mock.patch.object(self.listenbrainz._thread.ready,
"is_set") as mock_is_set:
self.assertEqual(self.listenbrainz._ListenBrainz__idle_work(),
GLib.SOURCE_CONTINUE)
mock_is_set.assert_not_called()
self.sql.loaded = True
self.assertEqual(self.listenbrainz._ListenBrainz__idle_work(),
GLib.SOURCE_REMOVE)
mock_is_set.assert_called()
def test_stop(self, mock_idle_add: unittest.mock.Mock,
mock_source_remove: unittest.mock.Mock,
mock_stdout: io.StringIO):
"""Test stopping the thread during shutdown."""
self.listenbrainz._idle_id = 12345
self.listenbrainz._timeout_id = 67890
self.listenbrainz.stop()
self.assertFalse(self.listenbrainz._thread.is_alive())
self.assertIsNone(self.listenbrainz._idle_id)
self.assertIsNone(self.listenbrainz._timeout_id)
mock_source_remove.assert_has_calls([unittest.mock.call(12345),
unittest.mock.call(67890)])
def test_set_user_token(self, mock_idle_add: unittest.mock.Mock,
mock_source_remove: unittest.mock.Mock,
mock_stdout: io.StringIO):
"""Test setting the user-token property."""
self.assertEqual(self.listenbrainz.user_token, "")
self.assertTrue(self.listenbrainz.valid_token)
self.assertTrue(self.listenbrainz.offline)
self.sql.loaded = True
idle_work = self.listenbrainz._ListenBrainz__idle_work
with unittest.mock.patch.object(self.listenbrainz._thread,
"set_user_token") as mock_set_token:
self.listenbrainz.user_token = "abc"
self.assertEqual(self.listenbrainz._queue._set_token,
("set-token", "abc"))
self.assertEqual(self.listenbrainz._idle_id, 42)
mock_idle_add.assert_called_with(idle_work)
self.assertEqual(idle_work(), GLib.SOURCE_CONTINUE)
mock_set_token.assert_called_with("abc")
mock_idle_add.reset_mock()
self.listenbrainz.user_token = "abcde"
self.assertEqual(self.listenbrainz._queue._set_token,
("set-token", "abcde"))
self.assertEqual(self.listenbrainz._idle_id, 42)
mock_idle_add.assert_not_called()
self.listenbrainz._thread.set_result(op="set-token", token="abc",
valid=True)
self.assertEqual(idle_work(), GLib.SOURCE_CONTINUE)
mock_set_token.assert_called_with("abcde")
self.listenbrainz._thread.ready.clear()
self.assertEqual(idle_work(), GLib.SOURCE_CONTINUE)
for valid, offline in [(False, False), (False, True),
(True, False), (True, True)]:
with self.subTest(valid=valid, offline=offline):
self.listenbrainz._thread.set_result(op="set-token",
token="abcde",
valid=valid,
offline=offline)
self.assertEqual(idle_work(), GLib.SOURCE_REMOVE)
self.assertEqual(self.listenbrainz.valid_token, valid)
self.assertEqual(self.listenbrainz.offline, offline)
self.assertIsNone(self.listenbrainz._idle_id)
def test_clear_user_token(self, mock_idle_add: unittest.mock.Mock,
mock_source_remove: unittest.mock.Mock,
mock_stdout: io.StringIO):
"""Test clearing the user-token property."""
self.sql.loaded = True
idle_work = self.listenbrainz._ListenBrainz__idle_work
with unittest.mock.patch.object(self.listenbrainz._thread,
"clear_user_token") as mock_clear:
self.listenbrainz.valid_token = False
self.listenbrainz.user_token = ""
self.assertEqual(self.listenbrainz._queue._set_token,
("clear-token",))
self.assertEqual(self.listenbrainz._idle_id, 42)
mock_idle_add.assert_called_with(idle_work)
self.assertEqual(idle_work(), GLib.SOURCE_CONTINUE)
mock_clear.assert_called()
self.listenbrainz._thread.set_result(op="clear-token", valid=True)
self.assertEqual(idle_work(), GLib.SOURCE_REMOVE)
self.assertTrue(self.listenbrainz.valid_token)
self.assertIsNone(self.listenbrainz._idle_id)
def test_submit_now_playing(self, mock_idle_add: unittest.mock.Mock,
mock_source_remove: unittest.mock.Mock,
mock_stdout: io.StringIO):
"""Test setting the now-playing property."""
self.assertIsNone(self.listenbrainz.now_playing)
self.sql.loaded = True
self.listenbrainz.user_token = "abcde"
self.listenbrainz.valid_token = True
self.listenbrainz.offline = False
self.listenbrainz._queue.pop()
self.listenbrainz._ListenBrainz__source_stop("_idle_id")
self.listenbrainz.now_playing = self.track
self.assertTupleEqual(self.listenbrainz._queue._now_playing,
("now-playing", self.track))
self.assertEqual(self.listenbrainz._idle_id, 42)
idle_work = self.listenbrainz._ListenBrainz__idle_work
with unittest.mock.patch.object(self.listenbrainz._thread,
"submit_now_playing") as mock_playing:
self.assertEqual(idle_work(), GLib.SOURCE_CONTINUE)
mock_playing.assert_called()
self.assertIsInstance(mock_playing.call_args.args[0],
emmental.listenbrainz.listen.Listen)
for valid, offline in [(False, False), (False, True),
(True, False), (True, True)]:
with self.subTest(valid=valid, offline=offline):
self.listenbrainz._thread.set_result(op="now-playing",
valid=valid,
offline=offline)
self.assertEqual(idle_work(), GLib.SOURCE_REMOVE)
self.assertEqual(self.listenbrainz.valid_token, valid)
self.assertEqual(self.listenbrainz.offline, offline)
def test_submit_now_playing_later(self, mock_idle_add: unittest.mock.Mock,
mock_source_remove: unittest.mock.Mock,
mock_stdout: io.StringIO):
"""Test the now-playing property when ListenBrainz is disconnected."""
self.assertIsNone(self.listenbrainz.now_playing)
self.listenbrainz.now_playing = self.track
self.assertTupleEqual(self.listenbrainz._queue._now_playing,
("now-playing", self.track))
self.assertIsNone(self.listenbrainz._idle_id)
self.listenbrainz.user_token = "abcde"
self.listenbrainz.valid_token = False
self.listenbrainz._queue.pop()
self.listenbrainz._ListenBrainz__source_stop("_idle_id")
self.listenbrainz.now_playing = self.track
self.assertTupleEqual(self.listenbrainz._queue._now_playing,
("now-playing", self.track))
self.assertIsNone(self.listenbrainz._idle_id)
self.listenbrainz.valid_token = True
self.listenbrainz._queue._now_playing = "abcde"
self.listenbrainz.now_playing = None
self.assertIsNone(self.listenbrainz._queue._now_playing)
self.assertIsNone(self.listenbrainz._idle_id)
self.listenbrainz.offline = True
self.listenbrainz.now_playing = self.track
self.assertTupleEqual(self.listenbrainz._queue._now_playing,
("now-playing", self.track))
self.assertIsNone(self.listenbrainz._idle_id)
def test_submit_listens(self, mock_idle_add: unittest.mock.Mock,
mock_source_remove: unittest.mock.Mock,
mock_stdout: io.StringIO):
"""Test submitting recently listened tracks."""
ts1 = datetime.datetime.utcnow()
ts2 = datetime.datetime.utcnow()
idle_work = self.listenbrainz._ListenBrainz__idle_work
listens = [emmental.listenbrainz.listen.Listen(self.track, listenid=1,
listened_at=ts1),
emmental.listenbrainz.listen.Listen(self.track, listenid=2,
listened_at=ts2)]
self.sql.loaded = True
self.listenbrainz.user_token = "abcde"
self.listenbrainz.valid_token = True
self.listenbrainz._queue.pop()
self.listenbrainz._ListenBrainz__source_stop("_idle_id")
self.listenbrainz.offline = False
self.listenbrainz.submit_listens("ignored", "args")
self.assertIsNotNone(self.listenbrainz._idle_id)
with unittest.mock.patch.object(self.sql.tracks,
"get_n_listens") as mock_get_listens:
mock_get_listens.return_value = [(1, self.track, ts1),
(2, self.track, ts2)]
with unittest.mock.patch.object(self.listenbrainz._thread,
"submit_listens") as mock_submit:
self.assertEqual(idle_work(), GLib.SOURCE_CONTINUE)
mock_get_listens.assert_called_with(50)
mock_submit.assert_called()
with unittest.mock.patch.object(self.sql.tracks,
"delete_listens") as mock_delete:
for valid, offline in [(False, False), (False, True),
(True, False), (True, True)]:
mock_delete.reset_mock()
with self.subTest(valid=valid, offline=offline):
self.listenbrainz._thread.set_result(op="submit-listens",
listens=listens,
valid=valid,
offline=offline)
self.assertEqual(idle_work(), GLib.SOURCE_REMOVE)
self.assertEqual(self.listenbrainz.valid_token, valid)
self.assertEqual(self.listenbrainz.offline, offline)
if valid is True and offline is False:
mock_delete.assert_called_with([1, 2])
else:
mock_delete.assert_not_called()
def test_submit_listens_later(self, mock_idle_add: unittest.mock.Mock,
mock_source_remove: unittest.mock.Mock,
mock_stdout: io.StringIO):
"""Test submitting listens when ListenBrainz is disconnected."""
self.listenbrainz.submit_listens("ignored", "args")
self.assertIsNone(self.listenbrainz._idle_id)
self.listenbrainz.user_token = "abcde"
self.listenbrainz.valid_token = False
self.listenbrainz._queue.pop()
self.listenbrainz._idle_id = None
self.listenbrainz.submit_listens("ignored", "args")
self.assertIsNone(self.listenbrainz._idle_id)
self.listenbrainz.valid_token = True
self.listenbrainz.offline = True
self.listenbrainz.submit_listens("ignored", "args")
self.assertIsNone(self.listenbrainz._idle_id)
@unittest.mock.patch("gi.repository.GLib.timeout_add_seconds")
def test_offline_recovery(self, mock_timeout_add: unittest.mock.Mock,
mock_idle_add: unittest.mock.Mock,
mock_source_remove: unittest.mock.Mock,
mock_stdout: io.StringIO):
"""Test handling an offline response."""
self.assertTrue(self.listenbrainz.offline)
check_func = self.listenbrainz._ListenBrainz__check_online
mock_timeout_add.return_value = 67890
self.listenbrainz.offline = True
self.assertEqual(self.listenbrainz._timeout_id, 67890)
mock_timeout_add.assert_called_with(300, check_func)
mock_timeout_add.reset_mock()
mock_timeout_add.return_value = 99999
self.listenbrainz.offline = True
self.assertEqual(self.listenbrainz._timeout_id, 67890)
mock_timeout_add.assert_not_called()
self.listenbrainz.offline = False
mock_source_remove.assert_called_with(67890)
mock_source_remove.reset_mock()
self.listenbrainz.offline = False
mock_source_remove.assert_not_called()

View File

@ -0,0 +1,68 @@
# Copyright 2024 (c) Anna Schumaker.
"""Tests our ListenBrainz priority queue."""
import emmental.listenbrainz.task
import liblistenbrainz
import unittest
class TestTaskQueue(unittest.TestCase):
"""Test the ListenBrainz queue."""
def setUp(self):
"""Set up common variables."""
self.queue = emmental.listenbrainz.task.Queue()
def test_init(self):
"""Test that the queue was set up properly."""
self.assertIsNotNone(self.queue)
self.assertTupleEqual(self.queue.pop(), ("submit-listens",))
def test_push_set_token(self):
"""Test calling push() with the 'set-token' operation."""
self.assertIsNone(self.queue._set_token)
self.queue.push("set-token", "abcde")
self.assertTupleEqual(self.queue._set_token, ("set-token", "abcde"))
self.queue.push("set-token", "fghij")
self.assertTupleEqual(self.queue._set_token, ("set-token", "fghij"))
self.assertTupleEqual(self.queue.pop(), ("set-token", "fghij"))
self.assertIsNone(self.queue._set_token)
self.queue.push("set-token", "abcde")
self.queue.clear("set-token")
self.assertIsNone(self.queue._set_token)
self.assertTupleEqual(self.queue.pop(), ("submit-listens",))
def test_push_clear_token(self):
"""Test calling push() with the 'clear-token' operation."""
self.queue.push("clear-token")
self.assertTupleEqual(self.queue._set_token, ("clear-token",))
self.assertTupleEqual(self.queue.pop(), ("clear-token",))
self.assertIsNone(self.queue._set_token)
self.queue.push("clear-token")
self.queue.clear("clear-token")
self.assertIsNone(self.queue._set_token)
self.assertTupleEqual(self.queue.pop(), ("submit-listens",))
def test_push_now_playing(self):
"""Test the push_now_playing() function."""
self.assertIsNone(self.queue._now_playing)
listen = liblistenbrainz.Listen("Track Name", "Artist Name")
self.queue.push("now-playing", listen)
self.assertTupleEqual(self.queue._now_playing, ("now-playing", listen))
self.queue.push("set-token", "abcde")
self.assertTupleEqual(self.queue.pop(), ("set-token", "abcde"))
self.assertTupleEqual(self.queue.pop(), ("now-playing", listen))
self.assertIsNone(self.queue._now_playing)
self.queue.push("now-playing", listen)
self.queue.clear("now-playing")
self.assertIsNone(self.queue._now_playing)
self.assertTupleEqual(self.queue.pop(), ("submit-listens",))

View File

@ -0,0 +1,189 @@
# Copyright 2024 (c) Anna Schumaker.
"""Tests our ListenBrainz client thread."""
import emmental.listenbrainz.thread
import io
import liblistenbrainz
import requests
import unittest
@unittest.mock.patch("sys.stdout", new_callable=io.StringIO)
class TestThread(unittest.TestCase):
"""ListenBrainz Thread test case."""
def setUp(self):
"""Set up common variables."""
self.thread = emmental.listenbrainz.thread.Thread()
def tearDown(self):
"""Clean up."""
self.thread.stop()
def test_init(self, mock_stdout: io.StringIO):
"""Test that the ListenBrainz thread was initialized properly."""
self.assertIsInstance(self.thread, emmental.thread.Thread)
self.assertIsInstance(self.thread._client,
liblistenbrainz.client.ListenBrainz)
def test_clear_user_token(self, mock_stdout: io.StringIO):
"""Test clearing the user token."""
with unittest.mock.patch.object(self.thread._client,
"set_auth_token") as mock_set_auth:
self.thread.clear_user_token()
self.assertFalse(self.thread.ready.is_set())
self.assertEqual(self.thread._task, {"op": "clear-token"})
self.assertEqual(mock_stdout.getvalue(),
"listenbrainz: clearing user token\n")
self.thread.ready.wait()
mock_set_auth.assert_called_with(None, check_validity=False)
self.assertEqual(self.thread.get_result(),
{"op": "clear-token", "valid": True,
"offline": False})
def test_set_user_token(self, mock_stdout: io.StringIO):
"""Test setting the user auth token."""
with unittest.mock.patch.object(self.thread._client,
"set_auth_token") as mock_set_auth:
self.thread.set_user_token("abcde")
self.assertFalse(self.thread.ready.is_set())
self.assertEqual(self.thread._task,
{"op": "set-token", "token": "abcde"})
self.assertEqual(mock_stdout.getvalue(),
"listenbrainz: setting user token\n")
self.thread.ready.wait()
mock_set_auth.assert_called_with("abcde")
self.assertEqual(self.thread.get_result(),
{"op": "set-token", "token": "abcde",
"valid": True, "offline": False})
def test_set_user_token_exceptions(self, mock_stdout: io.StringIO):
"""Test exception handling when setting the user auth token."""
with unittest.mock.patch.object(self.thread._client,
"set_auth_token") as mock_set_auth:
mock_set_auth.side_effect = \
liblistenbrainz.errors.InvalidAuthTokenException()
self.thread.set_user_token("abcde")
self.thread.ready.wait()
self.assertEqual(self.thread.get_result(),
{"op": "set-token", "token": "abcde",
"valid": False, "offline": False})
self.assertEqual(mock_stdout.getvalue(),
"listenbrainz: setting user token\n" +
"listenbrainz: user token is invalid\n")
mock_set_auth.side_effect = requests.exceptions.ConnectionError()
self.thread.set_user_token("abcde")
self.thread.ready.wait()
self.assertEqual(self.thread.get_result(),
{"op": "set-token", "token": "abcde",
"valid": True, "offline": True})
self.assertRegex(mock_stdout.getvalue(), "listenbrainz: offline")
def test_submit_now_playing(self, mock_stdout: io.StringIO):
"""Test submitting the now playing track."""
listen = liblistenbrainz.Listen("Track Name", "Artist Name")
with unittest.mock.patch.object(self.thread._client,
"submit_playing_now") as mock_submit:
self.thread.submit_now_playing(listen)
self.assertFalse(self.thread.ready.is_set())
self.assertEqual(self.thread._task, {"op": "now-playing",
"listen": listen})
self.assertEqual(mock_stdout.getvalue(),
"listenbrainz: now playing 'Track Name' " +
"by 'Artist Name'\n")
self.thread.ready.wait()
mock_submit.assert_called_with(listen)
self.assertEqual(self.thread.get_result(),
{"op": "now-playing", "valid": True,
"offline": False})
def test_submit_now_playing_exceptions(self, mock_stdout: io.StringIO):
"""Test exception handling when submitting the now playing track."""
listen = liblistenbrainz.Listen("Track Name", "Artist Name")
with unittest.mock.patch.object(self.thread._client,
"submit_playing_now") as mock_submit:
mock_submit.side_effect = \
liblistenbrainz.errors.ListenBrainzAPIException(401)
self.thread.submit_now_playing(listen)
self.thread.ready.wait()
self.assertEqual(self.thread.get_result(),
{"op": "now-playing", "valid": False,
"offline": False})
self.assertEqual(mock_stdout.getvalue(),
"listenbrainz: now playing 'Track Name' " +
"by 'Artist Name'\n" +
"listenbrainz: user token is invalid\n")
mock_submit.side_effect = requests.exceptions.ConnectionError()
self.thread.submit_now_playing(listen)
self.thread.ready.wait()
self.assertEqual(self.thread.get_result(),
{"op": "now-playing", "valid": True,
"offline": True})
self.assertRegex(mock_stdout.getvalue(), "listenbrainz: offline")
def test_submit_single_listen(self, mock_stdout: io.StringIO):
"""Test submitting a single listen."""
listens = [liblistenbrainz.Listen("Track Name", "Artist Name")]
with unittest.mock.patch.object(self.thread._client,
"submit_single_listen") as mock_submit:
self.thread.submit_listens(listens)
self.assertFalse(self.thread.ready.is_set())
self.assertEqual(self.thread._task, {"op": "submit-listens",
"listens": listens})
self.assertEqual(mock_stdout.getvalue(),
"listenbrainz: submitting 1 listen\n")
self.thread.ready.wait()
mock_submit.assert_called_with(listens[0])
self.assertEqual(self.thread.get_result(),
{"op": "submit-listens", "listens": listens,
"valid": True, "offline": False})
def test_submit_multiple_listens(self, mock_stdout: io.StringIO):
"""Test submitting multiple listens."""
listens = [liblistenbrainz.Listen("Track 1", "Artist"),
liblistenbrainz.Listen("Track 2", "Artist"),
liblistenbrainz.Listen("Track 3", "Artist")]
with unittest.mock.patch.object(self.thread._client,
"submit_multiple_listens") \
as mock_submit:
self.thread.submit_listens(listens)
self.assertFalse(self.thread.ready.is_set())
self.assertEqual(self.thread._task, {"op": "submit-listens",
"listens": listens})
self.assertEqual(mock_stdout.getvalue(),
"listenbrainz: submitting 3 listens\n")
self.thread.ready.wait()
mock_submit.assert_called_with(listens)
self.assertEqual(self.thread.get_result(),
{"op": "submit-listens", "listens": listens,
"valid": True, "offline": False})
def test_submit_listens_exceptions(self, mock_stdout: io.StringIO):
"""Test exception handling when submitting listens."""
listens = [liblistenbrainz.Listen("Track Name", "Artist Name")]
with unittest.mock.patch.object(self.thread._client,
"submit_single_listen") as mock_submit:
mock_submit.side_effect = \
liblistenbrainz.errors.ListenBrainzAPIException(401)
self.thread.submit_listens(listens)
self.thread.ready.wait()
self.assertEqual(self.thread.get_result(),
{"op": "submit-listens", "listens": listens,
"valid": False, "offline": False})
self.assertEqual(mock_stdout.getvalue(),
"listenbrainz: submitting 1 listen\n" +
"listenbrainz: user token is invalid\n")
mock_submit.side_effect = requests.exceptions.ConnectionError()
self.thread.submit_listens(listens)
self.thread.ready.wait()
self.assertEqual(self.thread.get_result(),
{"op": "submit-listens", "listens": listens,
"valid": True, "offline": True})
self.assertRegex(mock_stdout.getvalue(), "listenbrainz: offline")

View File

@ -115,7 +115,7 @@ class TestNowPlaying(unittest.TestCase):
self.assertEqual(self.card._favorite.get_next_sibling(),
self.card._jump)
self.assertEqual(self.card._jump.icon_name, "go-jump")
self.assertEqual(self.card._jump.icon_name, "arrow4-down-symbolic")
self.assertEqual(self.card._jump.get_tooltip_text(),
"scroll to current track")
self.assertEqual(self.card._jump.get_valign(), Gtk.Align.CENTER)

View File

@ -25,7 +25,7 @@ class TestLibraries(tests.util.TestCase):
emmental.sidebar.library.LibraryRow)
self.assertEqual(self.libraries.table, self.sql.libraries)
self.assertEqual(self.libraries.icon_name, "library-music")
self.assertEqual(self.libraries.icon_name, "library-music-symbolic")
self.assertEqual(self.libraries.title, "Library Paths")
def test_extra_widget(self):

View File

@ -54,7 +54,7 @@ class TestSidebar(tests.util.TestCase):
self.sidebar._jump)
self.assertEqual(self.sidebar._jump.get_icon_name(),
"go-jump-symbolic")
"arrow4-down-symbolic")
self.assertEqual(self.sidebar._jump.get_tooltip_text(),
"scroll to current playlist")
@ -66,25 +66,20 @@ class TestSidebar(tests.util.TestCase):
def test_sensitivity_and_startup(self):
"""Test setting the sidebar sensitivity when all tables have loaded."""
tables = [t for t in self.sql.playlist_tables()]
self.sidebar.select_playlist = unittest.mock.Mock()
self.sidebar._libraries.extra_widget.emit = unittest.mock.Mock()
for table in tables:
self.assertFalse(self.sidebar.get_sensitive())
self.sidebar.select_playlist.assert_not_called()
self.sidebar._libraries.extra_widget.emit.assert_not_called()
table.load(now=True)
self.assertEqual(self.sidebar.get_sensitive(),
table == tables[-1])
self.assertFalse(self.sidebar.get_sensitive())
self.sql.loaded = True
self.assertTrue(self.sidebar.get_sensitive())
playlist = self.sql.playlists.collection
self.sidebar.select_playlist.assert_called_with(playlist, 150)
self.sidebar._libraries.extra_widget.emit.assert_called_with("clicked")
self.sidebar.select_playlist.reset_mock()
self.sql.emit("table-loaded", tables[0])
self.sql.loaded = False
self.assertFalse(self.sidebar.get_sensitive())
self.sidebar.select_playlist.assert_not_called()
def test_show_all_artists(self):

View File

@ -49,12 +49,15 @@ class TestEmmental(unittest.TestCase):
self.assertIsNone(self.application.mpris)
self.assertIsNone(self.application.factory)
self.assertIsNone(self.application.player)
self.assertIsNone(self.application.lbrainz)
self.assertIsNone(self.application.win)
self.application.emit("startup")
self.assertIsInstance(self.application.db, emmental.db.Connection)
self.assertIsInstance(self.application.mpris,
emmental.mpris2.Connection)
self.assertIsInstance(self.application.lbrainz,
emmental.listenbrainz.ListenBrainz)
self.assertIsInstance(self.application.player, emmental.audio.Player)
self.assertIsInstance(self.application.factory,
emmental.playlist.Factory)
@ -84,12 +87,16 @@ class TestEmmental(unittest.TestCase):
"""Test that the shutdown signal works as expected."""
db = self.application.db = emmental.db.Connection()
mpris = self.application.mpris = emmental.mpris2.Connection()
lbrainz = self.application.lbrainz = \
emmental.listenbrainz.ListenBrainz(
self.application.db)
self.application.win = emmental.window.Window("Test 1.2.3")
player = self.application.player = emmental.audio.Player()
self.application.emit("shutdown")
self.assertIsNone(self.application.db)
self.assertIsNone(self.application.mpris)
self.assertIsNone(self.application.lbrainz)
self.assertIsNone(self.application.player)
self.assertIsNone(self.application.win)
@ -97,6 +104,7 @@ class TestEmmental(unittest.TestCase):
self.assertFalse(db.connected)
self.assertEqual(player.get_state(), gi.repository.Gst.State.NULL)
mock_close.assert_called()
self.assertFalse(lbrainz._thread.is_alive())
@unittest.mock.patch("sys.stdout", new_callable=io.StringIO)
def test_window_widgets(self, mock_stdout: io.StringIO):
@ -105,6 +113,8 @@ class TestEmmental(unittest.TestCase):
self.application.factory = emmental.playlist.Factory(
self.application.db)
self.application.player = emmental.audio.Player()
self.application.lbrainz = emmental.listenbrainz.ListenBrainz(
self.application.db)
win = self.application.build_window()
self.assertIsInstance(win, emmental.window.Window)
@ -125,6 +135,8 @@ class TestEmmental(unittest.TestCase):
self.application.factory = emmental.playlist.Factory(
self.application.db)
self.application.player = emmental.audio.Player()
self.application.lbrainz = emmental.listenbrainz.ListenBrainz(
self.application.db)
self.application.build_window()
for action, accel in [("app.open-file", "<Control>o"),
@ -147,6 +159,8 @@ class TestEmmental(unittest.TestCase):
self.application.factory = emmental.playlist.Factory(
self.application.db)
self.application.player = emmental.audio.Player()
self.application.lbrainz = emmental.listenbrainz.ListenBrainz(
self.application.db)
win = self.application.build_window()
for action, accel in [("app.toggle-favorite", ["<Control>f"]),
@ -204,6 +218,8 @@ class TestEmmental(unittest.TestCase):
self.application.factory = emmental.playlist.Factory(
self.application.db)
self.application.player = emmental.audio.Player()
self.application.lbrainz = emmental.listenbrainz.ListenBrainz(
self.application.db)
win = self.application.build_window()
for action, accel in [("app.focus-search-playlist",
@ -226,6 +242,8 @@ class TestEmmental(unittest.TestCase):
self.application.factory = emmental.playlist.Factory(
self.application.db)
self.application.player = emmental.audio.Player()
self.application.lbrainz = emmental.listenbrainz.ListenBrainz(
self.application.db)
win = self.application.build_window()
for action, accel in [("app.focus-search-track", "<Control>slash"),
@ -250,6 +268,8 @@ class TestEmmental(unittest.TestCase):
"""Test that the Playlist Factory is wired up properly."""
self.application.db = emmental.db.Connection()
self.application.player = emmental.audio.Player()
self.application.lbrainz = emmental.listenbrainz.ListenBrainz(
self.application.db)
self.application.factory = emmental.playlist.Factory(
self.application.db)
self.application.win = self.application.build_window()
@ -264,6 +284,31 @@ class TestEmmental(unittest.TestCase):
self.assertEqual(self.application.factory.db_previous,
self.application.db.playlists.previous)
def test_listenbrainz(self):
"""Test that listenbrainz is wired up properly."""
self.application.db = emmental.db.Connection()
self.application.player = emmental.audio.Player()
self.application.lbrainz = emmental.listenbrainz.ListenBrainz(
self.application.db)
self.application.factory = emmental.playlist.Factory(
self.application.db)
self.application.win = self.application.build_window()
with unittest.mock.patch.object(self.application.lbrainz,
"submit_listens") as mock_submit:
self.application.connect_listenbrainz()
self.application.db.tracks.emit("track-played", None)
mock_submit.assert_called()
self.application.lbrainz.stop()
self.application.win.header.listenbrainz_token = "abcde"
self.assertEqual(self.application.lbrainz.user_token, "abcde")
self.application.lbrainz.valid_token = False
self.assertFalse(self.application.win.header.listenbrainz_token_valid)
@unittest.mock.patch("sys.stdout", new_callable=io.StringIO)
def test_replaygain(self, mock_stdout: io.StringIO):
"""Test setting replaygain modes."""
@ -271,6 +316,8 @@ class TestEmmental(unittest.TestCase):
self.application.factory = emmental.playlist.Factory(
self.application.db)
self.application.player = emmental.audio.Player()
self.application.lbrainz = emmental.listenbrainz.ListenBrainz(
self.application.db)
win = self.application.build_window()
player = self.application.player
@ -286,6 +333,8 @@ class TestEmmental(unittest.TestCase):
self.application.factory = emmental.playlist.Factory(
self.application.db)
self.application.player = emmental.audio.Player()
self.application.lbrainz = emmental.listenbrainz.ListenBrainz(
self.application.db)
win = self.application.build_window()
player = self.application.player
@ -301,6 +350,8 @@ class TestEmmental(unittest.TestCase):
self.application.factory = emmental.playlist.Factory(
self.application.db)
self.application.player = emmental.audio.Player()
self.application.lbrainz = emmental.listenbrainz.ListenBrainz(
self.application.db)
win = self.application.build_window()
win.show_sidebar = True

View File

@ -73,6 +73,11 @@ class TestGSetup(unittest.TestCase):
data_path = xdg.BaseDirectory.save_data_path("emmental")
self.assertEqual(emmental.gsetup.DATA_DIR, pathlib.Path(data_path))
def test_has_icon(self):
"""Check that has_icon() works as expected."""
self.assertTrue(emmental.gsetup.has_icon("media-playback-start"))
self.assertFalse(emmental.gsetup.has_icon("no-such-icon"))
def test_env_string(self):
"""Check that the env_string() function works as expected."""
self.assertRegex(emmental.gsetup.env_string(),

View File

@ -21,11 +21,23 @@ class TestSettings(unittest.TestCase):
self.settings = self.app.db.settings
self.win = self.app.win
self.player = self.app.player
self.lbrainz = self.app.lbrainz
def tearDown(self):
"""Clean up."""
self.app.do_shutdown()
def test_save_listenbrainz_token(self, new_callable=io.StringIO):
"""Check saving and loading the listenbrainz token."""
self.assertEqual(self.settings["listenbrainz.token"], "")
self.assertEqual(self.win.header.listenbrainz_token, "")
self.win.header.listenbrainz_token = "abcde"
self.assertEqual(self.settings["listenbrainz.token"], "abcde")
win = self.app.build_window()
self.assertEqual(win.header.listenbrainz_token, "abcde")
def test_save_window_size(self, new_callable=io.StringIO):
"""Check saving and loading window size from the database."""
self.assertEqual(self.settings["window.width"], 1600)

View File

@ -12,6 +12,10 @@ from gi.repository import Gdk
class TestTextureCache(unittest.TestCase):
"""Test our custom cache dictionary."""
def setUpClass():
"""Clear the existing cache before testing."""
emmental.texture.CACHE.clear()
def setUp(self):
"""Set up common variables."""
cover = tests.util.COVER_JPG.absolute().relative_to("/")

118
tests/test_thread.py Normal file
View File

@ -0,0 +1,118 @@
# Copyright 2024 (c) Anna Schumaker.
"""Tests our common Thread class."""
import emmental.thread
import threading
import unittest
class TestData(unittest.TestCase):
"""Tests our thread Data class."""
def test_init_kwargs(self):
"""Tests initializing the data class with keyword args."""
data = emmental.thread.Data(a=1, b=2)
self.assertEqual(data.a, 1)
self.assertEqual(data.b, 2)
self.assertEqual(repr(data), "Data(a=1, b=2)")
def test_init_values_dict(self):
"""Test initializing the data class with a dictionary of values."""
data = emmental.thread.Data({"a": 1, "b": 2})
self.assertEqual(data.a, 1)
self.assertEqual(data.b, 2)
self.assertEqual(repr(data), "Data(a=1, b=2)")
def test_init_both(self):
"""Test initializing the data class with both."""
data = emmental.thread.Data({"a": 1, "b": 2}, b=3, c='4')
self.assertEqual(data.a, 1)
self.assertEqual(data.b, 3)
self.assertEqual(data.c, '4')
self.assertEqual(repr(data), "Data(a=1, b=3, c='4')")
def test_compare(self):
"""Test comparing two data classes."""
data1 = emmental.thread.Data({"a": 1, "b": 2})
data2 = emmental.thread.Data({"c": 3, "d": 4})
self.assertTrue(data1 == data1)
self.assertTrue(data1 == {"a": 1, "b": 2})
self.assertFalse(data1 == data2)
self.assertFalse(data1 == {"c": 2, "d": 4})
self.assertFalse(data1 == 3)
class TestThread(unittest.TestCase):
"""Tests our Thread class."""
def setUp(self):
"""Set up common variables."""
self.thread = emmental.thread.Thread()
def tearDown(self):
"""Clean up."""
self.thread.stop()
def test_init(self):
"""Check that the Thread was initialized properly."""
self.assertIsInstance(self.thread, threading.Thread)
self.assertIsInstance(self.thread.ready, threading.Event)
self.assertIsInstance(self.thread._condition, threading.Condition)
self.assertIsNone(self.thread._task)
self.assertIsNone(self.thread._result)
self.assertTrue(self.thread.is_alive())
self.assertTrue(self.thread.ready.is_set())
def test_set_get_result(self):
"""Test the set_result() and get_result() functions."""
with unittest.mock.patch.object(self.thread, "do_get_result",
wraps=self.thread.do_get_result) \
as mock_get_result:
self.assertIsNone(self.thread.get_result())
mock_get_result.assert_not_called()
self.thread.ready.clear()
self.thread._result = {"res": "abcde"}
self.assertIsNone(self.thread.get_result())
mock_get_result.assert_not_called()
self.thread.set_result(res="fghij")
self.assertTrue(self.thread.ready.is_set())
self.assertIsInstance(self.thread._result, emmental.thread.Data)
self.assertEqual(self.thread._result, {"res": "fghij"})
self.assertEqual(self.thread.get_result(), {"res": "fghij"})
self.assertIsNone(self.thread._result)
mock_get_result.assert_called_with({"res": "fghij"})
result = {"res1": "klmno", "res2": "pqrst"}
self.thread.set_result(**result)
self.assertEqual(self.thread.get_result(other="other", arg="arg"),
result)
mock_get_result.assert_called_with(result,
other="other", arg="arg")
def test_set_task(self):
"""Test the set_task() function."""
self.thread._result = "abcde"
with unittest.mock.patch.object(self.thread, "do_run_task",
wraps=self.thread.do_run_task) \
as mock_run_task:
self.thread.set_task(arg="test")
self.assertIsInstance(self.thread._task, emmental.thread.Data)
self.assertEqual(self.thread._task, {"arg": "test"})
self.assertIsNone(self.thread._result)
self.thread.ready.wait()
mock_run_task.assert_called_with(self.thread._task)
def test_stop(self):
"""Test stopping the Thread."""
self.thread._task = ("test", "task")
with unittest.mock.patch.object(self.thread, "do_stop") as mock_stop:
self.thread.stop()
self.assertFalse(self.thread.is_alive())
self.assertFalse(self.thread.ready.is_set())
self.assertIsNone(self.thread._task)
mock_stop.assert_called()

View File

@ -195,13 +195,50 @@ class TestShuffleButtons(unittest.TestCase):
"media-playlist-shuffle")
self.assertEqual(self.shuffle.active_tooltip_text, "shuffle: enabled")
self.assertEqual(self.shuffle.inactive_icon_name,
"media-playlist-consecutive")
self.assertEqual(self.shuffle.inactive_tooltip_text,
"shuffle: disabled")
self.assertAlmostEqual(self.shuffle.icon_opacity, 0.5, delta=0.005)
@unittest.mock.patch("emmental.gsetup.has_icon")
def test_get_inactive_icon(self, mock_has_icon: unittest.mock.Mock):
"""Test the get_inactive_icon() function."""
mock_has_icon.return_value = True
self.assertEqual(self.shuffle.get_inactive_icon(),
"media-playlist-normal")
mock_has_icon.assert_called()
mock_has_icon.return_value = False
self.assertEqual(self.shuffle.get_inactive_icon(),
"media-playlist-consecutive")
@unittest.mock.patch("emmental.gsetup.has_icon")
def test_inactive_icon_name(self, mock_has_icon: unittest.mock.Mock):
"""Test setting the inactive icon name."""
mock_has_icon.return_value = True
button = emmental.tracklist.buttons.ShuffleButton()
mock_has_icon.assert_called_with("media-playlist-normal")
self.assertEqual(button.inactive_icon_name, "media-playlist-normal")
mock_has_icon.return_value = False
button = emmental.tracklist.buttons.ShuffleButton()
self.assertEqual(button.inactive_icon_name,
"media-playlist-consecutive")
@unittest.mock.patch("emmental.gsetup.has_icon")
def test_toggled(self, mock_has_icon: unittest.mock.Mock):
"""Test changing the icon when toggled."""
mock_has_icon.return_value = True
self.shuffle.active = True
self.assertEqual(self.shuffle.inactive_icon_name,
"media-playlist-normal")
mock_has_icon.assert_called()
mock_has_icon.return_value = False
self.shuffle.active = False
self.assertEqual(self.shuffle.inactive_icon_name,
"media-playlist-consecutive")
def test_opacity(self):
"""Test adjusting the opacity based on active state."""
self.shuffle.active = True
@ -340,7 +377,7 @@ class TestSortButton(unittest.TestCase):
"""Test that the Sort button is configured correctly."""
self.assertIsInstance(self.sort, emmental.buttons.PopoverButton)
self.assertEqual(self.sort.get_icon_name(),
"view-list-ordered-symbolic")
"list-compact-symbolic")
self.assertEqual(self.sort.get_tooltip_text(),
"configure playlist sort order")
self.assertFalse(self.sort.get_has_frame())