db: Convert the TrackTable into a table.Table

Implements: Issue #11 (Cache database items fields)
Signed-off-by: Anna Schumaker <Anna@NoWheyCreamery.com>
This commit is contained in:
Anna Schumaker 2021-10-10 16:22:13 -04:00
parent 4235e794bd
commit 063b93b66f
11 changed files with 139 additions and 168 deletions

View File

@ -26,15 +26,11 @@ def make_fake_track(trackno, length, title, path, lib="/a/b/c", art="Test Artist
length, title, pathlib.Path(path))
def reset():
mods = [ state, user, artist, album, disc, genre, decade, year, library ]
mods = [ track, state, user, artist, album,
disc, genre, decade, year, library ]
for mod in mods: mod.Table.reset()
mods = [ track ]
for mod in mods: mod.Table.drop()
for mod in mods: mod.Table.do_create()
genre.Map.reset()
user.Map.reset()
user.TempMap.reset()

View File

@ -57,7 +57,7 @@ class GenreTable(playlist.Model):
class GenreMap(objects.Map):
def __init__(self):
objects.Map.__init__(self, "genre_map", Table.get, track.Track)
objects.Map.__init__(self, "genre_map", Table.get, track.Table.get)
self.lookup_tracks = self.lookup_rhs
self.lookup_genres = self.lookup_lhs
@ -71,11 +71,11 @@ class GenreMap(objects.Map):
def do_insert(self, genre, track):
sql.execute("INSERT INTO genre_map (genreid, trackid) "
"VALUES (?, ?)", [ genre.rowid, int(track) ])
"VALUES (?, ?)", [ genre.rowid, track.rowid ])
def do_delete(self, genre, track):
return sql.execute("DELETE FROM genre_map WHERE genreid=? AND trackid=?",
[ genre.rowid, int(track) ])
[ genre.rowid, track.rowid ])
def do_lookup_rhs(self, genre):
return sql.execute("SELECT trackid FROM genre_map "
@ -83,7 +83,7 @@ class GenreMap(objects.Map):
def do_lookup_lhs(self, track):
return sql.execute("SELECT genreid FROM genre_map "
"WHERE trackid=?", [ int(track) ])
"WHERE trackid=?", [ track.rowid ])
def delete_track(self, track):
for genre in self.lookup_genres(track):

View File

@ -35,7 +35,7 @@ class Library(playlist.Playlist):
def tracks(self):
cursor = sql.execute(f"SELECT trackid FROM tracks "
"WHERE libraryid=?", [ self.rowid ])
return [ track.Track(row["trackid"]) for row in cursor.fetchall() ]
return [ track.Table.get(row["trackid"]) for row in cursor.fetchall() ]

View File

@ -118,8 +118,8 @@ class Map(GObject.GObject):
execute(f"DROP TABLE {self.map_name}")
self.do_create()
@GObject.Signal(arg_types=(GObject.GObject,Row))
@GObject.Signal(arg_types=(GObject.GObject,GObject.GObject))
def row_inserted(self, lhs, rhs): pass
@GObject.Signal(arg_types=(GObject.GObject,Row))
@GObject.Signal(arg_types=(GObject.GObject,GObject.GObject))
def row_deleted(self, lhs, rhs): pass

View File

@ -52,7 +52,7 @@ class TestGenreMap(unittest.TestCase):
def test_genre_map_init(self):
self.assertIsInstance(db.genre.Map, db.genre.GenreMap)
self.assertEqual(db.genre.Map.map_lhs, db.genre.Table.get)
self.assertEqual(db.genre.Map.map_rhs, db.track.Track)
self.assertEqual(db.genre.Map.map_rhs, db.track.Table.get)
db.sql.execute("SELECT genreid,trackid FROM genre_map")
def test_genre_map_insert(self):

View File

