emmental/db/user.py

171 lines
6.4 KiB
Python

# Copyright 2021 (c) Anna Schumaker.
#
# Table: playlists
# +------------+-----------+------+------+
# | playlistid | plstateid | name | sort |
# +------------+-----------+------+------+
from gi.repository import GObject
from . import playlist
from . import sql
from . import state
from . import track
class Collection(playlist.Playlist):
    """Playlist covering every track that belongs to an enabled library.

    Tracks are not stored per playlist: each query reads the tracks table
    joined against libraries, filtered on enabled=1, and ordered by the
    columns listed in self.plist_state.sort.
    """

    # FROM / JOIN / WHERE clause shared by the track queries below.
    _ENABLED_TRACKS = ("FROM tracks "
                       "INNER JOIN artists USING(artistid) "
                       "INNER JOIN albums USING(albumid) "
                       "INNER JOIN discs USING(discid) "
                       "INNER JOIN libraries USING(libraryid) "
                       "WHERE libraries.enabled=1")

    def __init__(self, row):
        """Set up the playlist from a database row with a "name" column."""
        playlist.Playlist.__init__(self, row, "media-playback-start")
        self._name = row["name"]

    def _order_by(self):
        """Return the ORDER BY column list built from the playlist state."""
        return ', '.join(self.plist_state.sort)

    def get_n_tracks(self):
        """Return the number of tracks in enabled libraries."""
        cur = sql.execute("SELECT COUNT(*) FROM tracks "
                          "JOIN libraries USING(libraryid) WHERE enabled=1")
        return cur.fetchone()[0]

    def get_track(self, n):
        """Return the track at zero-based position n in the sorted collection.

        The fetched row is handed to track.Table.factory() unchanged.
        NOTE(review): when n is past the end the row is presumably None --
        confirm that the factory copes with that.
        """
        row = sql.execute(f"SELECT * {self._ENABLED_TRACKS} "
                          f"ORDER BY {self._order_by()} LIMIT 1 OFFSET ?",
                          [n]).fetchone()
        return track.Table.factory(row)

    def get_tracks(self):
        """Return a list with every track of the collection, in sort order."""
        rows = sql.execute(f"SELECT * {self._ENABLED_TRACKS} "
                           f"ORDER BY {self._order_by()}").fetchall()
        return [track.Table.factory(row) for row in rows]

    def get_track_index(self, track):
        """Return the zero-based position of track in the sorted collection.

        Returns None when the query yields no row for track.rowid (for
        example a track whose library is not enabled) instead of failing
        with a TypeError on the empty result.

        Note: the parameter shadows the imported `track` module inside this
        method; only track.rowid is read here.
        """
        cur = sql.execute(f"SELECT * FROM (SELECT trackid,ROW_NUMBER() "
                          f"OVER (ORDER BY {self._order_by()}) "
                          f"{self._ENABLED_TRACKS}) "
                          f"WHERE trackid=?", [track.rowid])
        # Assumes sql.execute() returns a DB-API style cursor whose
        # fetchone() gives None when nothing matched.
        row = cur.fetchone()
        if row is None:
            return None
        # ROW_NUMBER() counts from 1; callers work with zero-based indexes.
        return row[1] - 1

    @GObject.Property
    def name(self):
        """The playlist name read from the database row."""
        return self._name
class UserPlaylist(playlist.MappedPlaylist):
    """A named playlist whose tracks live in a map table."""

    def __init__(self, row, icon_name, map_table):
        """Initialize the mapped playlist and remember the row's name."""
        super().__init__(row, icon_name, map_table)
        self._name = row["name"]

    def delete(self):
        """Clear the playlist, delete it through Table, then commit."""
        self.clear()
        Table.delete(self)
        sql.commit()

    @GObject.Property
    def name(self):
        """The playlist name read from the database row."""
        return self._name
