emmental/emmental/db/media.py

112 lines
4.3 KiB
Python

# Copyright 2022 (c) Anna Schumaker.
"""A custom Gio.ListModel for managing individual media in an album."""
import sqlite3
from gi.repository import GObject
from .. import format
from . import playlist
from . import tracks
class Medium(playlist.Playlist):
    """A Playlist subclass representing a single disc in an album."""

    # GObject properties describing this medium.
    mediumid = GObject.Property(type=int)
    albumid = GObject.Property(type=int)
    number = GObject.Property(type=int, default=1)
    type = GObject.Property(type=str)

    def get_album(self) -> playlist.Playlist:
        """Return the album row that matches this Medium's albumid."""
        albums = self.table.sql.albums
        return albums.rows.get(self.albumid)

    def rename(self, new_name: str) -> bool:
        """Rename this Medium by delegating to its table.

        Returns whatever the table's rename() reports.
        """
        owner = self.table
        return owner.rename(self, new_name)

    @property
    def primary_key(self) -> int:
        """Return the mediumid, which serves as the primary key."""
        return self.mediumid

    @GObject.Property(type=playlist.Playlist)
    def parent(self) -> playlist.Playlist | None:
        """Return this Medium's parent playlist, which is its album."""
        return self.get_album()
class Table(playlist.Table):
    """The playlist Table that manages Medium objects."""

    def __init__(self, sql: GObject.TYPE_PYOBJECT, **kwargs):
        """Set up the Media Table.

        The base table is always created with autodelete=True and
        system_tracks=False; any other keyword arguments are forwarded.
        """
        super().__init__(
            sql=sql,
            autodelete=True,
            system_tracks=False,
            **kwargs,
        )

    def do_construct(self, **kwargs) -> Medium:
        """Build a Medium from the given keyword arguments."""
        return Medium(**kwargs)

    def do_add_track(self, medium: Medium, track: tracks.Track) -> bool:
        """Allow adding a Track only when the Track's medium is this one."""
        track_medium = track.get_medium()
        return track_medium == medium

    def do_get_sort_key(self, medium: Medium) -> tuple[int, int, tuple, str]:
        """Return the (albumid, number, name key, type) sort key."""
        albumid, number = medium.albumid, medium.number
        name_key = format.sort_key(medium.name)
        return (albumid, number, name_key, medium.type)

    def do_remove_track(self, medium: Medium, track: tracks.Track) -> bool:
        """Removing a Track from a Medium playlist is always permitted."""
        return True

    def do_sql_delete(self, medium: Medium) -> sqlite3.Cursor:
        """Delete the row for this medium from the media table."""
        statement = "DELETE FROM media WHERE mediumid=?"
        return self.sql(statement, medium.mediumid)

    def do_sql_glob(self, glob: str) -> sqlite3.Cursor:
        """Select the mediumids whose casefolded name matches the glob."""
        statement = ("SELECT mediumid FROM media\n"
                     "WHERE CASEFOLD(name) GLOB ?")
        return self.sql(statement, glob)

    def do_sql_insert(self, album: playlist.Playlist, name: str,
                      *, number: int, type: str = "") -> sqlite3.Cursor | None:
        """Insert a new medium and select it back from media_view.

        Returns None when the INSERT does not produce a truthy cursor.
        """
        statement = ("INSERT INTO media (albumid, number, name, type)\n"
                     "VALUES (?, ?, ?, ?)")
        inserted = self.sql(statement, album.albumid, number, name, type)
        if not inserted:
            return None
        # Re-read the freshly inserted row through the view.
        return self.sql("SELECT * FROM media_view WHERE mediumid=?",
                        inserted.lastrowid)

    def do_sql_select_all(self) -> sqlite3.Cursor:
        """Select every row from media_view."""
        return self.sql("SELECT * FROM media_view")

    def do_sql_select_one(self, album: playlist.Playlist,
                          *, number: int, type: str = "") -> sqlite3.Cursor:
        """Select the mediumid matching an album, number, and type."""
        statement = ("SELECT mediumid FROM media\n"
                     "WHERE albumid=? AND number=? AND type=?")
        return self.sql(statement, album.albumid, number, type)

    def do_sql_select_trackids(self, medium: Medium) -> sqlite3.Cursor:
        """Select the trackids that belong to this Medium."""
        statement = ("SELECT trackid FROM medium_tracks_view\n"
                     "WHERE mediumid=?")
        return self.sql(statement, medium.mediumid)

    def do_sql_update(self, medium: Medium,
                      column: str, newval) -> sqlite3.Cursor:
        """Set one column of this medium's row to a new value."""
        # NOTE: `column` is placed directly into the SQL text; only the
        # new value and the mediumid are bound as parameters.
        statement = f"UPDATE media SET {column}=? WHERE mediumid=?"
        return self.sql(statement, newval, medium.mediumid)

    def rename(self, medium: Medium, new_name: str) -> bool:
        """Rename a medium, returning True only if the name was changed.

        Surrounding whitespace is stripped from the new name first.
        """
        stripped = new_name.strip()
        if stripped == medium.name:
            return False
        if not self.update(medium, "name", stripped):
            return False

        # Take the medium out of the store while its name changes, then
        # put it back (presumably so it is re-positioned by its new name).
        self.store.remove(medium)
        medium.name = stripped
        self.store.append(medium)
        return True