emmental/emmental/db/playlists.py

238 lines
10 KiB
Python

# Copyright 2022 (c) Anna Schumaker
"""A custom Gio.ListModel for working with playlists."""
import datetime
import sqlite3
from gi.repository import GObject
from .. import alarm
from . import playlist
from . import tracks
class Playlist(playlist.Playlist):
"""Our custom Playlist with an image filepath."""
playlistid = GObject.Property(type=int)
image = GObject.Property(type=GObject.TYPE_PYOBJECT)
def do_update(self, column: str) -> None:
"""Update a playlist object."""
match (self.name, column, self.get_property(column)):
case ("Collection", "loop", "None"):
self.loop = "Playlist"
case ("Collection", "n-tracks", 0):
self.table.have_collection_tracks = False
case ("Collection", "n-tracks", _):
self.table.have_collection_tracks = True
case ("Previous Tracks", "loop", "Playlist") | \
("Previous Tracks", "loop", "Track"):
self.loop = "None"
case ("Previous Tracks", "shuffle", True):
self.shuffle = False
case ("Previous Tracks", "sort-order", _):
if self.sort_order != "laststarted DESC":
self.sort_order = "laststarted DESC"
case (_, _, _): super().do_update(column)
def rename(self, new_name: str) -> bool:
"""Rename this playlist."""
return self.table.rename(self, new_name)
@property
def primary_key(self) -> int:
"""Get the playlist primary key."""
return self.playlistid
class Table(playlist.Table):
"""Our Playlist Table."""
collection = GObject.Property(type=Playlist)
favorites = GObject.Property(type=Playlist)
most_played = GObject.Property(type=Playlist)
new_tracks = GObject.Property(type=Playlist)
previous = GObject.Property(type=Playlist)
queued = GObject.Property(type=Playlist)
unplayed = GObject.Property(type=Playlist)
have_collection_tracks = GObject.Property(type=bool, default=False)
def __init__(self, sql: GObject.TYPE_PYOBJECT, **kwargs):
"""Initialize the Playlists Table."""
super().__init__(sql=sql, system_tracks=False, **kwargs)
alarm.set_alarm(datetime.time(hour=0, minute=0, second=5),
self.__at_midnight)
def __at_midnight(self) -> None:
self.new_tracks.reload_tracks()
def __move_user_trackid(self, playlist: Playlist, trackid: int,
*, offset: int) -> bool:
order = self.get_track_order(playlist)
tracks = sorted(playlist.tracks.trackids, key=order.get)
start = tracks.index(trackid)
new = start + offset
if not (0 <= new < len(tracks)):
return False
tracks[start] = tracks[new]
tracks[new] = trackid
# Note: We write out all trackids so we don't have to update during
# do_add_track() and do_remove_track()
args = [(i, playlist.propertyid, t) for (i, t) in enumerate(tracks)]
self.sql.executemany("""UPDATE user_tracks SET position=?
WHERE propertyid=? AND trackid=?""", *args)
return True
def do_construct(self, **kwargs) -> Playlist:
"""Construct a new playlist."""
match (plist := Playlist(**kwargs)).name:
case "Collection": self.collection = plist
case "Favorite Tracks":
self.favorites = plist
self.favorites.user_tracks = True
case "Most Played Tracks": self.most_played = plist
case "New Tracks": self.new_tracks = plist
case "Previous Tracks":
self.previous = plist
self.sql("DELETE FROM system_tracks WHERE propertyid=?",
self.previous.propertyid)
case "Queued Tracks":
self.queued = plist
self.queued.user_tracks = True
self.queued.tracks_movable = True
case "Unplayed Tracks": self.unplayed = plist
case _:
plist.user_tracks = True
plist.tracks_movable = True
return plist
def do_add_track(self, playlist: Playlist, track: tracks.Track) -> bool:
"""Add a Track to the requested Playlist."""
match playlist:
case self.collection: return track.get_library().enabled
case self.most_played: view = "most_played_view"
case self.new_tracks: view = "new_tracks_view"
case self.favorites:
track.update_properties(favorite=True)
return True
case self.previous:
self.add_system_track(playlist, track)
return True
case self.queued:
self.sql.set_active_playlist(playlist)
return self.add_user_track(playlist, track)
case self.unplayed: return track.playcount == 0
case _: return self.add_user_track(playlist, track)
return self.sql(f"SELECT ? IN {view}", track.trackid).fetchone()[0]
def do_get_user_track_order(self, playlist: Playlist) -> dict[int, int]:
"""Get the user-configured sort order for a playlist."""
cur = self.sql("""SELECT trackid FROM user_tracks WHERE propertyid=?
ORDER BY position NULLS LAST, rowid""",
playlist.propertyid)
return {row["trackid"]: i for (i, row) in enumerate(cur.fetchall())}
def do_move_track_down(self, playlist: Playlist,
track: tracks.Track) -> bool:
"""Move a track down in the user sort order."""
return self.__move_user_trackid(playlist, track.trackid, offset=1)
def do_move_track_up(self, playlist: Playlist,
track: tracks.Track) -> bool:
"""Move a track up in the user sort order."""
return self.__move_user_trackid(playlist, track.trackid, offset=-1)
def do_remove_track(self, playlist: Playlist, track: tracks.Track) -> bool:
"""Remove a Track from the requested Playlist."""
match playlist:
case self.collection: return True
case self.most_played: return True
case self.new_tracks: return True
case self.unplayed: return True
case self.favorites:
track.update_properties(favorite=False)
return True
case self.previous:
return self.remove_system_track(playlist, track)
case _: return self.remove_user_track(playlist, track)
def do_sql_delete(self, playlist: Playlist) -> sqlite3.Cursor:
"""Delete a playlist."""
return self.sql("DELETE FROM playlists WHERE playlistid=?",
playlist.playlistid)
def do_sql_glob(self, glob: str) -> sqlite3.Cursor:
"""Search for playlists matching the search text."""
return self.sql("""SELECT playlistid FROM playlists
WHERE CASEFOLD(name) GLOB ?""", glob)
def do_sql_insert(self, name: str, **kwargs) -> sqlite3.Cursor | None:
"""Insert a new playlist into the database."""
if (cur := self.sql("INSERT INTO playlists (name) VALUES (?)", name)):
return self.sql("SELECT * FROM playlists_view WHERE playlistid=?",
cur.lastrowid)
def do_sql_select_all(self) -> sqlite3.Cursor:
"""Load playlists from the database."""
return self.sql("SELECT * FROM playlists_view")
def do_sql_select_trackids(self, playlist: Playlist) -> sqlite3.Cursor:
"""Load Tracks from the database."""
match playlist:
case self.collection: view = "collection_view"
case self.favorites: view = "favorite_view"
case self.most_played: view = "most_played_view"
case self.new_tracks: view = "new_tracks_view"
case self.unplayed: view = "unplayed_tracks_view"
case self.previous: return self.get_sql_system_trackids(playlist)
case _: return self.get_sql_user_trackids(playlist)
return self.sql(f"SELECT trackid FROM {view}")
def do_sql_select_one(self, name: str) -> sqlite3.Cursor:
"""Look up a playlist by name."""
return self.sql("SELECT playlistid FROM playlists WHERE name=?", name)
def do_sql_update(self, playlist: Playlist,
column: str, newval) -> sqlite3.Cursor:
"""Update a playlist."""
return self.sql(f"UPDATE playlists SET {column}=? WHERE playlistid=?",
newval, playlist.playlistid)
def add_user_track(self, playlist: Playlist, track: tracks.Track) -> bool:
"""Add a Track to the User Tracks table."""
cur = self.sql("""INSERT INTO user_tracks (propertyid, trackid)
VALUES (?, ?)""", playlist.propertyid, track.trackid)
return cur and cur.rowcount == 1
def get_sql_user_trackids(self, playlist: Playlist) -> sqlite3.Cursor:
"""Load user Tracks from the database."""
return self.sql("""SELECT trackid FROM user_tracks_view
WHERE propertyid=?""", playlist.propertyid)
def create(self, name: str) -> Playlist:
"""Create a new Playlist."""
if len(name := name.strip()) > 0:
return super().create(name)
def remove_user_track(self, playlist: Playlist,
track: tracks.Track) -> bool:
"""Remove a track from the User Tracks table."""
return self.sql("""DELETE FROM user_tracks
WHERE propertyid=? AND trackid=?""",
playlist.propertyid, track.trackid).rowcount == 1
def rename(self, playlist: Playlist, new_name: str) -> bool:
"""Rename a Playlist."""
if len(new_name := new_name.strip()) > 0:
if playlist.name != new_name:
if self.update(playlist, "name", new_name):
self.store.remove(playlist)
playlist.name = new_name
self.store.append(playlist)
return True
return False