# Copyright 2022 (c) Anna Schumaker.
|
|
"""A custom Gio.ListModel for managing individual media in an album."""
|
|
import sqlite3
|
|
from gi.repository import GObject
|
|
from gi.repository import Gtk
|
|
from .. import format
|
|
from . import playlist
|
|
from . import table
|
|
from . import tracks
|
|
|
|
|
|
class Medium(playlist.Playlist):
    """A playlist representing a single disc (medium) of an album."""

    # GObject properties describing this medium.
    mediumid = GObject.Property(type=int)
    albumid = GObject.Property(type=int)
    number = GObject.Property(type=int, default=1)
    type = GObject.Property(type=str)

    def get_album(self) -> playlist.Playlist:
        """Look up the Album whose albumid matches this Medium's albumid.

        The lookup uses `.get()` on the albums table rows, so the result
        is whatever that call yields when no such row is present.
        """
        album_rows = self.table.sql.albums.rows
        return album_rows.get(self.albumid)

    def rename(self, new_name: str) -> bool:
        """Rename this medium by delegating to our table's rename()."""
        owner = self.table
        return owner.rename(self, new_name)

    @property
    def primary_key(self) -> int:
        """The primary key of a Medium is its mediumid."""
        return self.mediumid

    @GObject.Property(type=playlist.Playlist)
    def parent(self) -> playlist.Playlist | None:
        """The parent playlist of a Medium is its Album."""
        album = self.get_album()
        return album
|
|
|
|
|
|
class Filter(table.KeySet):
    """A KeySet filter that additionally hides media with empty names."""

    def do_get_strictness(self) -> Gtk.FilterMatch:
        """Report the filter strictness, never claiming to match ALL.

        do_match() can still reject a medium with an empty name, so a
        parent strictness of ALL is reported as SOME instead.
        """
        strictness = super().do_get_strictness()
        if strictness == Gtk.FilterMatch.ALL:
            return Gtk.FilterMatch.SOME
        return strictness

    def do_match(self, medium: Medium) -> bool:
        """Match media accepted by the parent filter that have a name."""
        if not super().do_match(medium):
            return False
        return len(medium.name) > 0
|
|
|
|
|
|
class Table(playlist.Table):
    """The playlist table that manages Medium objects and the media rows."""

    def __init__(self, sql: GObject.TYPE_PYOBJECT, **kwargs):
        """Set up the Media Table with our custom Filter installed."""
        super().__init__(sql=sql, filter=Filter(), autodelete=True,
                         system_tracks=False, **kwargs)

    def do_construct(self, **kwargs) -> Medium:
        """Build a Medium object from the given keyword arguments."""
        return Medium(**kwargs)

    def do_add_track(self, medium: Medium, track: tracks.Track) -> bool:
        """Allow adding a Track only if this is the Track's own Medium."""
        track_medium = track.get_medium()
        return track_medium == medium

    def do_get_sort_key(self, medium: Medium) -> tuple[int, int, tuple, str]:
        """Build the sort key: (albumid, number, name sort key, type)."""
        albumid = medium.albumid
        number = medium.number
        name_key = format.sort_key(medium.name)
        return (albumid, number, name_key, medium.type)

    def do_remove_track(self, medium: Medium, track: tracks.Track) -> bool:
        """Removing a Track from a Medium playlist is always allowed."""
        return True

    def do_sql_delete(self, medium: Medium) -> sqlite3.Cursor:
        """Detach the medium from its album, then delete its media row."""
        album = medium.get_album()
        album.remove_medium(medium)
        statement = "DELETE FROM media WHERE mediumid=?"
        return self.sql(statement, medium.mediumid)

    def do_sql_glob(self, glob: str) -> sqlite3.Cursor:
        """Select the mediumids whose casefolded name matches the glob."""
        statement = """SELECT mediumid FROM media
                           WHERE CASEFOLD(name) GLOB ?"""
        return self.sql(statement, glob)

    def do_sql_insert(self, album: playlist.Playlist, name: str,
                      *, number: int, type: str = "") -> sqlite3.Cursor | None:
        """Insert a new media row and select it back from media_view.

        Returns None when the INSERT does not produce a usable cursor.
        """
        statement = """INSERT INTO media (albumid, number, name, type)
                              VALUES (?, ?, ?, ?)"""
        cur = self.sql(statement, album.albumid, number, name, type)
        if not cur:
            return None
        return self.sql("SELECT * FROM media_view WHERE mediumid=?",
                        cur.lastrowid)

    def do_sql_select_all(self) -> sqlite3.Cursor:
        """Select every row of media_view."""
        return self.sql("SELECT * FROM media_view")

    def do_sql_select_one(self, album: playlist.Playlist,
                          *, number: int, type: str = "") -> sqlite3.Cursor:
        """Select the mediumid matching an album, number, and type."""
        statement = """SELECT mediumid FROM media
                           WHERE albumid=? AND number=? AND type=?"""
        return self.sql(statement, album.albumid, number, type)

    def do_sql_select_trackids(self, medium: Medium) -> sqlite3.Cursor:
        """Select the trackids that belong to the given Medium."""
        statement = """SELECT trackid FROM medium_tracks_view
                           WHERE mediumid=?"""
        return self.sql(statement, medium.mediumid)

    def do_sql_update(self, medium: Medium,
                      column: str, newval) -> sqlite3.Cursor:
        """Set one column of the medium's row to a new value.

        The column name is interpolated into the statement text, while
        the new value and the mediumid are bound as parameters.
        """
        statement = f"UPDATE media SET {column}=? WHERE mediumid=?"
        return self.sql(statement, newval, medium.mediumid)

    def create(self, album: playlist.Playlist,
               *args, **kwargs) -> Medium | None:
        """Create a new Medium and register it with its album.

        The album is only told about the medium when the parent class's
        create() actually returned one.
        """
        medium = super().create(album, *args, **kwargs)
        if medium is not None:
            album.add_medium(medium)
        return medium

    def rename(self, medium: Medium, new_name: str) -> bool:
        """Rename a medium, returning True only if the name was changed.

        The new name is stripped of surrounding whitespace first.  Nothing
        happens if it equals the current name or if the update() fails.
        """
        new_name = new_name.strip()
        if new_name == medium.name:
            return False
        if not self.update(medium, "name", new_name):
            return False

        # Take the medium out of the store before changing its name, and
        # put it back afterwards.
        self.store.remove(medium)
        medium.name = new_name
        self.store.append(medium)
        return True
|