@ -6,91 +6,65 @@ import sqlite3
import unittest
from gi.repository import GObject
class TestTrack(unittest.TestCase):
def setUp(self):
db.reset()
def test_init(self):
track = db.make_fake_track(1, 1.234, "Test Title", "/a/b/c/1.ogg")
self.assertIsInstance(track.library, db.library.Library)
self.assertIsInstance(track.artist, db.artist.Artist)
self.assertIsInstance(track.album, db.album.Album)
self.assertIsInstance(track.disc, db.disc.Disc)
self.assertIsInstance(track.year, db.year.Year)
self.assertEqual(track.number, 1)
self.assertEqual(track.playcount, 0)
self.assertIsNone(track.lastplayed, None)
self.assertEqual(track.length, 1.234)
self.assertEqual(track.title, "Test Title")
def test_played(self):
track = db.make_fake_track(1, 1.234, "Test Title", "/a/b/c/1.ogg")
track.played()
self.assertEqual(track.playcount, 1)
self.assertEqual(track.lastplayed.date(), datetime.date.today())
class TestTrackTable(unittest.TestCase):
def setUp(self):
db.reset()
def test_track_table_init(self):
def test_init(self):
table = db.track.TrackTable()
self.assertIsInstance(table, db.table.Table)
self.assertEqual(table.table, "tracks")
self.assertIsInstance(db.track.Table, db.track.TrackTable)
db.sql.execute("SELECT trackid,libraryid,artistid,albumid,discid,decadeid,yearid FROM tracks")
db.sql.execute("SELECT number,playcount,lastplayed,length,title,path FROM tracks")
def test_track_table_insert(self):
def test_insert(self):
library = db.library.Table.find(pathlib.Path("/a/b/c"))
artist = db.artist.Table.find("Test Artist", "test artist")
album = artist.find_album("Test Album")
disc = album.find_disc(1, None)
decade = db.decade.Table.find(2020)
year = decade.find_year(2021)
track = db.track.Table.insert(library, artist, album, disc, decade, year,
1, 1.234, "Test Title", pathlib.Path("/a/b/c/d.efg"))
track = db.track.Table.insert(library, artist, album, disc, decade,
year, 1, 1.234, "Test Title",
pathlib.Path("/a/b/c/d.efg"))
self.assertIsInstance(track, db.track.Track)
self.assertIsInstance(track, db.objects.Row)
self.assertEqual(track.library, library)
self.assertEqual(track.artist, artist)
self.assertEqual(track.album, album)
self.assertEqual(track.disc, disc)
self.assertEqual(track.decade, decade)
self.assertEqual(track.year, year)
self.assertEqual(track.number, 1)
self.assertEqual(track.playcount, 0)
self.assertEqual(track.lastplayed, None)
self.assertEqual(track.length, 1.234)
self.assertEqual(track.title, "Test Title")
self.assertEqual(track.path, pathlib.Path("/a/b/c/d.efg"))
with self.assertRaises(sqlite3.IntegrityError):
db.track.Table.insert(library, artist, album, disc, decade, year,
1, 1.234, "Test Title", pathlib.Path("/a/b/c/d.efg"))
def test_track_table_delete(self):
track = db.make_fake_track(1, 1.234, "Test Title", "/a/b/c/d.efg")
genre = db.genre.Table.find("Test Genre")
playlist = db.user.Table.find("Test Playlist")
db.genre.Map.insert(genre, track)
db.user.Map.insert(playlist, track)
db.user.TempMap.insert(playlist, track)
db.track.Table.delete(track)
self.assertIsNone(db.track.Table.lookup(pathlib.Path("/a/b/c/d.efg")))
self.assertEqual(db.genre.Map.lookup_tracks(genre), [ ])
self.assertEqual(db.user.Map.lookup_tracks(playlist), [ ])
self.assertEqual(db.user.TempMap.lookup_tracks(playlist), [ ])
def test_track_table_get(self):
track = db.make_fake_track(1, 1.234, "Test Title", "/a/b/c/d.efg")
self.assertEqual(db.track.Table.get(1), track)
self.assertIsNone(db.track.Table.get(2))
def test_track_table_lookup(self):
def test_lookup(self):
track = db.make_fake_track(1, 1.234, "Test Title", "/a/b/c/d.efg")
self.assertEqual(db.track.Table.lookup(pathlib.Path("/a/b/c/d.efg")), track)
self.assertIsNone(db.library.Table.lookup(pathlib.Path("/a/b/d/h.ijk")))
def test_track_table_find(self):
def test_find(self):
with self.assertRaises(NotImplementedError):
db.track.Table.find(pathlib.Path("/a/b/c/d.efg"))
def test_track_played(self):
track = db.make_fake_track(1, 1.234, "Test Title", "/a/b/c/d.efg")
now = datetime.datetime.now()
yesterday = datetime.date.today() - datetime.timedelta(days=1)
track.played()
self.assertEqual(track.playcount, 1)
self.assertEqual(track.lastplayed.replace(microsecond=0),
now.replace(microsecond=0))
track.last_played(4, yesterday)
self.assertEqual(track.playcount, 4)
self.assertEqual(track.lastplayed.replace(microsecond=0),
datetime.datetime.combine(yesterday, datetime.time()))
track.last_played(5, now)
self.assertEqual(track.playcount, 5)
self.assertEqual(track.lastplayed, now)

