trackdb: Move LibraryPaths into the __init__.py file

And rename to LibraryTag

Signed-off-by: Anna Schumaker <Anna@NoWheyCreamery.com>
This commit is contained in:
Anna Schumaker 2021-06-26 09:19:12 -04:00
parent 85727d786b
commit 7d4e92faa8
6 changed files with 131 additions and 158 deletions

View File

@ -2,6 +2,8 @@
from lib import bus
from lib import data
from lib import publisher
from lib import tag
from lib import thread
import pathlib
import threading
import trackdb
@ -9,12 +11,69 @@ import unittest
test_tracks = pathlib.Path("./trier/Test Album")
class TestLibraryTag(unittest.TestCase):
def setUp(self):
trackdb.reset()
def tearDown(self):
trackdb.reset()
def test_library_path_init(self):
lib = trackdb.LibraryTag(test_tracks)
self.assertIsInstance(lib, tag.Tag)
self.assertEqual(lib.name, test_tracks)
self.assertIsInstance(lib.scan, thread.Thread)
def test_library_path_state(self):
lib = trackdb.LibraryTag(test_tracks)
lib.scan().join()
state = lib.__getstate__()
self.assertEqual(set(state.keys()),
set([ "name", "sort", "current", "loop", "random", "tracks" ]))
self.assertEqual(state["name"], test_tracks)
self.assertEqual(state["tracks"], [ i for i in range(12) ])
lib.__dict__.clear()
lib.__setstate__(state)
self.assertEqual(lib.name, test_tracks)
self.assertEqual(lib.tracks, [ trackdb.Tracks[i] for i in range(12) ])
self.assertEqual(lib.clear.func, lib.__do_clear__)
self.assertEqual(lib.scan.func, lib.__do_scan__)
def test_library_path_scan(self):
lib = trackdb.LibraryTag(test_tracks)
lib.scan().join()
self.assertEqual(len(lib.tracks), 12)
for trak in [ trackdb.Tracks[0], trackdb.Tracks[11] ]:
lib.remove_track(trak)
trackdb.Tracks.remove(trak)
lib.scan().join()
self.assertEqual(len(lib.tracks), 12)
trak = trackdb.track.Track(trackdb.Tracks.nextid, test_tracks / "01 - Test Track.ogg", lib)
trak.path = pathlib.Path("No Such File")
lib.tracks.append(trak)
trackdb.Tracks.tracks[trak.trackid] = trak
lib.scan().join()
self.assertEqual(len(lib.tracks), 12)
def test_library_path_clear(self):
lib = trackdb.LibraryTag(test_tracks)
lib.scan().join()
self.assertEqual(len(lib), 12)
lib.clear().join()
self.assertEqual(len(lib), 0)
self.assertEqual(len(trackdb.Tracks), 0)
class TestLibraryStore(unittest.TestCase):
def test_library_store(self):
store = trackdb.LibraryStore()
lib = store.add(test_tracks)
self.assertIsInstance(lib, trackdb.library.LibraryPath)
self.assertIsInstance(lib, trackdb.LibraryTag)
lib.scan().join()
self.assertEqual(len(lib), 12)
@ -48,13 +107,13 @@ class TestTrackDB(unittest.TestCase):
self.assertIn(trackdb.save, trackdb.Library.Added.subscribers)
self.assertIn(trackdb.save, trackdb.Library.Removed.subscribers)
self.assertIn(trackdb.save, trackdb.library.Tracks.Added.subscribers)
self.assertIn(trackdb.save, trackdb.library.Tracks.Removed.subscribers)
self.assertIn(trackdb.save, trackdb.library.Tracks.Updated.subscribers)
self.assertIn(trackdb.save, trackdb.Tracks.Added.subscribers)
self.assertIn(trackdb.save, trackdb.Tracks.Removed.subscribers)
self.assertIn(trackdb.save, trackdb.Tracks.Updated.subscribers)
def test_trackdb_add_path(self):
lib = trackdb.Library.add(test_tracks)
self.assertIsInstance(lib, trackdb.library.LibraryPath)
self.assertIsInstance(lib, trackdb.LibraryTag)
self.assertEqual(self.count_added, 1)
self.assertEqual(self.count_removed, 0)
@ -84,7 +143,7 @@ class TestTrackDB(unittest.TestCase):
trackdb.tags.reset()
trackdb.Library.reset()
trackdb.load()
self.assertEqual(len(trackdb.library.Tracks), 12)
self.assertEqual(len(trackdb.Tracks), 12)
self.assertEqual(len(trackdb.Library), 1)
self.assertEqual(len(trackdb.Library[test_tracks]), 12)
self.assertEqual(len(trackdb.tags.Artist), 3)
@ -99,9 +158,9 @@ class TestTrackDB(unittest.TestCase):
trackdb.save_bus.complete()
def test_trackdb_reset(self):
trackdb.library.Tracks.Added.register(1)
trackdb.library.Tracks.Removed.register(1)
trackdb.library.Tracks.Updated.register(1)
trackdb.Tracks.Added.register(1)
trackdb.Tracks.Removed.register(1)
trackdb.Tracks.Updated.register(1)
trackdb.Library.Added.register(1)
trackdb.Library.Removed.register(1)
trackdb.Library.store = { "a" : 1, "b" : 2, "c" : 3 }
@ -111,9 +170,9 @@ class TestTrackDB(unittest.TestCase):
trackdb.reset()
self.assertEqual(len(trackdb.Library), 0)
self.assertIn(trackdb.save, trackdb.library.Tracks.Added.subscribers)
self.assertIn(trackdb.save, trackdb.library.Tracks.Removed.subscribers)
self.assertIn(trackdb.save, trackdb.library.Tracks.Updated.subscribers)
self.assertIn(trackdb.save, trackdb.Tracks.Added.subscribers)
self.assertIn(trackdb.save, trackdb.Tracks.Removed.subscribers)
self.assertIn(trackdb.save, trackdb.Tracks.Updated.subscribers)
self.assertIn(trackdb.save, trackdb.Library.Added.subscribers)
self.assertIn(trackdb.save, trackdb.Library.Removed.subscribers)
self.assertFalse(data.DataFile(trackdb.db_file, data.READ).exists())

