175 lines
6.2 KiB
Python
175 lines
6.2 KiB
Python
# Copyright 2020 (c) Anna Schumaker.
|
|
import db
|
|
import lib
|
|
import pathlib
|
|
import tagdb
|
|
import unittest
|
|
|
|
test_tracks = pathlib.Path("./data/Test Album")
|
|
|
|
class TestLibraryTag(unittest.TestCase):
|
|
def tearDown(self):
|
|
db.NewDatabase = True
|
|
tagdb.reset()
|
|
|
|
def test_library_tag_init(self):
|
|
library = tagdb.LibraryTag(test_tracks)
|
|
self.assertIsInstance(library, lib.tag.Tag)
|
|
self.assertIsInstance(library.clear, lib.thread.Thread)
|
|
self.assertIsInstance(library.scan, lib.thread.Thread)
|
|
|
|
def test_library_tag_state(self):
|
|
lib = tagdb.LibraryTag(test_tracks)
|
|
lib.scan().join()
|
|
|
|
state = lib.__getstate__()
|
|
self.assertEqual(set(state.keys()),
|
|
set([ "name", "sort", "current", "loop", "random", "tracks" ]))
|
|
self.assertEqual(state["name"], test_tracks)
|
|
self.assertEqual(state["tracks"], [ i for i in range(12) ])
|
|
|
|
lib.__dict__.clear()
|
|
lib.__setstate__(state)
|
|
lib.fix_tracks()
|
|
self.assertEqual(lib.name, test_tracks)
|
|
self.assertEqual(lib.tracks, [ tagdb.Tracks[i] for i in range(12) ])
|
|
self.assertEqual(lib.clear.func, lib.__do_clear__)
|
|
self.assertEqual(lib.scan.func, lib.__do_scan__)
|
|
|
|
def test_library_tag_scan(self):
|
|
lib = tagdb.LibraryTag(test_tracks)
|
|
lib.scan().join()
|
|
for i in range(12):
|
|
self.assertEqual(lib.tracks[i], tagdb.Tracks[i])
|
|
|
|
def test_library_tag_scan_new(self):
|
|
lib = tagdb.LibraryTag(test_tracks)
|
|
lib.scan().join()
|
|
for trak in [ tagdb.Tracks[0], tagdb.Tracks[11] ]:
|
|
lib.remove_track(trak)
|
|
tagdb.Tracks.remove(trak)
|
|
lib.scan().join()
|
|
self.assertEqual(len(lib.tracks), 12)
|
|
|
|
def test_library_tag_scan_remove(self):
|
|
lib = tagdb.LibraryTag(test_tracks)
|
|
lib.scan().join()
|
|
|
|
trak = tagdb.track.Track(tagdb.Tracks.nextid,
|
|
test_tracks / "01 - Test Track.ogg", lib)
|
|
trak.path = pathlib.Path("No Such File")
|
|
lib.tracks.append(trak)
|
|
tagdb.Tracks.tracks[trak.trackid] = trak
|
|
lib.scan().join()
|
|
self.assertNotIn(trak, lib.tracks)
|
|
|
|
def test_library_tag_clear(self):
|
|
lib = tagdb.LibraryTag(test_tracks)
|
|
lib.scan().join()
|
|
self.assertEqual(len(lib), 12)
|
|
|
|
lib.clear().join()
|
|
self.assertEqual(len(lib), 0)
|
|
self.assertEqual(len(tagdb.Tracks), 0)
|
|
|
|
|
|
class TestLibraryStore(unittest.TestCase):
|
|
def test_library_store(self):
|
|
store = tagdb.LibraryStore()
|
|
lib = store.add(test_tracks)
|
|
self.assertIsInstance(lib, tagdb.LibraryTag)
|
|
|
|
lib.scan().join()
|
|
self.assertEqual(len(lib), 12)
|
|
|
|
store.remove(lib)
|
|
lib.clear.join()
|
|
self.assertEqual(len(lib), 0)
|
|
|
|
|
|
class TestTrackDB(unittest.TestCase):
|
|
def tearDown(self):
|
|
tagdb.reset()
|
|
|
|
def test_tagdb_init(self):
|
|
self.assertIsInstance(tagdb.Bus, lib.bus.Bus)
|
|
self.assertIsInstance(tagdb.Tracks, tagdb.allocator.TrackAllocator)
|
|
self.assertIsInstance(tagdb.Stack, tagdb.stack.TagStack)
|
|
self.assertIsInstance(tagdb.Library, tagdb.LibraryStore)
|
|
|
|
self.assertEqual(tagdb.File, "tagdb.pickle")
|
|
self.assertEqual(tagdb.Bus.timeout, 500)
|
|
|
|
self.assertIn(tagdb.save, tagdb.Library.Added.subscribers)
|
|
self.assertIn(tagdb.save, tagdb.Library.Removed.subscribers)
|
|
self.assertIn(tagdb.save, tagdb.Tracks.Added.subscribers)
|
|
self.assertIn(tagdb.save, tagdb.Tracks.Removed.subscribers)
|
|
self.assertIn(tagdb.save, tagdb.Tracks.Updated.subscribers)
|
|
self.assertIn(tagdb.save, tagdb.tags.User.Added.subscribers)
|
|
self.assertIn(tagdb.save, tagdb.tags.User.Removed.subscribers)
|
|
self.assertIn(tagdb.save, tagdb.Stack.PushPop.subscribers)
|
|
self.assertIn(tagdb.save, tagdb.Stack.NextTrack.subscribers)
|
|
|
|
def test_tagdb_save_load(self):
|
|
db_file = lib.data.DataFile(tagdb.File, lib.data.READ)
|
|
library = tagdb.Library.add(test_tracks)
|
|
library.scan()
|
|
tagdb.Stack.push(library)
|
|
|
|
tagdb.save()
|
|
self.assertEqual(tagdb.Bus.passengers, [ (tagdb._do_save, ()) ])
|
|
self.assertFalse(db_file.exists())
|
|
|
|
library.scan.join()
|
|
tagdb.Bus.complete()
|
|
self.assertTrue(db_file.exists())
|
|
|
|
db.reset()
|
|
tagdb.tags.reset()
|
|
tagdb.Library.reset()
|
|
tagdb.Tracks.reset()
|
|
tagdb.Stack.reset()
|
|
tagdb.load()
|
|
|
|
self.assertEqual(len(tagdb.Tracks), 12)
|
|
self.assertEqual(len(tagdb.Library), 1)
|
|
self.assertEqual(len(tagdb.Library[test_tracks]), 12)
|
|
self.assertEqual(len(tagdb.tags.Artist), 3)
|
|
self.assertEqual(len(tagdb.tags.Album), 12)
|
|
self.assertEqual(len(tagdb.tags.Genre), 4)
|
|
self.assertEqual(len(tagdb.tags.Decade), 2)
|
|
self.assertEqual(len(tagdb.tags.Year), 2)
|
|
self.assertEqual(tagdb.Stack.tags, [ tagdb.Library[test_tracks] ])
|
|
|
|
self.assertIn(tagdb.save, tagdb.Library.Added.subscribers)
|
|
self.assertIn(tagdb.save, tagdb.Tracks.Added.subscribers)
|
|
self.assertIn(tagdb.save, tagdb.tags.User.Added.subscribers)
|
|
|
|
def test_tagdb_stress(self):
|
|
lib = tagdb.Library.add(pathlib.Path("./trier/Test Library/"))
|
|
lib.scan().join()
|
|
tagdb.Bus.complete()
|
|
|
|
tagdb.Library.remove(lib)
|
|
lib.clear.join()
|
|
tagdb.Bus.complete()
|
|
|
|
def test_tagdb_reset(self):
|
|
tagdb.Tracks.Added.register(1)
|
|
tagdb.Tracks.Removed.register(1)
|
|
tagdb.Tracks.Updated.register(1)
|
|
tagdb.Library.Added.register(1)
|
|
tagdb.Library.Removed.register(1)
|
|
tagdb.Library.store = { "a" : 1, "b" : 2, "c" : 3 }
|
|
with lib.data.DataFile(tagdb.File, lib.data.WRITE) as f:
|
|
f.pickle([ 0, [] ])
|
|
|
|
tagdb.reset()
|
|
self.assertEqual(len(tagdb.Library), 0)
|
|
self.assertIn(tagdb.save, tagdb.Tracks.Added.subscribers)
|
|
self.assertIn(tagdb.save, tagdb.Tracks.Removed.subscribers)
|
|
self.assertIn(tagdb.save, tagdb.Tracks.Updated.subscribers)
|
|
self.assertIn(tagdb.save, tagdb.Library.Added.subscribers)
|
|
self.assertIn(tagdb.save, tagdb.Library.Removed.subscribers)
|
|
self.assertFalse(lib.data.DataFile(tagdb.File, lib.data.READ).exists())
|