310 lines
12 KiB
Python
310 lines
12 KiB
Python
# Copyright 2022 (c) Anna Schumaker
|
|
"""A customized Gio.ListStore for tracking Playlist GObjects."""
|
|
import sqlite3
|
|
import typing
|
|
from gi.repository import GObject
|
|
from gi.repository import Gio
|
|
from gi.repository import Gtk
|
|
from .tracks import Track, TrackidSet
|
|
from .. import format
|
|
from . import table
|
|
|
|
|
|
class Playlist(table.Row):
|
|
"""Our shared Playlist Row object."""
|
|
|
|
propertyid = GObject.Property(type=int)
|
|
|
|
name = GObject.Property(type=str)
|
|
active = GObject.Property(type=bool, default=False)
|
|
|
|
loop = GObject.Property(type=str, default="None")
|
|
shuffle = GObject.Property(type=bool, default=False)
|
|
sort_order = GObject.Property(type=str)
|
|
|
|
tracks = GObject.Property(type=TrackidSet)
|
|
n_tracks = GObject.Property(type=int)
|
|
user_tracks = GObject.Property(type=bool, default=False)
|
|
tracks_loaded = GObject.Property(type=bool, default=False)
|
|
tracks_movable = GObject.Property(type=bool, default=False)
|
|
current_trackid = GObject.Property(type=int)
|
|
|
|
child_set = GObject.Property(type=table.TableSubset)
|
|
children = GObject.Property(type=Gtk.FilterListModel)
|
|
|
|
def __init__(self, table: Gio.ListModel, propertyid: int,
|
|
name: str, current_trackid: int | None = 0, **kwargs):
|
|
"""Initialize a Playlist object."""
|
|
current_trackid = 0 if current_trackid is None else current_trackid
|
|
super().__init__(table=table, propertyid=propertyid, name=name,
|
|
current_trackid=current_trackid,
|
|
tracks=TrackidSet(), **kwargs)
|
|
self.tracks.bind_property("n-trackids", self, "n-tracks")
|
|
|
|
def __add_track(self, track: Track) -> bool:
|
|
self.tracks.add_track(track)
|
|
return True
|
|
|
|
def __remove_track(self, track: Track) -> bool:
|
|
self.tracks.remove_track(track)
|
|
self.table.remove_track(self, track)
|
|
return True
|
|
|
|
def add_children(self, child_table: table.Table, child_keys: set) -> None:
|
|
"""Create a FilterListModel for this playlist's children."""
|
|
self.child_set = table.TableSubset(child_table, keys=child_keys)
|
|
self.children = Gtk.FilterListModel.new(self.child_set,
|
|
child_table.get_filter())
|
|
|
|
def do_update(self, column: str) -> bool:
|
|
"""Update a Playlist object."""
|
|
match column:
|
|
case "propertyid" | "name" | "n-tracks" | "child-set" | \
|
|
"children" | "user-tracks" | "tracks-loaded" | \
|
|
"tracks-movable": pass
|
|
case _: return super().do_update(column)
|
|
return True
|
|
|
|
def add_child(self, child: typing.Self) -> None:
|
|
"""Add a child Playlist to this Playlist."""
|
|
self.child_set.add_row(child)
|
|
if self.child_set.keyset.n_keys == 1:
|
|
self.table.refilter(Gtk.FilterChange.LESS_STRICT)
|
|
|
|
def add_track(self, track: Track, *, idle: bool = False) -> None:
|
|
"""Add a Track to this Playlist."""
|
|
if self.table.add_track(self, track):
|
|
self.table.queue.push(self.__add_track, track, now=not idle)
|
|
|
|
def get_track_order(self) -> dict[int, int]:
|
|
"""Get a dictionary mapping for trackid -> sorted position."""
|
|
return self.table.get_track_order(self)
|
|
|
|
def has_child(self, child: typing.Self) -> bool:
|
|
"""Check if this Playlist has a specific child Playlist."""
|
|
return child in self.child_set
|
|
|
|
def has_track(self, track: Track) -> bool:
|
|
"""Check if a Track is on this Playlist."""
|
|
return track in self.tracks
|
|
|
|
def load_tracks(self) -> bool:
|
|
"""Load this Playlist's Tracks (if they haven't been loaded yet)."""
|
|
if not self.tracks_loaded:
|
|
self.tracks.trackids = self.table.get_trackids(self)
|
|
self.tracks_loaded = True
|
|
return True
|
|
|
|
def move_track_down(self, track: Track) -> bool:
|
|
"""Move a track down in the sort order."""
|
|
return self.table.move_track_down(self, track)
|
|
|
|
def move_track_up(self, track: Track) -> bool:
|
|
"""Move a track up in the sort order."""
|
|
return self.table.move_track_up(self, track)
|
|
|
|
def reload_tracks(self, *, idle: bool = False) -> None:
|
|
"""Load this Playlist's Tracks."""
|
|
self.tracks_loaded = False
|
|
self.table.queue.push(self.load_tracks, now=not idle)
|
|
|
|
def remove_child(self, child: typing.Self) -> None:
|
|
"""Remove a child Playlist from this Playlist."""
|
|
self.child_set.remove_row(child)
|
|
if self.child_set.keyset.n_keys == 0:
|
|
self.table.refilter(Gtk.FilterChange.MORE_STRICT)
|
|
|
|
def remove_track(self, track: table.Row, *, idle: bool = False) -> None:
|
|
"""Remove a Track from this Playlist."""
|
|
self.table.queue.push(self.__remove_track, track, now=not idle)
|
|
|
|
def rename(self, new_name: str) -> bool:
|
|
"""Rename this playlist."""
|
|
return self.table.rename(self, new_name)
|
|
|
|
@GObject.Property(type=table.Row)
|
|
def parent(self) -> table.Row | None:
|
|
"""Get this playlist's parent playlist."""
|
|
return None
|
|
|
|
|
|
class Table(table.Table):
|
|
"""A table.Table with extra functionality for Playlists."""
|
|
|
|
active_playlist = GObject.Property(type=Playlist)
|
|
treemodel = GObject.Property(type=Gtk.TreeListModel)
|
|
|
|
autodelete = GObject.Property(type=bool, default=False)
|
|
system_tracks = GObject.Property(type=bool, default=True)
|
|
|
|
def __init__(self, sql: GObject.TYPE_PYOBJECT, **kwargs):
|
|
"""Initialize a Playlist Table."""
|
|
super().__init__(sql=sql, **kwargs)
|
|
self.treemodel = Gtk.TreeListModel.new(root=self,
|
|
passthrough=False,
|
|
autoexpand=False,
|
|
create_func=self.__create_tree)
|
|
|
|
def __do_autodelete(self, plist: Playlist) -> bool:
|
|
if plist.n_tracks == 0:
|
|
self.delete(plist)
|
|
return True
|
|
|
|
def __autodelete(self, plist: Playlist):
|
|
if self.autodelete:
|
|
self.queue.push(self.__do_autodelete, plist)
|
|
|
|
def __create_tree(self, plist: Playlist) -> Gtk.FilterListModel | None:
|
|
return plist.children
|
|
|
|
def __refilter(self, change_how: Gtk.FilterChange) -> bool:
|
|
self.get_filter().changed(change_how)
|
|
return True
|
|
|
|
def do_add_track(self, playlist: Playlist, track: Track) -> bool:
|
|
"""Add a Track to the Playlist."""
|
|
raise NotImplementedError
|
|
|
|
def do_get_sort_key(self, playlist: Playlist) -> tuple[str]:
|
|
"""Get a sort key for the requested Playlist."""
|
|
return format.sort_key(playlist.name)
|
|
|
|
def do_get_user_track_order(self, playlist: Playlist) -> dict[int, int]:
|
|
"""Get a mapping of sort keys for the tracks in this Playlist."""
|
|
raise NotImplementedError
|
|
|
|
def do_move_track_down(self, playlist: Playlist, track: Track) -> bool:
|
|
"""Move a track down in the sort order."""
|
|
raise NotImplementedError
|
|
|
|
def do_move_track_up(self, playlist: Playlist, track: Track) -> bool:
|
|
"""Move a track up in the sort order."""
|
|
raise NotImplementedError
|
|
|
|
def do_remove_track(self, playlist: Playlist, track: Track) -> bool:
|
|
"""Remove a Track from the Playlist."""
|
|
raise NotImplementedError
|
|
|
|
def do_sql_select_trackids(self, playlist: Playlist) -> sqlite3.Cursor:
|
|
"""Select the trackids that are in this Playlist."""
|
|
raise NotImplementedError
|
|
|
|
def add_system_track(self, playlist: Playlist, track: Track) -> bool:
|
|
"""Add a Track to a system Playlist."""
|
|
cur = self.sql("""INSERT INTO system_tracks (propertyid, trackid)
|
|
VALUES (?, ?)""", playlist.propertyid, track.trackid)
|
|
return cur and cur.rowcount == 1
|
|
|
|
def add_track(self, playlist: Playlist, track: Track) -> bool:
|
|
"""Add a Track to a Playlist."""
|
|
if track is None or track.get_library().deleting:
|
|
return False
|
|
if self.system_tracks:
|
|
return self.add_system_track(playlist, track)
|
|
return self.do_add_track(playlist, track)
|
|
|
|
def clear(self) -> None:
|
|
"""Clear the Table."""
|
|
self.active_playlist = None
|
|
super().clear()
|
|
|
|
def construct(self, propertyid: int, name: str, **kwargs) -> Playlist:
|
|
"""Construct a new Playlist object."""
|
|
res = super().construct(propertyid=propertyid, name=name, **kwargs)
|
|
if res.active:
|
|
self.sql.set_active_playlist(res)
|
|
res.reload_tracks(idle=True)
|
|
return res
|
|
|
|
def delete(self, playlist: Playlist) -> bool:
|
|
"""Delete a playlist from the database."""
|
|
if playlist.active:
|
|
self.sql.set_active_playlist(None)
|
|
return super().delete(playlist)
|
|
|
|
def get_sql_system_trackids(self, playlist: Playlist) -> sqlite3.Cursor:
|
|
"""Load a System Playlist's Tracks from the database."""
|
|
return self.sql("""SELECT trackid FROM system_tracks_view
|
|
WHERE propertyid=?""", playlist.propertyid)
|
|
|
|
def get_trackids(self, playlist: Playlist) -> set[int]:
|
|
"""Load a Playlist's Tracks from the database."""
|
|
if self.system_tracks:
|
|
cur = self.get_sql_system_trackids(playlist)
|
|
else:
|
|
cur = self.do_sql_select_trackids(playlist)
|
|
|
|
res = {row["trackid"] for row in cur.fetchall()}
|
|
self.__autodelete(playlist)
|
|
return res
|
|
|
|
def get_track_order(self, playlist: Playlist) -> dict[int, int]:
|
|
"""Get the track sort order for a playlist."""
|
|
if playlist.tracks_movable and playlist.sort_order == "user":
|
|
return self.do_get_user_track_order(playlist)
|
|
return self.sql.tracks.map_sort_order(playlist.sort_order)
|
|
|
|
def move_track_down(self, playlist: Playlist, track: Track) -> bool:
|
|
"""Move a track down in the playlist."""
|
|
if not playlist.tracks_movable:
|
|
return False
|
|
if res := self.do_move_track_down(playlist, track):
|
|
if playlist.sort_order != "user":
|
|
playlist.sort_order = "user"
|
|
return res
|
|
|
|
def move_track_up(self, playlist: Playlist, track: Track) -> bool:
|
|
"""Move a track up in the playlist."""
|
|
if not playlist.tracks_movable:
|
|
return False
|
|
if res := self.do_move_track_up(playlist, track):
|
|
if playlist.sort_order != "user":
|
|
playlist.sort_order = "user"
|
|
return res
|
|
|
|
def refilter(self, change_how: Gtk.FilterChange) -> None:
|
|
"""Schedule refiltering the Table."""
|
|
self.queue.cancel_task(self.__refilter)
|
|
self.queue.push(self.__refilter, change_how, first=True)
|
|
|
|
def remove_system_track(self, playlist: Playlist, track: Track) -> bool:
|
|
"""Remove a Track from a system Playlist."""
|
|
return self.sql("""DELETE FROM system_tracks
|
|
WHERE propertyid=? AND trackid=?""",
|
|
playlist.propertyid, track.trackid).rowcount == 1
|
|
|
|
def remove_track(self, playlist: Playlist, track: Track) -> bool:
|
|
"""Remove a Track from a Playlist."""
|
|
if self.system_tracks:
|
|
res = self.remove_system_track(playlist, track)
|
|
else:
|
|
res = self.do_remove_track(playlist, track)
|
|
|
|
self.__autodelete(playlist)
|
|
return res
|
|
|
|
def update(self, playlist: Playlist, column: str, newval) -> bool:
|
|
"""Update a Playlist in the Database."""
|
|
match column:
|
|
case "active" | "loop" | "shuffle" | \
|
|
"sort-order" | "current-trackid":
|
|
return self.update_playlist_property(playlist, column, newval)
|
|
case _:
|
|
return super().update(playlist, column, newval)
|
|
|
|
def update_playlist_property(self, playlist: Playlist,
|
|
column: str, newval) -> bool:
|
|
"""Update the playlists_common table."""
|
|
match column:
|
|
case "active":
|
|
self.active_playlist = playlist if playlist.active else None
|
|
case "current-trackid":
|
|
column = "current_trackid"
|
|
newval = None if newval == 0 else newval
|
|
case "sort-order":
|
|
column = "sort_order"
|
|
|
|
return self.sql(f"""UPDATE playlist_properties
|
|
SET {column}=? WHERE propertyid=?""",
|
|
newval, playlist.propertyid) is not None
|