emmental/emmental/playlist/playlist.py

232 lines
8.8 KiB
Python

# Copyright 2023 (c) Anna Schumaker.
"""A TrackidModel with extra features."""
from gi.repository import GObject
from . import model
from .. import db
# Flags shared by the "loop", "shuffle", and "sort-order" properties below.
# They are readable and writable, and EXPLICIT_NOTIFY means that setting one
# does not emit "notify" automatically; this module calls notify() itself.
FLAGS = GObject.ParamFlags.READWRITE | GObject.ParamFlags.EXPLICIT_NOTIFY
class Playlist(model.TrackidModel):
"""A TrackidModel with extra Playlist features."""
can_go_next = GObject.Property(type=bool, default=False)
def __init__(self, sql: db.Connection,
playlist: db.playlist.Playlist = None):
"""Initialize the Playlist instance."""
super().__init__(sql=sql)
self.__picked = db.tracks.TrackidSet()
self.__sort_keys = {}
self.__playlist = None
self.__sort_order = None
if playlist is not None:
self.playlist = playlist
self.connect("items-changed", self.__update_can_go_next)
self.connect("notify::current-track", self.__update_can_go_next)
def __get_nth_track(self, n: int | None) -> db.tracks.Track | None:
return self[n] if n is not None and n < len(self.trackids) else None
def __get_random_index(self, loop: bool) -> int | None:
choices = self.__playlist.tracks - self.__picked
if len(choices) == 0 and loop:
self.__picked.trackids = {}
choices = self.__playlist.tracks
return super().index(choices.random_trackid())
def __playlist_notify(self, plist: db.playlist.Playlist, param) -> None:
match param.name:
case "loop" | "shuffle":
self.notify(param.name)
case "sort-order":
self.__sort_order = plist.sort_order
self.on_trackids_reset(plist.tracks)
self.notify("sort-order")
def __update_can_go_next(self, *args) -> None:
if len(self.trackids) == 0:
self.can_go_next = False
else:
current = self.__playlist.current_trackid
self.can_go_next = current != self.trackids[-1]
def __track_moved(self, track: db.tracks.Track, *, offset: int) -> None:
index = self.index(track)
new_pos = index + offset
n_changed = abs(offset) + 1
del self.trackids[index]
self.trackids.insert(new_pos, track.trackid)
self.__sort_keys = self.__playlist.get_track_order()
self.items_changed(position=min(index, new_pos),
removed=n_changed, added=n_changed)
def do_get_sort_key(self, trackid: int) -> int:
"""Get a sort key for the given trackid."""
if (key := self.__sort_keys.get(trackid)) is None:
if self.__playlist is not None:
self.__sort_keys = self.__playlist.get_track_order()
key = self.__sort_keys.get(trackid, 0)
else:
return trackid
return key
def on_trackid_removed(self, set: db.tracks.TrackidSet,
trackid: int) -> None:
"""Handle the TrackidSet::trackid-removed signal."""
if self.__playlist.current_trackid == trackid:
index = super().index(trackid) - 1
self.current_track = None if index < 0 else self[index]
super().on_trackid_removed(set, trackid)
self.__sort_keys.pop(trackid, None)
def on_trackids_reset(self, set: db.tracks.TrackidSet) -> None:
"""Handle the TrackidSet::trackids-reset signal."""
self.__sort_keys.clear()
super().on_trackids_reset(set)
if super().index(self.__playlist.current_trackid) is None:
self.current_track = None
def add_track(self, track: db.tracks.Track) -> None:
"""Add a track to the playlist."""
if self.__playlist is not None:
self.__playlist.add_track(track)
def index(self, track: db.tracks.Track) -> int | None:
"""Find the index of a track in the list."""
if track is not None:
return super().index(track.trackid)
def move_track_down(self, track: db.tracks.Track) -> None:
"""Move a track earlier in the sort order."""
if self.__playlist is not None:
need_handling = self.__sort_order == "user"
if self.__playlist.move_track_down(track) and need_handling:
self.__track_moved(track, offset=1)
def move_track_up(self, track: db.tracks.Track) -> None:
"""Move a track earlier in the sort order."""
if self.__playlist is not None:
need_handling = self.__sort_order == "user"
if self.__playlist.move_track_up(track) and need_handling:
self.__track_moved(track, offset=-1)
def next_track(self) -> db.tracks.Track | None:
"""Select the next track for playback."""
if self.__playlist is None:
return None
index = self.index(self.current_track)
match (index, self.__playlist.loop, self.__playlist.shuffle):
case (None, _, False): index = 0
case (None, _, True): index = self.__get_random_index(False)
case (_, "Playlist", False): index = (index + 1) % self.n_tracks
case (_, "Playlist", True): index = self.__get_random_index(True)
case (_, "None", False): index += 1
case (_, "None", True): index = self.__get_random_index(False)
if (next := self.__get_nth_track(index)) is not None:
self.current_track = next
return next
def remove_track(self, track: db.tracks.Track) -> None:
"""Remove a track from the playlist."""
if self.__playlist is not None:
self.__playlist.remove_track(track)
def request_track(self, track: db.tracks.Track) -> None:
"""Request a track for playback directly."""
if self.index(track) is not None:
if not self.can_go_next:
self.__picked.trackids = set()
self.emit("track-requested", track)
@GObject.Property(type=db.tracks.Track)
def current_track(self) -> db.tracks.Track | None:
"""Get the current Track of the Playlist."""
if self.__playlist is not None:
return self.sql.tracks.rows.get(self.__playlist.current_trackid)
@current_track.setter
def current_track(self, track: db.tracks.Track | None) -> None:
"""Set the current Track."""
if self.__playlist is not None:
if track is not None:
self.__playlist.current_trackid = track.trackid
self.__picked.add_track(track)
else:
self.__playlist.current_trackid = 0
@GObject.Property(type=str, flags=FLAGS)
def loop(self) -> str:
"""Get the current loop setting of the Playlist."""
return "None" if self.__playlist is None else self.__playlist.loop
@loop.setter
def loop(self, newval: str) -> None:
if self.__playlist is not None:
if newval not in {"None", "Track", "Playlist"}:
raise ValueError
self.__playlist.loop = newval
@GObject.Property(type=db.playlist.Playlist)
def playlist(self) -> db.playlist.Playlist | None:
"""Get the current db playlist."""
return self.__playlist
@playlist.setter
def playlist(self, new: db.playlist.Playlist | None) -> None:
"""Set a new db playlist to the playlist."""
if self.__playlist:
self.__playlist.disconnect_by_func(self.__playlist_notify)
self.__picked.trackids = set()
self.__playlist = new
if new is not None:
self.__playlist.connect("notify", self.__playlist_notify)
self.__sort_order = new.sort_order
self.trackid_set = new.tracks
if len(self.trackids) > 0:
if new.current_trackid == self.trackids[-1]:
new.current_trackid = 0
if track := self.current_track:
self.__picked.add_track(track)
else:
self.__sort_order = None
self.trackid_set = None
for prop in ("current-track", "loop", "shuffle", "sort-order"):
self.notify(prop)
@GObject.Property(type=bool, default=False, flags=FLAGS)
def shuffle(self) -> bool:
"""Get the current shuffle setting of the Playlist."""
return False if self.__playlist is None else self.__playlist.shuffle
@shuffle.setter
def shuffle(self, newval: bool) -> None:
if self.__playlist is not None:
self.__playlist.shuffle = newval
@GObject.Property(type=str, flags=FLAGS)
def sort_order(self) -> str:
"""Get the current sort order."""
return self.__sort_order
@sort_order.setter
def sort_order(self, new_order: str) -> None:
if self.__playlist is not None:
self.__playlist.sort_order = new_order
@GObject.Signal(arg_types=(db.tracks.Track,))
def track_requested(self, track: db.tracks.Track) -> None:
"""Signal that a track has been requested for playback."""
self.current_track = track