db: Add an Album Table

This table allows us to work with Album playlists that have a name,
album artist, release date, (optional) mbid, and (optional) cover.
Note that we can insert multiple albums with the same name as long as
their mbid, artist, or release date is different.

Signed-off-by: Anna Schumaker <Anna@NoWheyCreamery.com>
This commit is contained in:
Anna Schumaker 2022-08-23 11:12:39 -04:00
parent d3bdaaa063
commit 1b38c4d6ec
5 changed files with 335 additions and 2 deletions

View File

@ -3,6 +3,7 @@
import pathlib
from gi.repository import GObject
from typing import Generator
from . import albums
from . import artists
from . import connection
from . import playlist
@ -31,6 +32,7 @@ class Connection(connection.Connection):
self.settings = settings.Table(self)
self.playlists = playlists.Table(self)
self.artists = artists.Table(self)
self.albums = albums.Table(self, queue=self.artists.queue)
def close(self) -> None:
"""Close the database connection."""
@ -53,7 +55,7 @@ class Connection(connection.Connection):
def playlist_tables(self) -> Generator[playlist.Table, None, None]:
"""Iterate over each playlist table."""
for tbl in [self.playlists, self.artists]:
for tbl in [self.playlists, self.artists, self.albums]:
yield tbl
def set_active_playlist(self, plist: playlist.Playlist) -> None:

82
emmental/db/albums.py Normal file
View File

@ -0,0 +1,82 @@
# Copyright 2022 (c) Anna Schumaker
"""A custom Gio.ListModel for working with albums."""
import pathlib
import sqlite3
from gi.repository import GObject
from .. import format
from . import playlist
class Album(playlist.Playlist):
"""Our custom Album with a ListModel representing mediums."""
albumid = GObject.Property(type=int)
artist = GObject.Property(type=str)
release = GObject.Property(type=str)
mbid = GObject.Property(type=str)
cover = GObject.Property(type=GObject.TYPE_PYOBJECT)
@property
def primary_key(self) -> int:
"""Get the Album primary key."""
return self.albumid
class Table(playlist.Table):
"""Our Album Table."""
def do_construct(self, **kwargs) -> Album:
"""Construct a new album."""
return Album(**kwargs)
def do_get_sort_key(self, album: Album) -> tuple[tuple, bool,
str, tuple, str]:
"""Get a sort key for the requested Artist."""
return (format.sort_key(album.name),
len(album.mbid) == 0, album.mbid.casefold(),
format.sort_key(album.artist),
album.release)
def do_sql_delete(self, album: Album) -> sqlite3.Cursor:
"""Delete an album."""
return self.sql("DELETE FROM albums WHERE albumid=?", album.albumid)
def do_sql_glob(self, glob: str) -> sqlite3.Cursor:
"""Search for albums matching the search text."""
return self.sql("""SELECT albumid FROM albums
WHERE CASEFOLD(name) GLOB ?""", glob)
def do_sql_insert(self, name: str, artist: str,
release: str, *, mbid: str = "",
cover: pathlib.Path = None) -> sqlite3.Cursor | None:
"""Create a new album."""
if cur := self.sql("""INSERT INTO albums
(name, artist, release, mbid, cover)
VALUES (?, ?, ?, ?, ?)""",
name, artist, release, mbid, cover):
return self.sql("SELECT * FROM albums_view WHERE albumid=?",
cur.lastrowid)
def do_sql_select_all(self) -> sqlite3.Cursor:
"""Load albums from the database."""
return self.sql("SELECT * FROM albums_view")
def do_sql_select_one(self, name: str = None,
artist: str = None, release: str = None,
*, mbid: str = "") -> sqlite3.Cursor:
"""Look up an albums by name, mbid, artist, and release."""
where = ["mbid=?"]
args = [mbid.lower()]
if None not in (name, artist, release):
where.extend(["CASEFOLD(name)=?",
"CASEFOLD(artist)=?", "release=?"])
args.extend([name.casefold(), artist.casefold(), release])
return self.sql(f"""SELECT albumid FROM albums
WHERE {" AND ".join(where)}""", *args)
def do_sql_update(self, album: Album, column: str, newval) -> bool:
"""Rename an album."""
return self.sql(f"UPDATE albums SET {column}=? WHERE albumid=?",
newval, album.albumid)

View File

