emmental/emmental/db/albums.py

163 lines
6.4 KiB
Python

# Copyright 2022 (c) Anna Schumaker
"""A custom Gio.ListModel for working with albums."""
import pathlib
import sqlite3
from gi.repository import GObject
from gi.repository import Gtk
from .media import Medium
from .. import format
from . import playlist
from . import tracks
class Album(playlist.Playlist):
    """A Playlist for one album, with the album's mediums as children."""

    albumid = GObject.Property(type=int)
    artist = GObject.Property(type=str)
    release = GObject.Property(type=str)
    mbid = GObject.Property(type=str)
    cover = GObject.Property(type=GObject.TYPE_PYOBJECT)

    def __init__(self, **kwargs):
        """Set up the Album and register its mediums as child playlists."""
        super().__init__(**kwargs)
        media_table = self.table.sql.media
        medium_filter = Gtk.CustomFilter.new(self.__match_medium)
        mediumids = self.table.get_mediumids(self)
        self.add_children(media_table, medium_filter, mediumids)

    def __match_medium(self, medium: Medium) -> bool:
        # Filter callback: a medium passes only when it is a child of this
        # album and has a non-empty name.
        owned = self.has_medium(medium)
        if not owned:
            return owned
        return len(medium.name) > 0

    def add_medium(self, medium: Medium) -> None:
        """Register a Medium as a child of this Album."""
        self.add_child(medium)

    def get_artists(self) -> list[playlist.Playlist]:
        """Ask our table for the artists linked to this album."""
        table = self.table
        return table.get_artists(self)

    def get_media(self) -> list[Medium]:
        """Ask our table for the media belonging to this album."""
        table = self.table
        return table.get_media(self)

    def has_medium(self, medium: Medium) -> bool:
        """Report whether the Medium is a child of this Album."""
        return self.has_child(medium)

    def remove_medium(self, medium: Medium) -> None:
        """Drop a Medium from this Album's children."""
        return self.remove_child(medium)

    @property
    def primary_key(self) -> int:
        """Return the albumid, which is the Album's primary key."""
        return self.albumid

    @GObject.Property(type=playlist.Playlist)
    def parent(self) -> playlist.Playlist | None:
        """Return the first artist of this Album, or None without artists."""
        artists = self.get_artists()
        if len(artists) == 0:
            return None
        return artists[0]