View File

@ -2,8 +2,10 @@
from lib import bus
from lib import data
from lib import publisher
from lib import tag
from lib import tagstore
from . import library
from lib import thread
from . import allocator
from . import tags
from . import track
import pathlib
@ -11,11 +13,39 @@ import threading
db_file = "trackdb.pickle"
save_bus = bus.Bus(500)
Tracks = allocator.TrackAllocator()
class LibraryTag(tag.Tag):
def __init__(self, path):
super().__init__(path)
self.clear = thread.Thread(self.__do_clear__)
self.scan = thread.Thread(self.__do_scan__)
def __setstate__(self, state):
super().__setstate__(state)
self.tracks = [ Tracks[i] for i in state["tracks"] ]
self.clear = thread.Thread(self.__do_clear__)
self.scan = thread.Thread(self.__do_scan__)
def __do_scan__(self):
for trak in Tracks.autoremove(self):
self.remove_track(trak)
track_set = set([ t.filepath() for t in Tracks.list_tracks(self) ])
for f in self.name.rglob("*"):
if f not in track_set and f.is_file():
if (track := Tracks.allocate(self, f)) != None:
self.add_track(track)
def __do_clear__(self):
for trak in self.tracks:
Tracks.remove(trak)
self.tracks.clear()
class LibraryStore(tagstore.TagStore):
def __alloc_tag__(self, name, sort):
return library.LibraryPath(name)
return LibraryTag(name)
def add(self, name):
return super().__add_tag__(name, None, None)
@ -29,29 +59,30 @@ Library = LibraryStore()
def _do_save():
with data.DataFile(db_file, data.WRITE) as f:
f.pickle([ tags.Artist, tags.Album, tags.Genre, tags.Decade, tags.Year, library.Tracks, Library ])
f.pickle([ tags.Artist, tags.Album, tags.Genre, tags.Decade, tags.Year, Tracks, Library ])
def save(*args):
save_bus.board(_do_save)
def load():
global Library
global Tracks
with data.DataFile(db_file, data.READ) as f:
if f.exists():
(tags.Artist, tags.Album, tags.Genre, tags.Decade, tags.Year, library.Tracks, Library) = f.unpickle()
(tags.Artist, tags.Album, tags.Genre, tags.Decade, tags.Year, Tracks, Library) = f.unpickle()
def __register_callbacks():
Library.Added.register(save)
Library.Removed.register(save)
library.Tracks.Added.register(save)
library.Tracks.Removed.register(save)
library.Tracks.Updated.register(save)
Tracks.Added.register(save)
Tracks.Removed.register(save)
Tracks.Updated.register(save)
__register_callbacks()
def reset():
Tracks.reset()
Library.reset()
tags.reset()
library.reset()
save_bus.clear()
data.DataFile(db_file, data.READ).remove()