class Previous(UserPlaylist):
    """Playlist of previously added tracks, kept in temp_playlist_map."""

    def __init__(self, row):
        super().__init__(row, "media-skip-backward", "temp_playlist_map")

    def add_track(self, track):
        """Add track, removing an existing entry first; always returns True.

        The current position is reset to 0 afterwards.
        """
        index = self.get_track_index(track)
        # NOTE(review): this is a truthiness test, so an index of 0 skips
        # the removal just like a falsy "not found" result would -- confirm
        # that this is intended.
        if index:
            self.remove_track(track)
        super().add_track(track)
        self.current = 0
        return True

    def next_track(self):
        """Step the current position back by one, never below -1."""
        lowered = self.current - 1
        self.current = lowered if lowered > -1 else -1
        return self.get_current_track()

    def previous_track(self):
        """Delegate to the parent class's next_track()."""
        return super().next_track()
class QueuedTracks(UserPlaylist):
    """Playlist of queued tracks; a track is removed once it is returned."""

    def __init__(self, row):
        super().__init__(row, "media-skip-forward", "playlist_map")

    def next_track(self):
        """Return the parent's next track, removing it when it is truthy."""
        upcoming = super().next_track()
        if upcoming:
            self.remove_track(upcoming)
        return upcoming
class UserTable(playlist.Model):
def __init__(self):
playlist.Model.__init__(self, "playlists", "sort")
def do_create(self):
sql.execute("CREATE TABLE IF NOT EXISTS playlists "
"(playlistid INTEGER PRIMARY KEY, "
" plstateid INTEGER NOT NULL, "
" name TEXT UNIQUE, "
" sort TEXT, "
" FOREIGN KEY (plstateid) REFERENCES playlist_states(plstateid))")
sql.execute(f"CREATE TABLE IF NOT EXISTS playlist_map "
"(playlistid INTEGER, "
" trackid INTEGER, "
" FOREIGN KEY(playlistid) REFERENCES playlists(playlistid), "
" FOREIGN KEY(trackid) REFERENCES tracks(trackid), "
" UNIQUE(playlistid, trackid))")
sql.execute(f"CREATE TEMPORARY TABLE IF NOT EXISTS temp_playlist_map "
"(playlistid INTEGER, "
" trackid INTEGER, "
" FOREIGN KEY(playlistid) REFERENCES playlists(playlistid), "
" FOREIGN KEY(trackid) REFERENCES tracks(trackid), "
" UNIQUE(playlistid, trackid))")
self.find("Collection", loop=True)
self.find("Favorites")
self.find("New Tracks")
self.find("Previous", sort=["temp_playlist_map.rowid DESC"])
self.find("Queued Tracks", sort=["playlist_map.rowid ASC"])
def do_drop(self):
sql.execute("DROP TABLE playlists")
sql.execute("DROP TABLE playlist_map")
sql.execute("DROP TABLE temp_playlist_map")
def do_factory(self, row):
match row["name"]:
case "Collection":
return Collection(row)
case "Favorites":
return UserPlaylist(row, "emmental-favorites", "playlist_map")
case "New Tracks":
return UserPlaylist(row, "starred", "temp_playlist_map")
case "Previous":
return Previous(row)
case "Queued Tracks":
return QueuedTracks(row)
case _:
return UserPlaylist(row, "audio-x-generic", "playlist_map")
def do_insert(self, plstate, name):
return sql.execute("INSERT INTO playlists (plstateid, name, sort) "
"VALUES (?, ?, ?)", [ plstate.rowid, name, name.casefold() ])
def do_lookup(self, name):
return sql.execute("SELECT * FROM playlists WHERE name=?", [ name ])
def find(self, name, loop=False, sort=state.DefaultSort):
if (res := self.lookup(name)) == None:
res = self.insert(name, loop=loop, sort=sort)
return res
# Module-level UserTable instance; UserPlaylist.delete() reaches the table
# through this name.
Table = UserTable()