View File

@ -89,7 +89,7 @@ class TestPlaylistMap(unittest.TestCase):
self.assertIsInstance(db.user.TempMap, db.user.PlaylistMap)
self.assertEqual(db.user.Map.map_lhs, db.user.Table.get)
self.assertEqual(db.user.Map.map_rhs, db.track.Track)
self.assertEqual(db.user.Map.map_rhs, db.track.Table.get)
self.assertFalse(db.user.Map.temporary)
self.assertTrue( db.user.TempMap.temporary)

View File

@ -14,134 +14,126 @@
import datetime
import pathlib
from gi.repository import GObject
from .sql import commit
from .sql import execute
from . import artist
from . import album
from . import decade
from . import disc
from . import library
from . import objects
from . import sql
from . import table
from . import year
class Track(objects.Row):
def do_get_column(self, column):
return execute(f"SELECT {column} FROM tracks "
"WHERE trackid=?", [ self.rowid ])
class Track(GObject.GObject):
def __init__(self, row):
GObject.GObject.__init__(self)
self._trackid = row["trackid"]
self._library = library.Table.get(row["libraryid"])
self._artist = artist.Table.get(row["artistid"])
self._album = album.Table.get(row["albumid"])
self._disc = disc.Table.get(row["discid"])
self._year = year.Table.get(row["yearid"])
self._number = row["number"]
self._playcount = row["playcount"]
self._lastplayed = row["lastplayed"]
self._length = row["length"]
self._title = row["title"]
self._path = pathlib.Path(row["path"])
@GObject.Property
def library(self):
return library.Table.get(self.get_column("libraryid"))
def rowid(self): return self._trackid
@GObject.Property
def artist(self):
return artist.Table.get(self.get_column("artistid"))
def library(self): return self._library
@GObject.Property
def album(self):
return album.Table.get(self.get_column("albumid"))
def artist(self): return self._artist
@GObject.Property
def disc(self):
return disc.Table.get(self.get_column("discid"))
def album(self): return self._album
@GObject.Property
def decade(self):
return decade.Table.get(self.get_column("decadeid"))
def disc(self): return self._disc
@GObject.Property
def year(self):
return year.Table.get(self.get_column("yearid"))
def year(self): return self._year
@GObject.Property
def number(self):
return self.get_column("number")
def number(self): return self._number
@GObject.Property
def playcount(self):
return self.get_column("playcount")
def playcount(self): return self._playcount
@playcount.setter
def playcount(self, newval):
self._playcount = self.update("playcount", newval)
@GObject.Property
def lastplayed(self):
return self.get_column("lastplayed")
def lastplayed(self): return self._lastplayed
@lastplayed.setter
def lastplayed(self, newval):
self._lastplayed = self.update("lastplayed", newval)
@GObject.Property
def length(self):
return self.get_column("length")
def length(self): return self._length
@GObject.Property
def title(self):
return self.get_column("title")
def title(self): return self._title
@GObject.Property
def path(self):
return pathlib.Path(self.get_column("path"))
def path(self): return self._path
def update(self, column, newval):
sql.execute(f"UPDATE tracks SET {column}=? WHERE trackid=?",
[ newval, self.rowid ])
return newval
def played(self):
execute("UPDATE tracks "
"SET playcount=playcount+1, lastplayed=? "
"WHERE trackid=?", [ datetime.datetime.now(), self.rowid ])
commit()
def last_played(self, playcount, lastplayed):
if not isinstance(lastplayed, datetime.datetime):
timestamp = datetime.datetime.combine(lastplayed, datetime.time())
else:
timestamp = lastplayed
execute("UPDATE tracks "
"SET playcount=?, lastplayed=? "
"WHERE trackid=?", [ playcount, timestamp, self.rowid ])
commit()
self.playcount += 1
self.lastplayed = datetime.datetime.now()
sql.commit()
class TrackTable(objects.Table):
class TrackTable(table.Table):
def __init__(self):
objects.Table.__init__(self, "tracks", Track)
table.Table.__init__(self, "tracks")
def do_create(self):
execute("CREATE TABLE IF NOT EXISTS tracks "
"(trackid INTEGER PRIMARY KEY, "
" libraryid INTEGER, "
" artistid INTEGER, "
" albumid INTEGER, "
" discid INTEGER, "
" decadeid INTEGER, "
" yearid INTEGER, "
" number INTEGER, "
" playcount INTEGER DEFAULT 0,"
" lastplayed TIMESTAMP DEFAULT NULL, "
" length REAL, "
" title TEXT, "
" path TEXT UNIQUE, "
" FOREIGN KEY(libraryid) REFERENCES libraries(libraryid), "
" FOREIGN KEY(artistid) REFERENCES artists(artistid), "
" FOREIGN KEY(albumid) REFERENCES albums(albumid), "
" FOREIGN KEY(discid) REFERENCES discs(discid), "
" FOREIGN KEY(yearid) REFERENCES years(yearid))")
execute("CREATE INDEX IF NOT EXISTS track_index ON tracks(path)")
sql.execute("CREATE TABLE IF NOT EXISTS tracks "
"(trackid INTEGER PRIMARY KEY, "
" libraryid INTEGER, "
" artistid INTEGER, "
" albumid INTEGER, "
" discid INTEGER, "
" decadeid INTEGER, "
" yearid INTEGER, "
" number INTEGER, "
" playcount INTEGER DEFAULT 0,"
" lastplayed TIMESTAMP DEFAULT NULL, "
" length REAL, "
" title TEXT, "
" path TEXT UNIQUE, "
" FOREIGN KEY(libraryid) REFERENCES libraries(libraryid), "
" FOREIGN KEY(artistid) REFERENCES artists(artistid), "
" FOREIGN KEY(albumid) REFERENCES albums(albumid), "
" FOREIGN KEY(discid) REFERENCES discs(discid), "
" FOREIGN KEY(yearid) REFERENCES years(yearid))")
sql.execute("CREATE INDEX IF NOT EXISTS track_index ON tracks(path)")
def do_insert(self, library, artist, album, disc, decade, year, number, length, title, path):
return execute("INSERT INTO tracks (libraryid, artistid, albumid, discid, "
"decadeid, yearid, number, length, title, path) "
"VALUES (?,?,?,?,?,?,?,?,?,?)",
[ library.rowid, artist.rowid, album.rowid, disc.rowid,
decade.rowid, year.rowid, number, length, title, str(path) ])
def do_factory(self, row):
return Track(row)
def do_delete(self, track):
from . import genre
from . import user
genre.Map.delete_track(track)
user.Map.delete_track(track)
user.TempMap.delete_track(track)
return execute("DELETE FROM tracks WHERE trackid=?", [ int(track) ])
def do_get(self, rowid):
return execute("SELECT trackid FROM tracks "
"WHERE trackid=?", [ rowid ])
def do_insert(self, library, artist, album, disc, decade,
year, number, length, title, path):
return sql.execute("INSERT INTO tracks (libraryid, artistid, albumid, "
"discid, decadeid, yearid, "
"number, length, title, path) "
"VALUES (?,?,?,?,?,?,?,?,?,?)",
[ library.rowid, artist.rowid, album.rowid, disc.rowid,
decade.rowid, year.rowid, number, length, title, str(path) ])
def do_lookup(self, path):
return execute("SELECT trackid FROM tracks "
"WHERE path=?", [ str(path) ])
return sql.execute("SELECT * FROM tracks WHERE path=?", [ str(path) ])
def find(self, *args):
raise NotImplementedError