View File

@ -1,42 +0,0 @@
# Copyright 2020 (c) Anna Schumaker.
from lib import publisher
from lib import tag
from lib import tagstore
from lib import thread
from . import allocator
from . import track
import pathlib
import threading
Tracks = allocator.TrackAllocator()
class LibraryPath(tag.Tag):
def __init__(self, path):
super().__init__(path)
self.clear = thread.Thread(self.__do_clear__)
self.scan = thread.Thread(self.__do_scan__)
def __setstate__(self, state):
super().__setstate__(state)
self.tracks = [ Tracks[i] for i in state["tracks"] ]
self.clear = thread.Thread(self.__do_clear__)
self.scan = thread.Thread(self.__do_scan__)
def __do_scan__(self):
for trak in Tracks.autoremove(self):
self.remove_track(trak)
track_set = set([ t.filepath() for t in Tracks.list_tracks(self) ])
for f in self.name.rglob("*"):
if f not in track_set and f.is_file():
if (track := Tracks.allocate(self, f)) != None:
self.add_track(track)
def __do_clear__(self):
for trak in self.tracks:
Tracks.remove(trak)
self.tracks.clear()
def reset():
Tracks.reset()

View File

@ -1,15 +1,19 @@
# Copyright 2021 (c) Anna Schumaker.
from lib import publisher
from . import allocator
from . import library
import pathlib
import threading
import unittest
test_tracks = pathlib.Path("./trier/Test Album")
class FakeLibrary:
def __init__(self):
self.name = test_tracks
class TestTrackAllocator(unittest.TestCase):
def setUp(self):
self.lib = FakeLibrary()
self.added = None
self.removed = None
self.updated = None
@ -33,12 +37,11 @@ class TestTrackAllocator(unittest.TestCase):
self.assertIsInstance(alloc.Updated, publisher.Publisher)
def test_allocator(self):
lib = library.LibraryPath(test_tracks)
alloc = allocator.TrackAllocator()
alloc.Added.register(self.on_track_added)
alloc.Removed.register(self.on_track_removed)
track = alloc.allocate(lib, test_tracks / "01 - Test Track.ogg")
track = alloc.allocate(self.lib, test_tracks / "01 - Test Track.ogg")
self.assertEqual(alloc.tracks[0], track)
self.assertEqual(alloc[0], track)
self.assertEqual(len(alloc), 1)
@ -46,16 +49,16 @@ class TestTrackAllocator(unittest.TestCase):
self.assertEqual(track.trackid, 0)
self.assertEqual(self.added, track)
track2 = alloc.allocate(lib, test_tracks / "02 - Test {Disc 2}.ogg")
track2 = alloc.allocate(self.lib, test_tracks / "02 - Test {Disc 2}.ogg")
self.assertEqual(alloc.tracks[1], track2)
self.assertEqual(alloc[1], track2)
self.assertEqual(len(alloc), 2)
self.assertIsNone(alloc.allocate(lib, test_tracks / "No Such File"))
self.assertIsNone(alloc.allocate(self.lib, test_tracks / "No Such File"))
self.assertEqual(self.added, track2)
self.assertEqual([ t for t in alloc.list_tracks(lib) ], [ track, track2 ])
self.assertEqual([ t for t in alloc.list_tracks(self.lib) ], [ track, track2 ])
track.library = None
self.assertEqual([ t for t in alloc.list_tracks(lib) ], [ track2 ])
self.assertEqual([ t for t in alloc.list_tracks(self.lib) ], [ track2 ])
alloc.remove(track)
self.assertEqual(alloc[0], None)
@ -70,31 +73,28 @@ class TestTrackAllocator(unittest.TestCase):
self.assertEqual(alloc.Removed.subscribers, set())
def test_allocator_autoremove(self):
lib = library.LibraryPath(test_tracks)
alloc = allocator.TrackAllocator()
alloc.Removed.register(self.on_track_removed)
track = alloc.allocate(lib, test_tracks / "01 - Test Track.ogg")
track2 = alloc.allocate(lib, test_tracks / "02 - Test {Disc 2}.ogg")
track = alloc.allocate(self.lib, test_tracks / "01 - Test Track.ogg")
track2 = alloc.allocate(self.lib, test_tracks / "02 - Test {Disc 2}.ogg")
track2.path = pathlib.Path("No Such File")
self.assertEqual(alloc.autoremove(lib), [ track2 ])
self.assertEqual([ t for t in alloc.list_tracks(lib) ], [ track ])
self.assertEqual(alloc.autoremove(self.lib), [ track2 ])
self.assertEqual([ t for t in alloc.list_tracks(self.lib) ], [ track ])
def test_allocator_played(self):
lib = library.LibraryPath(test_tracks)
alloc = allocator.TrackAllocator()
alloc.Updated.register(self.on_track_updated)
track = alloc.allocate(lib, test_tracks / "01 - Test Track.ogg")
track = alloc.allocate(self.lib, test_tracks / "01 - Test Track.ogg")
alloc.played(track)
self.assertEqual(track.playcount, 1)
self.assertEqual(self.updated, track)
def test_allocator_state(self):
lib = library.LibraryPath(test_tracks)
alloc = allocator.TrackAllocator()
track = alloc.allocate(lib, test_tracks / "01 - Test Track.ogg")
track = alloc.allocate(self.lib, test_tracks / "01 - Test Track.ogg")
state = alloc.__getstate__()
self.assertEqual(state, { "tracks" : { 0 : track },

View File

@ -1,78 +0,0 @@
# Copyright 2020 (c) Anna Schumaker.
from lib import publisher
from lib import tag
from lib import thread
from . import allocator
from . import library
from . import tags
from . import track
import pathlib
import threading
import unittest
test_tracks = pathlib.Path("./trier/Test Album")
class TestLibraryPath(unittest.TestCase):
def setUp(self):
library.reset()
def tearDown(self):
tags.reset()
library.reset()
def test_library_init(self):
self.assertIsInstance(library.Tracks, allocator.TrackAllocator)
def test_library_path_init(self):
lib = library.LibraryPath(test_tracks)
self.assertIsInstance(lib, tag.Tag)
self.assertEqual(lib.name, test_tracks)
self.assertIsInstance(lib.scan, thread.Thread)
def test_library_path_state(self):
lib = library.LibraryPath(test_tracks)
lib.scan().join()
state = lib.__getstate__()
self.assertEqual(set(state.keys()),
set([ "name", "sort", "current", "loop", "random", "tracks" ]))
self.assertEqual(state["name"], test_tracks)
self.assertEqual(state["tracks"], [ i for i in range(12) ])
lib.name = None
lib.clear = None
lib.scan = None
lib.tracks.clear()
lib.__setstate__(state)
self.assertEqual(lib.name, test_tracks)
self.assertEqual(lib.tracks, [ library.Tracks[i] for i in range(12) ])
self.assertEqual(lib.clear.func, lib.__do_clear__)
self.assertEqual(lib.scan.func, lib.__do_scan__)
def test_library_path_scan(self):
lib = library.LibraryPath(test_tracks)
lib.scan().join()
self.assertEqual(len(lib.tracks), 12)
for trak in [ library.Tracks[0], library.Tracks[11] ]:
lib.remove_track(trak)
library.Tracks.remove(trak)
lib.scan().join()
self.assertEqual(len(lib.tracks), 12)
trak = track.Track(library.Tracks.nextid, test_tracks / "01 - Test Track.ogg", lib)
trak.path = pathlib.Path("No Such File")
lib.tracks.append(trak)
library.Tracks.tracks[trak.trackid] = trak
lib.scan().join()
self.assertEqual(len(lib.tracks), 12)
def test_library_path_clear(self):
lib = library.LibraryPath(test_tracks)
lib.scan().join()
self.assertEqual(len(lib), 12)
lib.clear().join()
self.assertEqual(len(lib), 0)
self.assertEqual(len(library.Tracks), 0)

View File

@ -1,6 +1,5 @@
# Copyright 2020 (c) Anna Schumaker.
from lib import publisher
from . import library
from . import tags
from . import track
import datetime
@ -9,10 +8,14 @@ import unittest
test_tracks = pathlib.Path("./trier/Test Album")
class FakeLibrary:
def __init__(self):
self.name = test_tracks
class TestTrack(unittest.TestCase):
def setUp(self):
tags.reset()
self.lib = library.LibraryPath(test_tracks)
self.lib = FakeLibrary()
def tearDown(self):
tags.reset()