emmental/db/user.py

181 lines
6.8 KiB
Python
Raw Normal View History

# Copyright 2021 (c) Anna Schumaker.
#
# Table: playlists
# +------------+-----------+------+------+
# | playlistid | plstateid | name | sort |
# +------------+-----------+------+------+
#
# Index: playlist_index
# +--------------------+
# | name -> playlistid |
# +--------------------+
from gi.repository import GObject
from . import objects
from . import playlist
from . import sql
from . import state
from . import track
class Collection(playlist.Playlist):
    """Playlist made up of every track that belongs to an enabled library.

    All queries read the tracks table directly and keep only rows whose
    library has enabled=1.
    """

    # FROM / JOIN / WHERE clause shared by get_track() and get_track_index().
    _ENABLED_TRACKS = ("FROM tracks "
                       "INNER JOIN artists USING(artistid) "
                       "INNER JOIN albums USING(albumid) "
                       "INNER JOIN discs USING(discid) "
                       "INNER JOIN years USING(yearid) "
                       "INNER JOIN libraries USING(libraryid) "
                       "WHERE libraries.enabled=1")

    def __init__(self, row):
        """Initialize from a playlists row, using the playback-start icon."""
        super().__init__(row, "media-playback-start")
        self._name = row["name"]

    def get_n_tracks(self):
        """Return the number of tracks that live in enabled libraries."""
        cursor = sql.execute("SELECT COUNT(*) FROM tracks "
                             "JOIN libraries USING(libraryid) WHERE enabled=1")
        counted = cursor.fetchone()
        return counted[0]

    def get_track(self, n):
        """Return the track found at zero-based offset n.

        Rows are ordered by the columns listed in self.plist_state.sort, and
        the selected row is handed to track.Table.factory().
        """
        ordering = ', '.join(self.plist_state.sort)
        query = (f"SELECT * {self._ENABLED_TRACKS} "
                 f"ORDER BY {ordering} LIMIT 1 OFFSET ?")
        selected = sql.execute(query, [ n ]).fetchone()
        return track.Table.factory(selected)

    def get_track_index(self, track):
        """Return the zero-based position of track in the current sort order.

        NOTE(review): if the track is not part of the result set, fetchone()
        presumably yields None and the subscript below raises TypeError --
        confirm against sql.execute().
        """
        ordering = ', '.join(self.plist_state.sort)
        query = (f"SELECT * FROM (SELECT trackid,ROW_NUMBER() "
                 f"OVER (ORDER BY {ordering}) "
                 f"{self._ENABLED_TRACKS}) "
                 f"WHERE trackid=?")
        match = sql.execute(query, [ track.rowid ]).fetchone()
        # Column 1 is the ROW_NUMBER() value, which starts counting at 1.
        return match[1] - 1

    @GObject.Property
    def name(self):
        # Value of the "name" column captured in __init__().
        return self._name
class UserPlaylist(playlist.MappedPlaylist):
    """Named playlist whose tracks are kept in a map table."""

    def __init__(self, row, icon_name, map_table):
        """Initialize from a playlists row.

        icon_name and map_table are forwarded unchanged to
        playlist.MappedPlaylist; the row's "name" column is cached locally.
        """
        super().__init__(row, icon_name, map_table)
        self._name = row["name"]

    def delete(self):
        """Clear this playlist, then delete it from the module-level Table."""
        # Clear first so the playlist is empty by the time it is deleted.
        self.clear()
        Table.delete(self)

    @GObject.Property
    def name(self):
        # Value of the "name" column captured in __init__().
        return self._name
class Previous(UserPlaylist):
    """The "Previous" playlist, stored in temp_playlist_map."""

    def __init__(self, row):
        """Initialize from a playlists row with the "edit-undo" icon."""
        UserPlaylist.__init__(self, row, "edit-undo", "temp_playlist_map")

    def add_track(self, track):
        """Add track, retrying once after removing it if the first add fails.

        Returns True when the first attempt succeeds, otherwise the result of
        the second attempt.
        """
        if not super().add_track(track):
            # Presumably the track was already present; dropping it and adding
            # it again would give it a fresh map row -- confirm against
            # MappedPlaylist.add_track().
            self.remove_track(track)
            return super().add_track(track)
        return True