View File

@ -64,7 +64,7 @@ class PlaylistMap(objects.Map):
name = "playlist_map" if temp==False else "temp_playlist_map"
self.temporary = temp
objects.Map.__init__(self, name, Table.get, track.Track)
objects.Map.__init__(self, name, Table.get, track.Table.get)
self.lookup_tracks = self.lookup_rhs
self.lookup_playlists = self.lookup_lhs
@ -79,12 +79,12 @@ class PlaylistMap(objects.Map):
def do_insert(self, playlist, track):
sql.execute(f"INSERT INTO {self.map_name} (playlistid, trackid) "
"VALUES (?, ?)", [ playlist.rowid, int(track) ])
"VALUES (?, ?)", [ playlist.rowid, track.rowid ])
def do_delete(self, playlist, track):
return sql.execute(f"DELETE FROM {self.map_name} "
"WHERE playlistid=? AND trackid=?",
[ playlist.rowid, int(track) ])
[ playlist.rowid, track.rowid ])
def do_lookup_rhs(self, playlist):
return sql.execute(f"SELECT trackid FROM {self.map_name} "
@ -92,7 +92,7 @@ class PlaylistMap(objects.Map):
def do_lookup_lhs(self, track):
return sql.execute(f"SELECT playlistid FROM {self.map_name} "
"WHERE trackid=?", [ int(track) ])
"WHERE trackid=?", [ track.rowid ])
def delete_track(self, track):
for playlist in self.lookup_playlists(track):