@ -107,6 +107,45 @@ CREATE TRIGGER artists_delete_trigger AFTER DELETE ON artists
END;
/************************
* *
* Albums *
* *
************************/
CREATE TABLE albums (
albumid INTEGER PRIMARY KEY,
propertyid INTEGER REFERENCES playlist_properties (propertyid)
ON DELETE CASCADE
ON UPDATE CASCADE,
name TEXT NOT NULL COLLATE NOCASE,
artist TEXT NOT NULL COLLATE NOCASE,
release TEXT NOT NULL,
mbid TEXT NOT NULL DEFAULT "" COLLATE NOCASE,
cover PATH,
UNIQUE (name, mbid, artist, release)
);
CREATE VIEW albums_view AS
SELECT albumid, propertyid, name, mbid, artist, release, cover, active
FROM albums
JOIN playlist_properties USING (propertyid);
CREATE TRIGGER albums_insert_trigger AFTER INSERT ON albums
BEGIN
INSERT INTO playlist_properties (active) VALUES (False);
UPDATE albums SET propertyid = last_insert_rowid(),
mbid = LOWER(NEW.mbid)
WHERE albumid = NEW.albumid;
END;
CREATE TRIGGER albums_delete_trigger AFTER DELETE ON albums
BEGIN
DELETE FROM playlist_properties WHERE propertyid = OLD.propertyid;
END;
/******************************************
* *
* Create Default Playlists *

206
tests/db/test_albums.py Normal file
View File

@ -0,0 +1,206 @@
# Copyright 2022 (c) Anna Schumaker.
"""Tests our album Gio.ListModel."""
import pathlib
import emmental.db
import tests.util
class TestAlbumObject(tests.util.TestCase):
"""Tests our album object."""
def setUp(self):
"""Set up common variables."""
tests.util.TestCase.setUp(self)
self.table = self.sql.albums
self.album = emmental.db.albums.Album(table=self.table,
albumid=123, propertyid=456,
name="Test Album")
def test_init(self):
"""Test that the Artist is set up properly."""
self.assertIsInstance(self.album, emmental.db.playlist.Playlist)
self.assertEqual(self.album.table, self.table)
self.assertEqual(self.album.propertyid, 456)
self.assertEqual(self.album.albumid, 123)
self.assertEqual(self.album.primary_key, 123)
self.assertEqual(self.album.artist, "")
self.assertEqual(self.album.release, "")
self.assertEqual(self.album.mbid, "")
self.assertIsNone(self.album.cover)
self.assertIsNone(self.album.parent)
cover = pathlib.Path("/a/b/c.jpg")
album2 = emmental.db.albums.Album(table=self.table, propertyid=123,
albumid=456, mbid="ab-cd-ef",
artist="Test Artist", name="Album",
release="1988-06", cover=cover)
self.assertEqual(album2.artist, "Test Artist")
self.assertEqual(album2.release, "1988-06")
self.assertEqual(album2.mbid, "ab-cd-ef")
self.assertEqual(album2.cover, cover)
class TestAlbumTable(tests.util.TestCase):
"""Tests our album table."""
def setUp(self):
"""Set up common variables."""
tests.util.TestCase.setUp(self)
self.table = self.sql.albums
def test_init(self):
"""Test that the album model is configured correctly."""
self.assertIsInstance(self.table, emmental.db.playlist.Table)
self.assertEqual(len(self.table), 0)
def test_construct(self):
"""Test constructing an album playlist."""
album = self.table.construct(propertyid=1, albumid=1,
name="Test Album", mbid="ab-cd-ef",
artist="Test Artist", release="1988-06",
cover=tests.util.COVER_JPG)
self.assertIsInstance(album, emmental.db.albums.Album)
self.assertEqual(album.table, self.table)
self.assertEqual(album.propertyid, 1)
self.assertEqual(album.albumid, 1)
self.assertEqual(album.name, "Test Album")
self.assertEqual(album.artist, "Test Artist")
self.assertEqual(album.release, "1988-06")
self.assertEqual(album.mbid, "ab-cd-ef")
self.assertEqual(album.cover, tests.util.COVER_JPG)
self.assertFalse(album.active)
def test_create(self):
"""Test creating an album playlist."""
album1 = self.table.create("Test Album", "Album Artist", "2023-03")
self.assertIsInstance(album1, emmental.db.albums.Album)
self.assertEqual(album1.name, "Test Album")
self.assertEqual(album1.artist, "Album Artist")
self.assertEqual(album1.release, "2023-03")
self.assertEqual(album1.mbid, "")
self.assertIsNone(album1.cover)
self.assertEqual(self.table[0], album1)
cur = self.sql("SELECT COUNT(name) FROM albums")
self.assertEqual(cur.fetchone()["COUNT(name)"], 1)
row = self.sql("""SELECT COUNT(*) FROM playlist_properties
WHERE propertyid=?""", album1.propertyid).fetchone()
self.assertEqual(row["COUNT(*)"], 1)
album2 = self.table.create("Test Album", "Album Artist", "1988-06",
cover=tests.util.COVER_JPG,
mbid="AB-CD-EF")
self.assertEqual(album2.artist, "Album Artist")
self.assertEqual(album2.release, "1988-06")
self.assertEqual(album2.mbid, "ab-cd-ef")
self.assertEqual(album2.cover, tests.util.COVER_JPG)
self.assertEqual(self.table[0], album2)
self.assertEqual(self.table[1], album1)
cur = self.sql("SELECT COUNT(name) FROM albums")
self.assertEqual(cur.fetchone()["COUNT(name)"], 2)
self.assertIsNone(self.table.create("Test Album",
"Album Artist", "2023-03"))
def test_delete(self):
"""Test deleting an album playlist."""
album = self.table.create("Test Album", "Album Artist", "2023-03")
self.assertTrue(album.delete())
self.assertIsNone(self.table.index(album))
cur = self.sql("SELECT COUNT(name) FROM albums")
self.assertEqual(cur.fetchone()["COUNT(name)"], 0)
self.assertEqual(len(self.table), 0)
self.assertIsNone(self.table.get_item(0))
row = self.sql("""SELECT COUNT(*) FROM playlist_properties
WHERE propertyid=?""", album.propertyid).fetchone()
self.assertEqual(row["COUNT(*)"], 0)
self.assertFalse(album.delete())
def test_filter(self):
"""Test filtering an album playlist."""
self.table.create("Album 1", "Album Artist", "2023-03")
self.table.create("Album 2", "Album Artist", "2023-03")
self.table.filter("*1", now=True)
self.assertSetEqual(self.table.get_filter().keys, {1})
self.table.filter("album*", now=True)
self.assertSetEqual(self.table.get_filter().keys, {1, 2})
def test_get_sort_key(self):
"""Test the get_sort_key() function."""
album1 = self.table.create("Album 1", "Album Artist", "2023-02")
album2 = self.table.create("Album 2", "Album Artist", "2023-02",
mbid="ab-cd-ef")
self.assertTupleEqual(self.table.get_sort_key(album1),
(("album", "1"), True, "",
("album", "artist"), "2023-02"))
self.assertTupleEqual(self.table.get_sort_key(album2),
(("album", "2"), False, "ab-cd-ef",
("album", "artist"), "2023-02"))
def test_load(self):
"""Test loading the album table."""
self.table.create("Album 1", "Album Artist", "2023-03")
self.table.create("Album 2", "Album Artist", "2023-03",
mbid="ab-cd-ef", cover=tests.util.COVER_JPG)
albums2 = emmental.db.albums.Table(self.sql)
self.assertEqual(len(albums2), 0)
albums2.load(now=True)
self.assertEqual(len(albums2), 2)
self.assertEqual(albums2.get_item(0).name, "Album 1")
self.assertEqual(albums2.get_item(0).artist, "Album Artist")
self.assertEqual(albums2.get_item(0).release, "2023-03")
self.assertEqual(albums2.get_item(0).mbid, "")
self.assertIsNone(albums2.get_item(0).cover)
self.assertEqual(albums2.get_item(1).name, "Album 2")
self.assertEqual(albums2.get_item(1).artist, "Album Artist")
self.assertEqual(albums2.get_item(1).release, "2023-03")
self.assertEqual(albums2.get_item(1).mbid, "ab-cd-ef")
self.assertEqual(albums2.get_item(1).cover,
tests.util.COVER_JPG)
def test_lookup(self):
"""Test looking up album playlists."""
album1 = self.table.create("Test Album", "Album Artist", "2023-03")
album2 = self.table.create("Test Album", "Album Artist", "2023-03",
mbid="ab-cd-ef")
self.assertEqual(self.table.lookup("Test Album", "Album Artist",
"2023-03"), album1)
self.assertEqual(self.table.lookup("test album", "album artist",
"2023-03"), album1)
self.assertEqual(self.table.lookup("Test Album", "Album Artist",
"2023-03", mbid="ab-cd-ef"), album2)
self.assertEqual(self.table.lookup("test album", "album artist",
"2023-03", mbid="ab-cd-ef"), album2)
self.assertIsNone(self.table.lookup("No Album", "No Artist", "0000"))
self.assertEqual(self.table.lookup(mbid="ab-cd-ef"), album2)
self.assertEqual(self.table.lookup(mbid="AB-CD-EF"), album2)
self.assertIsNone(self.table.lookup(mbid="gh-ij-kl"))
def test_update(self):
"""Test updating album attributes."""
album = self.table.create("Test Album", "Album Artist", "2023-03")
album.cover = tests.util.COVER_JPG
album.active = True
row = self.sql("SELECT cover, active FROM albums_view WHERE albumid=?",
album.albumid).fetchone()
self.assertEqual(row["cover"], tests.util.COVER_JPG)
self.assertEqual(row["active"], True)
album.cover = None
row = self.sql("SELECT cover FROM albums WHERE albumid=?",
album.albumid).fetchone()
self.assertIsNone(row["cover"], tests.util.COVER_JPG)

View File

@ -42,9 +42,13 @@ class TestConnection(tests.util.TestCase):
self.assertIsInstance(self.sql.settings, emmental.db.settings.Table)
self.assertIsInstance(self.sql.playlists, emmental.db.playlists.Table)
self.assertIsInstance(self.sql.artists, emmental.db.artists.Table)
self.assertIsInstance(self.sql.albums, emmental.db.albums.Table)
self.assertEqual(self.sql.albums.queue, self.sql.artists.queue)
self.assertListEqual([tbl for tbl in self.sql.playlist_tables()],
[self.sql.playlists, self.sql.artists])
[self.sql.playlists, self.sql.artists,
self.sql.albums])
def test_load(self):
"""Check that calling load() loads the tables."""