db: Add a Decade Table

This table allows us to work with Decade playlists that can be created
or looked up by an individual year in that decade. I also add a few
custom functions to SQLite to make working with decades easier.

Signed-off-by: Anna Schumaker <Anna@NoWheyCreamery.com>
This commit is contained in:
Anna Schumaker 2022-08-31 11:23:47 -04:00
parent 2dc5d9ed0a
commit 0cf5f80eb4
5 changed files with 220 additions and 2 deletions

View File

@ -6,6 +6,7 @@ from typing import Generator
from . import albums
from . import artists
from . import connection
from . import decades
from . import genres
from . import playlist
from . import media
@ -37,6 +38,7 @@ class Connection(connection.Connection):
self.albums = albums.Table(self, queue=self.artists.queue)
self.media = media.Table(self, queue=self.artists.queue)
self.genres = genres.Table(self)
self.decades = decades.Table(self)
def close(self) -> None:
"""Close the database connection."""
@ -60,7 +62,7 @@ class Connection(connection.Connection):
def playlist_tables(self) -> Generator[playlist.Table, None, None]:
"""Iterate over each playlist table."""
for tbl in [self.playlists, self.artists, self.albums, self.media,
self.genres]:
self.genres, self.decades]:
yield tbl
def set_active_playlist(self, plist: playlist.Playlist) -> None:

53
emmental/db/decades.py Normal file
View File