View File

@ -1,5 +1,6 @@
# Copyright 2021 (c) Anna Schumaker.
import db
import datetime
from gi.repository import GObject
from . import metadata
@ -40,12 +41,19 @@ class ImportTask(FileTask):
def __init__(self, library, filepath, playcount, lastplayed):
FileTask.__init__(self, library, filepath)
self.playcount = playcount
self.lastplayed = lastplayed
if isinstance(lastplayed, datetime.datetime):
self.lastplayed = lastplayed
elif isinstance(lastplayed, datetime.date):
self.lastplayed = datetime.datetime.combine(lastplayed,
datetime.time())
def run_task(self):
FileTask.run_task(self)
if self.playcount == 0:
return
if track := db.track.Table.lookup(self.filepath):
track.last_played(self.playcount, self.lastplayed)
track.playcount = self.playcount
track.lastplayed = self.lastplayed
class DirectoryTask(Task):

View File

@ -77,7 +77,8 @@ class TestScannerImportTask(unittest.TestCase):
self.assertEqual(it.library, lib)
self.assertEqual(it.filepath, test_track01)
self.assertEqual(it.playcount, 4)
self.assertEqual(it.lastplayed, today)
self.assertEqual(it.lastplayed,
datetime.datetime.combine(today, datetime.time()))
self.assertIsNone(it.run_task())