emmental/db/user.py

119 lines
4.2 KiB
Python
Raw Normal View History

# Copyright 2021 (c) Anna Schumaker.
#
# Table: playlists
# +------ ---+-----------+------+------+
# | playlistid | plstateid | name | sort |
# +-------- -+-----------+------+------+
#
# Index: playlist_index
# +--------------------+
# | name -> playlistid |
# +--------------------+
from gi.repository import GObject
from . import objects
from . import playlist
from . import sql
from . import track
class UserPlaylist(playlist.Playlist):
def __init__(self, row, icon_name):
playlist.Playlist.__init__(self, row, icon_name)
self._name = row["name"]
@GObject.Property
def name(self): return self._name
class UserTable(playlist.Model):
def __init__(self):
playlist.Model.__init__(self, "playlists", "sort")
def do_create(self):
sql.execute("CREATE TABLE IF NOT EXISTS playlists "
"(playlistid INTEGER PRIMARY KEY, "
" plstateid INTEGER NOT NULL, "
" name TEXT UNIQUE, "
" sort TEXT, "
" FOREIGN KEY (plstateid) REFERENCES playlist_states(plstateid))")
sql.execute("CREATE INDEX IF NOT EXISTS playlist_index ON playlists(name)")
self.find("Collection", loop=True)
self.find("Favorites")
self.find("New Tracks")
self.find("Previous")
self.find("Queued Tracks")
def do_factory(self, row):
if row["name"] == "Collection":
return UserPlaylist(row, "media-playback-start")
elif row["name"] == "Favorites":
return UserPlaylist(row, "emmental-favorites")
elif row["name"] == "New Tracks":
return UserPlaylist(row, "starred")
elif row["name"] == "Previous":
return UserPlaylist(row, "edit-undo")
elif row["name"] == "Queued Tracks":
return UserPlaylist(row, "edit-redo")
return UserPlaylist(row, "audio-x-generic")
def do_insert(self, plstate, name):
return sql.execute("INSERT INTO playlists (plstateid, name, sort) "
"VALUES (?, ?, ?)", [ plstate.rowid, name, name.casefold() ])
def do_lookup(self, name):
return sql.execute("SELECT * FROM playlists WHERE name=?", [ name ])
def find(self, name, loop=False):
if (res := self.lookup(name)) == None:
res = self.insert(name, loop=loop)
return res
class PlaylistMap(objects.Map):
def __init__(self, temp=False):
name = "playlist_map" if temp==False else "temp_playlist_map"
self.temporary = temp
objects.Map.__init__(self, name, Table.get, track.Table.get)
self.lookup_tracks = self.lookup_rhs
self.lookup_playlists = self.lookup_lhs
def do_create(self):
temp = "" if self.temporary == False else "TEMPORARY"
sql.execute(f"CREATE {temp} TABLE IF NOT EXISTS {self.map_name} "
"(playlistid INTEGER, "
" trackid INTEGER, "
" FOREIGN KEY(playlistid) REFERENCES playlists(playlistid), "
" FOREIGN KEY(trackid) REFERENCES tracks(trackid), "
" UNIQUE(playlistid, trackid))")
def do_insert(self, playlist, track):
sql.execute(f"INSERT INTO {self.map_name} (playlistid, trackid) "
"VALUES (?, ?)", [ playlist.rowid, track.rowid ])
def do_delete(self, playlist, track):
return sql.execute(f"DELETE FROM {self.map_name} "
"WHERE playlistid=? AND trackid=?",
[ playlist.rowid, track.rowid ])
def do_lookup_rhs(self, playlist):
return sql.execute(f"SELECT trackid FROM {self.map_name} "
"WHERE playlistid=?", [ playlist.rowid ])
def do_lookup_lhs(self, track):
return sql.execute(f"SELECT playlistid FROM {self.map_name} "
"WHERE trackid=?", [ track.rowid ])
def delete_track(self, track):
for playlist in self.lookup_playlists(track):
self.delete(playlist, track)
def delete_playlist(self, playlist):
for track in self.lookup_tracks(playlist):
self.delete(playlist, track)
Table = UserTable()
Map = PlaylistMap(temp=False)
TempMap = PlaylistMap(temp=True)