@ -0,0 +1,53 @@
# Copyright 2022 (c) Anna Schumaker
"""A custom Gio.ListModel for working with decades."""
import sqlite3
from gi.repository import GObject
from . import playlist
class Decade(playlist.Playlist):
"""Our custom Decade object."""
decade = GObject.Property(type=int)
@property
def primary_key(self) -> int:
"""Get the primary key of this Decade."""
return self.decade
class Table(playlist.Table):
"""Our Decade Table."""
def do_construct(self, **kwargs) -> Decade:
"""Construct a new Decade playlist."""
return Decade(**kwargs)
def do_get_sort_key(self, decade: Decade) -> int:
"""Get the sort key for the requested decade."""
return decade.decade
def do_sql_delete(self, decade: Decade) -> sqlite3.Cursor:
"""Delete a decade."""
return self.sql("DELETE FROM decades WHERE decade=?", decade.decade)
def do_sql_glob(self, glob: str) -> sqlite3.Cursor:
"""Search for decades matching the search text."""
return self.sql("""SELECT decade FROM decades_view
WHERE CASEFOLD(name) GLOB ?""", glob)
def do_sql_insert(self, year: int) -> sqlite3.Cursor | None:
"""Create a new Decade playlist."""
decade = year // 10 * 10
if self.sql("INSERT INTO decades (decade) VALUES (?)", decade):
return self.sql("SELECT * FROM decades_view WHERE decade=?",
decade)
def do_sql_select_all(self) -> sqlite3.Cursor:
"""Load Decades from the database."""
return self.sql("SELECT * FROM decades_view")
def do_sql_select_one(self, year: int) -> sqlite3.Cursor:
"""Look up an decade by year."""
return self.sql("SELECT decade FROM decades WHERE decade=?",
year // 10 * 10)

View File

@ -242,6 +242,38 @@ CREATE TRIGGER genres_delete_trigger AFTER DELETE ON genres
END;
/*************************
* *
* Decades *
* *
*************************/
CREATE TABLE decades (
decade INTEGER PRIMARY KEY,
propertyid INTEGER REFERENCES playlist_properties (propertyid)
ON DELETE CASCADE
ON UPDATE CASCADE
CHECK (decade % 10 = 0)
);
CREATE VIEW decades_view AS
SELECT decade, propertyid, FORMAT("The %ds", decade) as name, active
FROM decades
JOIN playlist_properties USING (propertyid);
CREATE TRIGGER decades_insert_trigger AFTER INSERT ON decades
BEGIN
INSERT INTO playlist_properties (active) VALUES (False);
UPDATE decades SET propertyid = last_insert_rowid()
WHERE decade = NEW.decade;
END;
CREATE TRIGGER decades_delete_trigger AFTER DELETE ON decades
BEGIN
DELETE FROM playlist_properties WHERE propertyid = OLD.propertyid;
END;
/******************************************
* *
* Create Default Playlists *

View File

@ -45,6 +45,7 @@ class TestConnection(tests.util.TestCase):
self.assertIsInstance(self.sql.albums, emmental.db.albums.Table)
self.assertIsInstance(self.sql.media, emmental.db.media.Table)
self.assertIsInstance(self.sql.genres, emmental.db.genres.Table)
self.assertIsInstance(self.sql.decades, emmental.db.decades.Table)
self.assertEqual(self.sql.albums.queue, self.sql.artists.queue)
self.assertEqual(self.sql.media.queue, self.sql.artists.queue)
@ -52,7 +53,7 @@ class TestConnection(tests.util.TestCase):
self.assertListEqual([tbl for tbl in self.sql.playlist_tables()],
[self.sql.playlists, self.sql.artists,
self.sql.albums, self.sql.media,
self.sql.genres])
self.sql.genres, self.sql.decades])
def test_load(self):
"""Check that calling load() loads the tables."""

130
tests/db/test_decades.py Normal file
View File

@ -0,0 +1,130 @@
# Copyright 2022 (c) Anna Schumaker
"""Tests our decade Gio.ListModel."""
import emmental.db
import tests.util
class TestDecadeObject(tests.util.TestCase):
"""Tests our decade object."""
def setUp(self):
"""Set up common variables."""
tests.util.TestCase.setUp(self)
self.table = self.sql.decades
self.decade = emmental.db.decades.Decade(table=self.table,
propertyid=12345,
decade=2020, name="The 2020s")
def test_init(self):
"""Test that the Decade is set up properly."""
self.assertIsInstance(self.decade, emmental.db.playlist.Playlist)
self.assertEqual(self.decade.table, self.table)
self.assertEqual(self.decade.propertyid, 12345)
self.assertEqual(self.decade.decade, 2020)
self.assertEqual(self.decade.primary_key, 2020)
self.assertEqual(self.decade.name, "The 2020s")
self.assertIsNone(self.decade.parent)
class TestDecadeTable(tests.util.TestCase):
"""Tests our decade table."""
def setUp(self):
"""Set up common variables."""
super().setUp()
self.table = self.sql.decades
def test_init(self):
"""Test that the decade table is configured correctly."""
self.assertIsInstance(self.table, emmental.db.playlist.Table)
self.assertEqual(len(self.table), 0)
def test_construct(self):
"""Test constructing a decade playlist."""
decade = self.table.construct(propertyid=1980, decade=1980,
name="The 1980s")
self.assertIsInstance(decade, emmental.db.decades.Decade)
self.assertEqual(decade.table, self.table)
self.assertEqual(decade.propertyid, 1980)
self.assertEqual(decade.decade, 1980)
self.assertEqual(decade.name, "The 1980s")
self.assertFalse(decade.active)
def test_create(self):
"""Test creating a decade playlist."""
decade = self.table.create(1988)
self.assertIsInstance(decade, emmental.db.decades.Decade)
self.assertEqual(decade.decade, 1980)
self.assertEqual(decade.name, "The 1980s")
cur = self.sql("SELECT COUNT(decade) FROM decades")
self.assertEqual(cur.fetchone()["COUNT(decade)"], 1)
row = self.sql("""SELECT COUNT(*) FROM playlist_properties
WHERE propertyid=?""", decade.propertyid).fetchone()
self.assertEqual(row["COUNT(*)"], 1)
self.assertIsNone(self.table.create(1985))
def test_delete(self):
"""Test deleting a decade playlist."""
decade = self.table.create(1980)
self.assertTrue(decade.delete())
cur = self.sql("SELECT COUNT(decade) FROM decades")
self.assertEqual(cur.fetchone()["COUNT(decade)"], 0)
self.assertEqual(len(self.table), 0)
self.assertIsNone(self.table.get_item(0))
row = self.sql("""SELECT COUNT(*) FROM playlist_properties
WHERE propertyid=?""", decade.propertyid).fetchone()
self.assertEqual(row["COUNT(*)"], 0)
self.assertFalse(decade.delete())
def test_filter(self):
"""Test filtering a decade playlist."""
self.table.create(1980)
self.table.create(1990)
self.table.filter("*80*", now=True)
self.assertSetEqual(self.table.get_filter().keys, {1980})
self.table.filter("the*s", now=True)
self.assertSetEqual(self.table.get_filter().keys, {1980, 1990})
def test_get_sort_key(self):
"""Test getting the sort key for a decade playlist."""
decade = self.table.create(1980)
self.assertEqual(self.table.get_sort_key(decade), 1980)
def test_load(self):
"""Load the decade table from the database."""
self.table.create(1980)
self.table.create(1990)
decades2 = emmental.db.decades.Table(self.sql)
self.assertEqual(len(decades2), 0)
decades2.load(now=True)
self.assertEqual(len(decades2), 2)
self.assertEqual(decades2.get_item(0).decade, 1980)
self.assertEqual(decades2.get_item(0).name, "The 1980s")
self.assertEqual(decades2.get_item(1).decade, 1990)
self.assertEqual(decades2.get_item(1).name, "The 1990s")
def test_lookup(self):
"""Test looking up decade playlists."""
decade = self.table.create(1980)
self.assertEqual(self.table.lookup(1988), decade)
self.assertIsNone(self.table.lookup(1990))
def test_update(self):
"""Test updating decade attributes."""
decade = self.table.create(1980)
decade.active = True
row = self.sql("""SELECT active FROM playlist_properties
WHERE propertyid=?""", decade.propertyid).fetchone()
self.assertEqual(row["active"], True)