149 lines
5.7 KiB
Python
149 lines
5.7 KiB
Python
# Copyright 2022 (c) Anna Schumaker
|
|
"""A custom Gio.ListModel for working with artists."""
|
|
import sqlite3
|
|
from gi.repository import GObject
|
|
from gi.repository import Gtk
|
|
from .albums import Album
|
|
from .. import format
|
|
from . import playlist
|
|
from . import table
|
|
|
|
|
|
class Artist(playlist.Playlist):
|
|
"""Our custom Artist object."""
|
|
|
|
artistid = GObject.Property(type=int)
|
|
mbid = GObject.Property(type=str)
|
|
|
|
def __init__(self, **kwargs):
|
|
"""Initialize an Artist object."""
|
|
super().__init__(**kwargs)
|
|
self.add_children(self.table.sql.albums,
|
|
Gtk.CustomFilter.new(self.has_album),
|
|
self.table.get_albumids(self))
|
|
|
|
def add_album(self, album: Album) -> None:
|
|
"""Add an Album to this Artist."""
|
|
if self.table.add_album(self, album):
|
|
self.add_child(album)
|
|
|
|
def has_album(self, album: Album) -> bool:
|
|
"""Check if the Artist has this Album."""
|
|
return self.has_child(album)
|
|
|
|
def remove_album(self, album: Album) -> None:
|
|
"""Remove an album from this Artist."""
|
|
self.table.remove_album(self, album)
|
|
self.remove_child(album)
|
|
|
|
@property
|
|
def primary_key(self) -> int:
|
|
"""Get the Artist primary key."""
|
|
return self.artistid
|
|
|
|
|
|
class Filter(table.KeySet):
|
|
"""Custom filter to hide artists without albums."""
|
|
|
|
show_all = GObject.Property(type=bool, default=False)
|
|
|
|
def __init__(self, show_all: bool = False):
|
|
"""Initialize the Artist filter."""
|
|
super().__init__(show_all=show_all)
|
|
self.connect("notify::show-all", self.__notify_show_all)
|
|
|
|
def __notify_show_all(self, filter: table.KeySet, param) -> None:
|
|
self.changed(Gtk.FilterChange.LESS_STRICT if self.show_all else
|
|
Gtk.FilterChange.MORE_STRICT)
|
|
|
|
def do_get_strictness(self) -> Gtk.FilterMatch:
|
|
"""Get the strictness of the filter."""
|
|
res = super().do_get_strictness()
|
|
if not self.show_all and res == Gtk.FilterMatch.ALL:
|
|
return Gtk.FilterMatch.SOME
|
|
return res
|
|
|
|
def do_match(self, artist: Artist) -> bool:
|
|
"""Check if the artist matches the filter."""
|
|
res = super().do_match(artist)
|
|
if not self.show_all and res:
|
|
return artist.child_set.keyset.n_keys > 0
|
|
return res
|
|
|
|
|
|
class Table(playlist.Table):
|
|
"""Our Artist Table."""
|
|
|
|
show_all = GObject.Property(type=bool, default=False)
|
|
|
|
def __init__(self, sql: GObject.TYPE_PYOBJECT,
|
|
show_all: bool = False, **kwargs):
|
|
"""Initialize an Artist model."""
|
|
super().__init__(sql=sql, show_all=show_all, autodelete=True,
|
|
filter=Filter(show_all=show_all), **kwargs)
|
|
self.bind_property("show-all", self.get_filter(), "show-all")
|
|
|
|
def do_construct(self, **kwargs) -> Artist:
|
|
"""Construct a new artist."""
|
|
return Artist(**kwargs)
|
|
|
|
def do_get_sort_key(self, artist: Artist) -> tuple[tuple, bool, str]:
|
|
"""Get a sort key for the requested Playlist."""
|
|
return (format.sort_key(artist.name),
|
|
len(artist.mbid) == 0,
|
|
artist.mbid.casefold())
|
|
|
|
def do_sql_delete(self, artist: Artist) -> bool:
|
|
"""Delete an artist."""
|
|
return self.sql("DELETE FROM artists WHERE artistid=?",
|
|
artist.artistid)
|
|
|
|
def do_sql_glob(self, glob: str) -> sqlite3.Cursor:
|
|
"""Search for artists matching the search text."""
|
|
return self.sql("""SELECT artistid FROM album_artist_view
|
|
WHERE CASEFOLD(artist) GLOB :glob
|
|
OR CASEFOLD(album) GLOB :glob
|
|
OR CASEFOLD(medium) GLOB :glob""", glob=glob)
|
|
|
|
def do_sql_insert(self, name: str,
|
|
mbid: str = "") -> sqlite3.Cursor | None:
|
|
"""Create a new artist."""
|
|
if cur := self.sql("INSERT INTO artists (name, mbid) VALUES (?, ?)",
|
|
name, mbid):
|
|
return self.sql("SELECT * FROM artists_view WHERE artistid=?",
|
|
cur.lastrowid)
|
|
|
|
def do_sql_select_all(self) -> sqlite3.Cursor:
|
|
"""Load artists from the database."""
|
|
return self.sql("SELECT * FROM artists_view")
|
|
|
|
def do_sql_select_one(self, name: str | None = None,
|
|
*, mbid: str = "") -> sqlite3.Cursor:
|
|
"""Look up an artist by name and mbid."""
|
|
where = "mbid=? AND CASEFOLD(name)=?" if name else "mbid=?"
|
|
args = [mbid.lower(), name.casefold()] if name else [mbid.lower()]
|
|
return self.sql(f"SELECT artistid FROM artists WHERE {where}", *args)
|
|
|
|
def do_sql_update(self, artist: Artist,
|
|
column: str, newval) -> sqlite3.Cursor:
|
|
"""Update an artist."""
|
|
return self.sql(f"UPDATE artists SET {column}=? WHERE artistid=?",
|
|
newval, artist.artistid)
|
|
|
|
def add_album(self, artist: Artist, album: Album) -> bool:
|
|
"""Add an album to this artist."""
|
|
return self.sql("INSERT INTO album_artist_link VALUES (?, ?)",
|
|
artist.artistid, album.albumid) is not None
|
|
|
|
def get_albumids(self, artist: Artist) -> set[int]:
|
|
"""Get an Artist's associated albumids from the database."""
|
|
cur = self.sql("""SELECT albumid FROM album_artist_link
|
|
WHERE artistid=?""", artist.artistid)
|
|
return {row["albumid"] for row in cur.fetchall()}
|
|
|
|
def remove_album(self, artist: Artist, album: Album) -> bool:
|
|
"""Remove an album from this artist."""
|
|
return self.sql("""DELETE FROM album_artist_link
|
|
WHERE artistid=? AND albumid=?""",
|
|
artist.artistid, album.albumid).rowcount == 1
|