class UserTable(playlist.Model):
    """Model for the "playlists" table and its built-in playlists."""

    # (icon name, map table) used for the built-in playlists that are plain
    # UserPlaylist instances.  Any other name falls back to the generic icon.
    _BUILTIN_STYLE = {
        "Favorites": ("emmental-favorites", "playlist_map"),
        "New Tracks": ("starred", "temp_playlist_map"),
        "Queued Tracks": ("edit-redo", "playlist_map"),
    }

    def __init__(self):
        """Initialize the model for table "playlists", passing "sort" along."""
        playlist.Model.__init__(self, "playlists", "sort")

    def do_create(self):
        """Create the table and index if missing, then the built-in playlists.

        find() only inserts a playlist when lookup() does not already return
        one, so running this against an existing database adds no duplicates.
        """
        sql.execute("CREATE TABLE IF NOT EXISTS playlists "
                    "(playlistid INTEGER PRIMARY KEY, "
                    " plstateid INTEGER NOT NULL, "
                    " name TEXT UNIQUE, "
                    " sort TEXT, "
                    " FOREIGN KEY (plstateid) REFERENCES playlist_states(plstateid))")
        sql.execute("CREATE INDEX IF NOT EXISTS playlist_index ON playlists(name)")

        self.find("Collection", loop=True)
        self.find("Favorites")
        self.find("New Tracks")
        self.find("Previous", sort=["temp_playlist_map.rowid DESC"])
        self.find("Queued Tracks", sort=["playlist_map.rowid ASC"])

    def do_factory(self, row):
        """Build the playlist object that matches the row's "name" column."""
        name = row["name"]
        # These two names have dedicated classes.
        if name == "Collection":
            return Collection(row)
        if name == "Previous":
            return Previous(row)

        icon_name, map_table = self._BUILTIN_STYLE.get(
            name, ("audio-x-generic", "playlist_map"))
        return UserPlaylist(row, icon_name, map_table)

    def do_insert(self, plstate, name):
        """Insert a playlist row; the sort column stores the casefolded name."""
        return sql.execute("INSERT INTO playlists (plstateid, name, sort) "
                           "VALUES (?, ?, ?)",
                           [ plstate.rowid, name, name.casefold() ])

    def do_lookup(self, name):
        """Select the playlists row whose name column equals name."""
        return sql.execute("SELECT * FROM playlists WHERE name=?", [ name ])

    def find(self, name, loop=False, sort=state.DefaultSort):
        """Return the playlist called name, inserting it when lookup fails.

        loop and sort are only used for the insert; an existing playlist is
        returned as-is.
        """
        # Identity test: None is a singleton, and "is" cannot be affected by
        # a custom __eq__ on the looked-up object.
        if (res := self.lookup(name)) is None:
            res = self.insert(name, loop=loop, sort=sort)
        return res
class PlaylistMap(objects.Map):
    """Map table pairing playlists (left-hand side) with tracks (right-hand side).

    With temp=False the table is named "playlist_map"; with temp=True it is
    named "temp_playlist_map" and created as a TEMPORARY table.
    """

    def __init__(self, temp=False):
        """Set up the map and the lookup_tracks / lookup_playlists aliases."""
        name = "temp_playlist_map" if temp else "playlist_map"
        # Assigned before objects.Map.__init__(), matching the original
        # ordering; do_create() reads this flag.
        self.temporary = temp
        objects.Map.__init__(self, name, Table.get, self.get_track)

        self.lookup_tracks = self.lookup_rhs
        self.lookup_playlists = self.lookup_lhs

    def get_track(self, rowid):
        """Return the track with the given rowid via track.Table.get()."""
        # NOTE(review): function-scope import kept from the original;
        # presumably it sidesteps an import-order problem -- confirm.
        from . import track
        return track.Table.get(rowid)

    def do_create(self):
        """Create the map table (TEMPORARY when self.temporary is set)."""
        temporary = "TEMPORARY" if self.temporary else ""
        sql.execute(f"CREATE {temporary} TABLE IF NOT EXISTS {self.map_name} "
                    "(playlistid INTEGER, "
                    " trackid INTEGER, "
                    " FOREIGN KEY(playlistid) REFERENCES playlists(playlistid), "
                    " FOREIGN KEY(trackid) REFERENCES tracks(trackid), "
                    " UNIQUE(playlistid, trackid))")

    def do_insert(self, playlist, track):
        """Insert the (playlist, track) pair into the map table."""
        sql.execute(f"INSERT INTO {self.map_name} (playlistid, trackid) "
                    "VALUES (?, ?)", [ playlist.rowid, track.rowid ])

    def do_delete(self, playlist, track):
        """Delete the (playlist, track) pair and return the sql.execute() result."""
        return sql.execute(f"DELETE FROM {self.map_name} "
                           "WHERE playlistid=? AND trackid=?",
                           [ playlist.rowid, track.rowid ])

    def do_lookup_rhs(self, playlist):
        """Select the trackid of every row mapped to playlist."""
        return sql.execute(f"SELECT trackid FROM {self.map_name} "
                           "WHERE playlistid=?", [ playlist.rowid ])

    def do_lookup_lhs(self, track):
        """Select the playlistid of every row mapped to track."""
        return sql.execute(f"SELECT playlistid FROM {self.map_name} "
                           "WHERE trackid=?", [ track.rowid ])

    def delete_track(self, track):
        """Remove track from every playlist returned by lookup_playlists()."""
        for playlist in self.lookup_playlists(track):
            self.delete(playlist, track)

    def delete_playlist(self, playlist):
        """Remove every track returned by lookup_tracks() from playlist."""
        for track in self.lookup_tracks(playlist):
            self.delete(playlist, track)
# Module-level singletons.  Table has to be created first because
# PlaylistMap.__init__() reads Table.get while setting up each map.
Table = UserTable()
# Map named "playlist_map".
Map = PlaylistMap(temp=False)
# Map named "temp_playlist_map", created as a TEMPORARY table.
TempMap = PlaylistMap(temp=True)