231 lines
9.8 KiB
Python
231 lines
9.8 KiB
Python
# Copyright 2022 (c) Anna Schumaker
|
|
"""A custom Gio.ListModel for working with playlists."""
|
|
import sqlite3
|
|
from gi.repository import GObject
|
|
from . import playlist
|
|
from . import tracks
|
|
|
|
|
|
class Playlist(playlist.Playlist):
    """A Playlist row that also carries its primary key and an image."""

    playlistid = GObject.Property(type=int)
    image = GObject.Property(type=GObject.TYPE_PYOBJECT)

    def do_update(self, column: str) -> None:
        """React to a changed property on this playlist.

        The "Collection" and "Previous Tracks" playlists pin some of their
        properties to fixed values; every other change is passed along to
        the parent class.
        """
        name = self.name
        value = self.get_property(column)

        if name == "Collection":
            if column == "loop" and value == "None":
                # The Collection never stays in the "None" loop state.
                self.loop = "Playlist"
                return
            if column == "n-tracks":
                # Mirror "is the collection non-empty?" onto the table.
                self.table.have_collection_tracks = value != 0
                return
        elif name == "Previous Tracks":
            if column == "loop" and value in ("Playlist", "Track"):
                self.loop = "None"
                return
            if column == "shuffle" and value is True:
                self.shuffle = False
                return
            if column == "sort-order":
                # Only write when needed so we don't re-notify ourselves.
                if self.sort_order != "laststarted DESC":
                    self.sort_order = "laststarted DESC"
                return

        super().do_update(column)

    def rename(self, new_name: str) -> bool:
        """Ask our table to rename this playlist to `new_name`."""
        table = self.table
        return table.rename(self, new_name)

    @property
    def primary_key(self) -> int:
        """Return the playlistid, which is this row's primary key."""
        return self.playlistid
