Anna Schumaker
ab6eb556ad
In this case, the call to get_track_index() returns None which can't be used for the comparisons we're doing. Make sure we handle the None result explicitely. Signed-off-by: Anna Schumaker <Anna@NoWheyCreamery.com>
274 lines
9.9 KiB
Python
274 lines
9.9 KiB
Python
# Copyright 2021 (c) Anna Schumaker.
|
|
import random
|
|
from gi.repository import GObject
|
|
from . import sql
|
|
from . import state
|
|
from . import table
|
|
|
|
class Playlist(GObject.GObject):
|
|
def __init__(self, row, icon_name):
|
|
GObject.GObject.__init__(self)
|
|
self._rowkey = row.keys()[0]
|
|
self._rowid = row[self._rowkey]
|
|
self._plstate = state.Table.get(row["plstateid"])
|
|
self._icon_name = icon_name
|
|
|
|
def delete(self): raise NotImplementedError
|
|
|
|
def has_children(self): return False
|
|
def can_add_remove_tracks(self): return False
|
|
|
|
def get_n_tracks(self):
|
|
cur = sql.execute(f"SELECT COUNT(*) FROM tracks "
|
|
f"WHERE {self._rowkey}=?", [ self._rowid ])
|
|
return cur.fetchone()[0]
|
|
|
|
def get_track(self, n):
|
|
order = ', '.join(self.plist_state.sort)
|
|
row = sql.execute(f"SELECT * FROM tracks "
|
|
f"INNER JOIN artists USING(artistid) "
|
|
f"INNER JOIN albums USING(albumid) "
|
|
f"INNER JOIN discs USING(discid) "
|
|
f"WHERE tracks.{self._rowkey}=? "
|
|
f"ORDER BY {order} LIMIT 1 OFFSET ?",
|
|
[ self.rowid, n ]).fetchone()
|
|
return track.Table.factory(row)
|
|
|
|
def get_tracks(self):
|
|
order = ', '.join(self.plist_state.sort)
|
|
rows = sql.execute(f"SELECT * FROM tracks "
|
|
f"INNER JOIN artists USING(artistid) "
|
|
f"INNER JOIN albums USING(albumid) "
|
|
f"INNER JOIN discs USING(discid) "
|
|
f"WHERE tracks.{self._rowkey}=? "
|
|
f"ORDER BY {order}", [ self.rowid ]).fetchall()
|
|
return [ track.Table.factory(row) for row in rows ]
|
|
|
|
def get_current_track(self):
|
|
return self.get_track(self.current) if self.current > -1 else None
|
|
|
|
def get_track_index(self, track):
|
|
order = ', '.join(self.plist_state.sort)
|
|
cur = sql.execute(f"SELECT * FROM (SELECT trackid,ROW_NUMBER() "
|
|
f"OVER (ORDER BY {order}) "
|
|
f"FROM tracks "
|
|
f"INNER JOIN artists USING(artistid) "
|
|
f"INNER JOIN albums USING(albumid) "
|
|
f"INNER JOIN discs USING(discid) "
|
|
f"WHERE tracks.{self._rowkey}=?) "
|
|
f"WHERE trackid=?", [ self.rowid, track.rowid ])
|
|
return cur.fetchone()[1] - 1
|
|
|
|
def track_adjusts_current(self, track):
|
|
if self.current > -1:
|
|
if (index := self.get_track_index(track)) != None:
|
|
return index <= self.current
|
|
return False
|
|
|
|
def add_track(self, track):
|
|
self.emit("track-added", track)
|
|
|
|
def remove_track(self, track, adjust_current):
|
|
self.emit("track-removed", track, adjust_current)
|
|
|
|
def next_track(self):
|
|
n = self.get_n_tracks()
|
|
if self.random and n > 1:
|
|
self.current += random.randint(1, int((3*n)/4))
|
|
else:
|
|
self.current += 1
|
|
return self.get_current_track()
|
|
|
|
def refresh(self):
|
|
self.emit("refreshed")
|
|
|
|
@GObject.Property
|
|
def name(self): raise NotImplementedError
|
|
|
|
@GObject.Property
|
|
def icon_name(self): return self._icon_name
|
|
|
|
@GObject.Property
|
|
def plist_state(self): return self._plstate
|
|
|
|
@GObject.Property
|
|
def current(self): return self._plstate.get_property("current")
|
|
|
|
@current.setter
|
|
def current(self, newval):
|
|
n_tracks = self.get_n_tracks()
|
|
if newval >= n_tracks:
|
|
if n_tracks == 0: newval = -1
|
|
elif self.random: newval = newval % n_tracks
|
|
elif self.loop: newval = 0
|
|
else: newval = n_tracks
|
|
self._plstate.set_property("current", newval)
|
|
|
|
@GObject.Property(type=bool,default=False)
|
|
def loop(self): return self._plstate.get_property("loop")
|
|
|
|
@loop.setter
|
|
def loop(self, newval): self._plstate.set_property("loop", newval)
|
|
|
|
@GObject.Property(type=bool,default=False)
|
|
def random(self): return self._plstate.get_property("random")
|
|
|
|
@random.setter
|
|
def random(self, newval): self._plstate.set_property("random", newval)
|
|
|
|
@GObject.Property(type=GObject.TYPE_PYOBJECT)
|
|
def sort(self): return self._plstate.get_property("sort")
|
|
|
|
@sort.setter
|
|
def sort(self, newval):
|
|
track = self.get_current_track()
|
|
self._plstate.set_property("sort", newval)
|
|
if track:
|
|
self.current = self.get_track_index(track)
|
|
self.refresh()
|
|
|
|
@GObject.Property
|
|
def rowid(self): return self._rowid
|
|
|
|
@GObject.Property
|
|
def rowkey(self): return self._rowkey
|
|
|
|
@GObject.Property
|
|
def plstateid(self): return self._plstate.rowid
|
|
|
|
@GObject.Signal
|
|
def refreshed(self): pass
|
|
|
|
@GObject.Signal(arg_types=(GObject.TYPE_PYOBJECT,))
|
|
def track_added(self, track):
|
|
if self.track_adjusts_current(track):
|
|
self.current += 1
|
|
|
|
@GObject.Signal(arg_types=(GObject.TYPE_PYOBJECT,bool))
|
|
def track_removed(self, track, adjust_current):
|
|
if adjust_current:
|
|
self.current -= 1
|
|
|
|
|
|
class MappedPlaylist(Playlist):
|
|
def __init__(self, row, icon_name, map_table):
|
|
Playlist.__init__(self, row, icon_name)
|
|
self._map_table = map_table
|
|
|
|
@GObject.Property
|
|
def map_table(self): return self._map_table
|
|
|
|
def can_add_remove_tracks(self): return self._map_table == "playlist_map"
|
|
|
|
def add_track(self, track):
|
|
res = sql.execute(f"INSERT OR IGNORE INTO {self.map_table} "
|
|
f"({self.rowkey}, trackid) VALUES (?, ?)",
|
|
[ self.rowid, track.rowid ]).rowcount == 1
|
|
if res:
|
|
super().add_track(track)
|
|
return res
|
|
|
|
def clear(self):
|
|
sql.execute(f"DELETE FROM {self.map_table} "
|
|
f"WHERE {self.rowkey}=?", [ self.rowid ])
|
|
|
|
def get_n_tracks(self):
|
|
cur = sql.execute(f"SELECT COUNT(*) FROM {self.map_table} "
|
|
f"WHERE {self._rowkey}=?", [ self._rowid ])
|
|
return cur.fetchone()[0]
|
|
|
|
def get_track(self, n):
|
|
order = ', '.join(self.plist_state.sort)
|
|
row = sql.execute(f"SELECT * FROM tracks "
|
|
f"INNER JOIN {self.map_table} USING(trackid) "
|
|
f"INNER JOIN artists USING(artistid) "
|
|
f"INNER JOIN albums USING(albumid) "
|
|
f"INNER JOIN discs USING(discid) "
|
|
f"WHERE {self._rowkey}=? "
|
|
f"ORDER BY {order} LIMIT 1 OFFSET ?",
|
|
[ self._rowid, n ]).fetchone()
|
|
return track.Table.factory(row)
|
|
|
|
def get_tracks(self):
|
|
order = ', '.join(self.plist_state.sort)
|
|
rows = sql.execute(f"SELECT * FROM tracks "
|
|
f"INNER JOIN {self.map_table} USING(trackid) "
|
|
f"INNER JOIN artists USING(artistid) "
|
|
f"INNER JOIN albums USING(albumid) "
|
|
f"INNER JOIN discs USING(discid) "
|
|
f"WHERE {self._rowkey}=? ORDER BY {order}",
|
|
[ self._rowid ]).fetchall()
|
|
return [ track.Table.factory(row) for row in rows ]
|
|
|
|
|
|
def get_track_index(self, track):
|
|
order = ', '.join(self.plist_state.sort)
|
|
row = sql.execute(f"SELECT * FROM (SELECT trackid,{self._rowkey},ROW_NUMBER() "
|
|
f"OVER (ORDER BY {order}) "
|
|
f"FROM tracks "
|
|
f"INNER JOIN {self.map_table} USING (trackid) "
|
|
f"INNER JOIN artists USING(artistid) "
|
|
f"INNER JOIN albums USING(albumid) "
|
|
f"INNER JOIN discs USING(discid)) "
|
|
f"WHERE {self._rowkey}=? AND trackid=?",
|
|
[ self.rowid, track.rowid ]).fetchone()
|
|
return row[2] - 1 if row else None
|
|
|
|
def remove_track(self, track):
|
|
adjust_current = self.track_adjusts_current(track)
|
|
res = sql.execute(f"DELETE FROM {self.map_table} "
|
|
f"WHERE {self.rowkey}=? AND trackid=?",
|
|
[ self.rowid, track.rowid ]).rowcount == 1
|
|
if res:
|
|
super().remove_track(track, adjust_current)
|
|
return res
|
|
|
|
|
|
class ParentPlaylist(Playlist):
|
|
def has_children(self): return True
|
|
def get_child_table(self): raise NotImplementedError
|
|
|
|
def get_n_children(self):
|
|
return self.get_child_table().get_n_children(self)
|
|
|
|
def get_child(self, n):
|
|
return self.get_child_table().get_child(self, n)
|
|
|
|
def get_child_index(self, child):
|
|
return self.get_child_table().get_child_index(self, child)
|
|
|
|
def find_child(self, *args):
|
|
if (res := self.lookup_child(*args)) == None:
|
|
res = self.get_child_table().insert(self, *args)
|
|
self.emit("children-changed", self.get_child_index(res), 0, 1)
|
|
return res
|
|
|
|
def lookup_child(self, *args):
|
|
return self.get_child_table().lookup(self, *args)
|
|
|
|
@GObject.Signal(arg_types=(int,int,int))
|
|
def children_changed(self, pos, rm, add): pass
|
|
|
|
|
|
class Model(table.Model):
|
|
def insert(self, *args, **kwargs):
|
|
loop = kwargs.pop("loop", False)
|
|
sort = kwargs.pop("sort", state.DefaultSort)
|
|
return super().insert(state.Table.insert(loop=loop,sort=sort), *args)
|
|
|
|
def delete(self, plist):
|
|
state.Table.delete(plist.plist_state)
|
|
return super().delete(plist)
|
|
|
|
|
|
class ChildModel(table.Child):
|
|
def insert(self, *args, **kwargs):
|
|
return super().insert(state.Table.insert(), *args)
|
|
|
|
def delete(self, plist):
|
|
state.Table.delete(plist.plist_state)
|
|
return super().delete(plist)
|
|
|
|
|
|
from . import track
|