104 lines
4.0 KiB
Python
104 lines
4.0 KiB
Python
# Copyright 2023 (c) Anna Schumaker.
|
|
"""Converts a TrackidSet into a Gio.ListModel."""
|
|
import bisect
|
|
from gi.repository import GObject
|
|
from gi.repository import Gio
|
|
from .. import db
|
|
|
|
|
|
class TrackidModel(GObject.GObject, Gio.ListModel):
|
|
"""A Gio.ListModel representing a TrackidSet."""
|
|
|
|
sql = GObject.Property(type=db.Connection)
|
|
n_tracks = GObject.Property(type=int)
|
|
|
|
def __init__(self, sql: db.Connection):
|
|
"""Initialize the TrackidModel."""
|
|
super().__init__(sql=sql)
|
|
self.__trackid_set = None
|
|
self.trackids = []
|
|
|
|
def bisect(self, trackid: int) -> tuple[bool, int | None]:
|
|
"""Bisect the TrackidModel for the given trackid."""
|
|
pos = bisect.bisect_left(self.trackids,
|
|
self.do_get_sort_key(trackid),
|
|
key=self.do_get_sort_key)
|
|
|
|
if pos < len(self.trackids):
|
|
return (self.trackids[pos] == trackid, pos)
|
|
return (False, pos)
|
|
|
|
def do_get_item_type(self) -> GObject.GType:
|
|
"""Get the item type of this Model."""
|
|
return db.tracks.Track.__gtype__
|
|
|
|
def do_get_n_items(self) -> int:
|
|
"""Get the number of items in the list."""
|
|
return len(self.trackids)
|
|
|
|
def do_get_item(self, n: int) -> db.tracks.Track | None:
|
|
"""Get the n-th item in the list."""
|
|
if n < len(self.trackids):
|
|
return self.sql.tracks.rows.get(self.trackids[n])
|
|
|
|
def do_get_sort_key(self, trackid: int) -> int:
|
|
"""Get a stort key for the given trackid."""
|
|
return trackid
|
|
|
|
def do_items_changed(self, *, position: int,
|
|
removed: int, added: int) -> None:
|
|
"""Emit the ::items-changed signal."""
|
|
self.n_tracks = len(self.trackids)
|
|
self.items_changed(position, removed, added)
|
|
|
|
def index(self, trackid: int) -> int | None:
|
|
"""Find the index of a specific trackid."""
|
|
(has, pos) = self.bisect(trackid)
|
|
return pos if has else None
|
|
|
|
def on_trackid_added(self, set: db.tracks.TrackidSet,
|
|
trackid: int) -> None:
|
|
"""Respond to the trackid-added signal."""
|
|
(has, pos) = self.bisect(trackid)
|
|
if not has:
|
|
self.trackids.insert(pos, trackid)
|
|
self.do_items_changed(position=pos, removed=0, added=1)
|
|
|
|
def on_trackid_removed(self, set: db.tracks.TrackidSet,
|
|
trackid: int) -> None:
|
|
"""Respond to the trackid-removed signal."""
|
|
(has, pos) = self.bisect(trackid)
|
|
if has:
|
|
del self.trackids[pos]
|
|
self.do_items_changed(position=pos, removed=1, added=0)
|
|
|
|
def on_trackids_reset(self, set: db.tracks.TrackidSet) -> None:
|
|
"""Respond to the trackids-reset signal."""
|
|
self.trackids = sorted(set.trackids, key=self.do_get_sort_key)
|
|
self.do_items_changed(position=0, removed=self.n_tracks,
|
|
added=len(self.trackids))
|
|
|
|
@GObject.Property(type=db.tracks.TrackidSet)
|
|
def trackid_set(self) -> db.tracks.TrackidSet | None:
|
|
"""Get the current trackid-set."""
|
|
return self.__trackid_set
|
|
|
|
@trackid_set.setter
|
|
def trackid_set(self, new: db.tracks.TrackidSet | None) -> None:
|
|
"""Set a new value to the trackid-set property."""
|
|
if self.__trackid_set is not None:
|
|
self.__trackid_set.disconnect_by_func(self.on_trackid_added)
|
|
self.__trackid_set.disconnect_by_func(self.on_trackid_removed)
|
|
self.__trackid_set.disconnect_by_func(self.on_trackids_reset)
|
|
self.trackids = []
|
|
|
|
self.__trackid_set = new
|
|
if new is not None:
|
|
new.connect("trackid-added", self.on_trackid_added)
|
|
new.connect("trackid-removed", self.on_trackid_removed)
|
|
new.connect("trackids-reset", self.on_trackids_reset)
|
|
self.trackids = sorted(new.trackids, key=self.do_get_sort_key)
|
|
|
|
self.do_items_changed(position=0, removed=self.n_tracks,
|
|
added=len(self.trackids))
|