|
|
|
|
|
|
class Table(playlist.Table):
|
|
"""Our Playlist Table."""
|
|
|
|
collection = GObject.Property(type=Playlist)
|
|
favorites = GObject.Property(type=Playlist)
|
|
most_played = GObject.Property(type=Playlist)
|
|
new_tracks = GObject.Property(type=Playlist)
|
|
previous = GObject.Property(type=Playlist)
|
|
queued = GObject.Property(type=Playlist)
|
|
unplayed = GObject.Property(type=Playlist)
|
|
|
|
have_collection_tracks = GObject.Property(type=bool, default=False)
|
|
|
|
def __init__(self, sql: GObject.TYPE_PYOBJECT, **kwargs):
|
|
"""Initialize the Playlists Table."""
|
|
super().__init__(sql=sql, system_tracks=False, **kwargs)
|
|
|
|
def __move_user_trackid(self, playlist: Playlist, trackid: int,
|
|
*, offset: int) -> bool:
|
|
order = self.get_track_order(playlist)
|
|
tracks = sorted(playlist.tracks.trackids, key=order.get)
|
|
start = tracks.index(trackid)
|
|
|
|
new = start + offset
|
|
if not (0 <= new < len(tracks)):
|
|
return False
|
|
|
|
tracks[start] = tracks[new]
|
|
tracks[new] = trackid
|
|
|
|
# Note: We write out all trackids so we don't have to update during
|
|
# do_add_track() and do_remove_track()
|
|
args = [(i, playlist.propertyid, t) for (i, t) in enumerate(tracks)]
|
|
self.sql.executemany("""UPDATE user_tracks SET position=?
|
|
WHERE propertyid=? AND trackid=?""", *args)
|
|
return True
|
|
|
|
def do_construct(self, **kwargs) -> Playlist:
|
|
"""Construct a new playlist."""
|
|
match (plist := Playlist(**kwargs)).name:
|
|
case "Collection": self.collection = plist
|
|
case "Favorite Tracks":
|
|
self.favorites = plist
|
|
self.favorites.user_tracks = True
|
|
case "Most Played Tracks": self.most_played = plist
|
|
case "New Tracks": self.new_tracks = plist
|
|
case "Previous Tracks":
|
|
self.previous = plist
|
|
self.sql("DELETE FROM system_tracks WHERE propertyid=?",
|
|
self.previous.propertyid)
|
|
case "Queued Tracks":
|
|
self.queued = plist
|
|
self.queued.user_tracks = True
|
|
self.queued.tracks_movable = True
|
|
case "Unplayed Tracks": self.unplayed = plist
|
|
case _:
|
|
plist.user_tracks = True
|
|
plist.tracks_movable = True
|
|
return plist
|
|
|
|
def do_add_track(self, playlist: Playlist, track: tracks.Track) -> bool:
|
|
"""Add a Track to the requested Playlist."""
|
|
match playlist:
|
|
case self.collection: return track.get_library().enabled
|
|
case self.most_played: view = "most_played_view"
|
|
case self.new_tracks: view = "new_tracks_view"
|
|
case self.favorites:
|
|
track.update_properties(favorite=True)
|
|
return True
|
|
case self.previous:
|
|
self.add_system_track(playlist, track)
|
|
return True
|
|
case self.queued:
|
|
self.sql.set_active_playlist(playlist)
|
|
return self.add_user_track(playlist, track)
|
|
case self.unplayed: return track.playcount == 0
|
|
case _: return self.add_user_track(playlist, track)
|
|
|
|
return self.sql(f"SELECT ? IN {view}", track.trackid).fetchone()[0]
|
|
|
|
def do_get_user_track_order(self, playlist: Playlist) -> dict[int, int]:
|
|
"""Get the user-configured sort order for a playlist."""
|
|
cur = self.sql("""SELECT trackid FROM user_tracks WHERE propertyid=?
|
|
ORDER BY position NULLS LAST, rowid""",
|
|
playlist.propertyid)
|
|
return {row["trackid"]: i for (i, row) in enumerate(cur.fetchall())}
|
|
|
|
def do_move_track_down(self, playlist: Playlist,
|
|
track: tracks.Track) -> bool:
|
|
"""Move a track down in the user sort order."""
|
|
return self.__move_user_trackid(playlist, track.trackid, offset=1)
|
|
|
|
def do_move_track_up(self, playlist: Playlist,
|
|
track: tracks.Track) -> bool:
|
|
"""Move a track up in the user sort order."""
|
|
return self.__move_user_trackid(playlist, track.trackid, offset=-1)
|
|
|
|
def do_remove_track(self, playlist: Playlist, track: tracks.Track) -> bool:
|
|
"""Remove a Track from the requested Playlist."""
|
|
match playlist:
|
|
case self.collection: return True
|
|
case self.most_played: return True
|
|
case self.new_tracks: return True
|
|
case self.unplayed: return True
|
|
case self.favorites:
|
|
track.update_properties(favorite=False)
|
|
return True
|
|
case self.previous:
|
|
return self.remove_system_track(playlist, track)
|
|
case _: return self.remove_user_track(playlist, track)
|
|
|
|
def do_sql_delete(self, playlist: Playlist) -> sqlite3.Cursor:
|
|
"""Delete a playlist."""
|
|
return self.sql("DELETE FROM playlists WHERE playlistid=?",
|
|
playlist.playlistid)
|
|
|
|
def do_sql_glob(self, glob: str) -> sqlite3.Cursor:
|
|
"""Search for playlists matching the search text."""
|
|
return self.sql("""SELECT playlistid FROM playlists
|
|
WHERE CASEFOLD(name) GLOB ?""", glob)
|
|
|
|
def do_sql_insert(self, name: str, **kwargs) -> sqlite3.Cursor | None:
|
|
"""Insert a new playlist into the database."""
|
|
if (cur := self.sql("INSERT INTO playlists (name) VALUES (?)", name)):
|
|
return self.sql("SELECT * FROM playlists_view WHERE playlistid=?",
|
|
cur.lastrowid)
|
|
|
|
def do_sql_select_all(self) -> sqlite3.Cursor:
|
|
"""Load playlists from the database."""
|
|
return self.sql("SELECT * FROM playlists_view")
|
|
|
|
def do_sql_select_trackids(self, playlist: Playlist) -> sqlite3.Cursor:
|
|
"""Load Tracks from the database."""
|
|
match playlist:
|
|
case self.collection: view = "collection_view"
|
|
case self.favorites: view = "favorite_view"
|
|
case self.most_played: view = "most_played_view"
|
|
case self.new_tracks: view = "new_tracks_view"
|
|
case self.unplayed: view = "unplayed_tracks_view"
|
|
case self.previous: return self.get_sql_system_trackids(playlist)
|
|
case _: return self.get_sql_user_trackids(playlist)
|
|
|
|
return self.sql(f"SELECT trackid FROM {view}")
|
|
|
|
def do_sql_select_one(self, name: str) -> sqlite3.Cursor:
|
|
"""Look up a playlist by name."""
|
|
return self.sql("SELECT playlistid FROM playlists WHERE name=?", name)
|
|
|
|
def do_sql_update(self, playlist: Playlist,
|
|
column: str, newval) -> sqlite3.Cursor:
|
|
"""Update a playlist."""
|
|
return self.sql(f"UPDATE playlists SET {column}=? WHERE playlistid=?",
|
|
newval, playlist.playlistid)
|
|
|
|
def add_user_track(self, playlist: Playlist, track: tracks.Track) -> bool:
|
|
"""Add a Track to the User Tracks table."""
|
|
cur = self.sql("""INSERT INTO user_tracks (propertyid, trackid)
|
|
VALUES (?, ?)""", playlist.propertyid, track.trackid)
|
|
return cur and cur.rowcount == 1
|
|
|
|
def get_sql_user_trackids(self, playlist: Playlist) -> sqlite3.Cursor:
|
|
"""Load user Tracks from the database."""
|
|
return self.sql("""SELECT trackid FROM user_tracks_view
|
|
WHERE propertyid=?""", playlist.propertyid)
|
|
|
|
def create(self, name: str) -> Playlist:
|
|
"""Create a new Playlist."""
|
|
if len(name := name.strip()) > 0:
|
|
return super().create(name)
|
|
|
|
def remove_user_track(self, playlist: Playlist,
|
|
track: tracks.Track) -> bool:
|
|
"""Remove a track from the User Tracks table."""
|
|
return self.sql("""DELETE FROM user_tracks
|
|
WHERE propertyid=? AND trackid=?""",
|
|
playlist.propertyid, track.trackid).rowcount == 1
|
|
|
|
def rename(self, playlist: Playlist, new_name: str) -> bool:
|
|
"""Rename a Playlist."""
|
|
if len(new_name := new_name.strip()) > 0:
|
|
if playlist.name != new_name:
|
|
if self.update(playlist, "name", new_name):
|
|
self.store.remove(playlist)
|
|
playlist.name = new_name
|
|
self.store.append(playlist)
|
|
return True
|
|
return False
|