class Table(playlist.Table):
    """Our Album Table.

    Implements the playlist.Table hooks on top of the `albums` database
    table and its related views and link tables.
    """

    def __init__(self, sql: GObject.TYPE_PYOBJECT, **kwargs):
        """Initialize the Album Table.

        Passes autodelete=True and system_tracks=False to the base Table
        along with the sql connection object.
        """
        super().__init__(sql=sql, autodelete=True,
                         system_tracks=False, **kwargs)

    def do_add_track(self, album: Album, track: tracks.Track) -> bool:
        """Verify adding a Track to the Album playlist.

        A Track is only accepted when the Album of its Medium is `album`.
        """
        return track.get_medium().get_album() == album

    def do_construct(self, **kwargs) -> Album:
        """Construct a new Album from the given keyword arguments."""
        return Album(**kwargs)

    def do_get_sort_key(self, album: Album) -> tuple[tuple, bool,
                                                     str, tuple, str]:
        """Get a sort key for the requested Album.

        Albums are ordered by name, then by whether they lack an mbid
        (albums with an mbid sort first), then by casefolded mbid, artist,
        and release.
        """
        return (format.sort_key(album.name),
                len(album.mbid) == 0, album.mbid.casefold(),
                format.sort_key(album.artist),
                album.release)

    def do_remove_track(self, album: Album, track: tracks.Track) -> bool:
        """Verify removing a Track from the Album playlist (always True)."""
        return True

    def do_sql_delete(self, album: Album) -> sqlite3.Cursor:
        """Delete an album.

        The album is first removed from each of its artists and each of its
        media is deleted, then the row is removed from the albums table.
        """
        for artist in album.get_artists():
            artist.remove_album(album)
        # get_media() skips ids without a loaded Medium, so every entry
        # here is safe to call delete() on.
        for medium in album.get_media():
            medium.delete()
        return self.sql("DELETE FROM albums WHERE albumid=?", album.albumid)

    def do_sql_glob(self, glob: str) -> sqlite3.Cursor:
        """Search for albums whose album or medium name matches the glob."""
        return self.sql("SELECT albumid FROM album_artist_view\n"
                        "WHERE CASEFOLD(album) GLOB :glob\n"
                        "OR CASEFOLD(medium) GLOB :glob", glob=glob)

    def do_sql_insert(self, name: str, artist: str,
                      release: str, *, mbid: str = "",
                      cover: pathlib.Path | None = None
                      ) -> sqlite3.Cursor | None:
        """Create a new album.

        Returns a query selecting the new row from albums_view, or None if
        the INSERT did not produce a usable cursor.
        """
        cur = self.sql("INSERT INTO albums\n"
                       "(name, artist, release, mbid, cover)\n"
                       "VALUES (?, ?, ?, ?, ?)",
                       name, artist, release, mbid, cover)
        if not cur:
            return None
        return self.sql("SELECT * FROM albums_view WHERE albumid=?",
                        cur.lastrowid)

    def do_sql_select_all(self) -> sqlite3.Cursor:
        """Load albums from the database."""
        return self.sql("SELECT * FROM albums_view")

    def do_sql_select_one(self, name: str | None = None,
                          artist: str | None = None,
                          release: str | None = None,
                          *, mbid: str = "") -> sqlite3.Cursor:
        """Look up an album by mbid and, optionally, name/artist/release.

        The lowercased mbid is always part of the WHERE clause. The name,
        artist, and release conditions are only added when all three are
        provided.
        """
        where = ["mbid=?"]
        args = [mbid.lower()]
        if None not in (name, artist, release):
            where.extend(["CASEFOLD(name)=?",
                          "CASEFOLD(artist)=?", "release=?"])
            args.extend([name.casefold(), artist.casefold(), release])
        conditions = " AND ".join(where)
        return self.sql(f"SELECT albumid FROM albums\nWHERE {conditions}",
                        *args)

    def do_sql_select_trackids(self, album: Album) -> sqlite3.Cursor:
        """Load an Album's Tracks from the database."""
        return self.sql("SELECT trackid FROM album_tracks_view\n"
                        "WHERE albumid=?", album.albumid)

    def do_sql_update(self, album: Album, column: str, newval) -> bool:
        """Set one column of an album's row to a new value.

        NOTE(review): the result of the UPDATE query is returned as-is;
        the `bool` annotation presumably reflects that callers only test
        its truthiness -- confirm against playlist.Table.
        """
        # Placeholders cannot bind identifiers, so `column` is interpolated
        # directly; it must be a trusted column name, never user input.
        return self.sql(f"UPDATE albums SET {column}=? WHERE albumid=?",
                        newval, album.albumid)

    def get_artists(self, album: Album) -> list[playlist.Playlist]:
        """Get the list of artists for this album.

        Linked artistids without an entry in the artists table's rows are
        left out of the result.
        """
        rows = self.sql("SELECT artistid FROM album_artist_link\n"
                        "WHERE albumid=?", album.albumid).fetchall()
        artists = [self.sql.artists.rows.get(row["artistid"]) for row in rows]
        return list(filter(None, artists))

    def get_media(self, album: Album) -> list[Medium]:
        """Get the list of media for this album.

        Mediumids without an entry in the media table's rows are left out,
        so callers never receive None in place of a Medium.
        """
        lookup = self.sql.media.rows.get
        found = (lookup(mediumid) for mediumid in self.get_mediumids(album))
        return [medium for medium in found if medium is not None]

    def get_mediumids(self, album: Album) -> set[int]:
        """Get the set of mediumids for this album."""
        rows = self.sql("SELECT mediumid FROM media WHERE albumid=?",
                        album.albumid)
        return {row["mediumid"] for row in rows.fetchall()}