From 915e3c834060cc827d0a08b11a7bb9f77489fdfd Mon Sep 17 00:00:00 2001 From: Anna Schumaker Date: Sun, 26 Dec 2021 10:03:37 -0500 Subject: [PATCH] Remove unused tagdb module Implements #23 (Remove tagdb/ code) Signed-off-by: Anna Schumaker --- audio/__init__.py | 1 - audio/selector.py | 13 --- audio/test_selector.py | 11 --- emmental.py | 2 - tagdb/__init__.py | 111 ------------------------ tagdb/allocator.py | 85 ------------------ tagdb/stack.py | 75 ---------------- tagdb/tags.py | 36 -------- tagdb/test_allocator.py | 110 ------------------------ tagdb/test_stack.py | 186 ---------------------------------------- tagdb/test_tagdb.py | 174 ------------------------------------- tagdb/test_tags.py | 50 ----------- tagdb/test_track.py | 173 ------------------------------------- tagdb/test_user.py | 99 --------------------- tagdb/track.py | 98 --------------------- tagdb/user.py | 70 --------------- ui/keyboard.py | 1 - 17 files changed, 1295 deletions(-) delete mode 100644 audio/selector.py delete mode 100644 audio/test_selector.py delete mode 100644 tagdb/__init__.py delete mode 100644 tagdb/allocator.py delete mode 100644 tagdb/stack.py delete mode 100644 tagdb/tags.py delete mode 100644 tagdb/test_allocator.py delete mode 100644 tagdb/test_stack.py delete mode 100644 tagdb/test_tagdb.py delete mode 100644 tagdb/test_tags.py delete mode 100644 tagdb/test_track.py delete mode 100644 tagdb/test_user.py delete mode 100644 tagdb/track.py delete mode 100644 tagdb/user.py diff --git a/audio/__init__.py b/audio/__init__.py index 79c3aba..0486f64 100644 --- a/audio/__init__.py +++ b/audio/__init__.py @@ -1,5 +1,4 @@ # Copyright 2021 (c) Anna Schumaker. -import tagdb from gi.repository import Gtk from . import artwork from . import controls diff --git a/audio/selector.py b/audio/selector.py deleted file mode 100644 index 919a4f5..0000000 --- a/audio/selector.py +++ /dev/null @@ -1,13 +0,0 @@ -# Copyright 2021 (c) Anna Schumaker. -import tagdb -from gi.repository import GObject - -class Selector(GObject.GObject): - def __init__(self): GObject.GObject.__init__(self) - def next(self): return None - def previous(self): return None - - -class TagdbSelector(Selector): - def next(self): return tagdb.Stack.next()[0] - def previous(self): return tagdb.Stack.previous() diff --git a/audio/test_selector.py b/audio/test_selector.py deleted file mode 100644 index 3f77e99..0000000 --- a/audio/test_selector.py +++ /dev/null @@ -1,11 +0,0 @@ -# Copyright 2021 (c) Anna Schumaker. -import unittest -from gi.repository import GObject -from . import selector - -class TestAudioSelector(unittest.TestCase): - def test_audio_selector_init(self): - select = selector.Selector() - self.assertIsInstance(select, GObject.GObject) - self.assertIsNone(select.next()) - self.assertIsNone(select.previous()) diff --git a/emmental.py b/emmental.py index a609b3d..826fe71 100755 --- a/emmental.py +++ b/emmental.py @@ -1,9 +1,7 @@ #!/usr/bin/python # Copyright 2021 (c) Anna Schumaker. import lib -import tagdb lib.settings.load() -tagdb.load() import ui ui.Application.run() diff --git a/tagdb/__init__.py b/tagdb/__init__.py deleted file mode 100644 index c9a05c1..0000000 --- a/tagdb/__init__.py +++ /dev/null @@ -1,111 +0,0 @@ -# Copyright 2020 (c) Anna Schumaker. -from . import allocator -from . import stack -from . import tags -from . import track -import db -import lib -import pathlib -import scanner -import threading - -File = "tagdb.pickle" -Bus = lib.bus.Bus(500) -Tracks = allocator.TrackAllocator() -Stack = stack.TagStack() - -class LibraryTag(lib.tag.Tag): - def __init__(self, path): - super().__init__(path) - self.clear = lib.thread.Thread(self.__do_clear__) - self.scan = lib.thread.Thread(self.__do_scan__) - - def __setstate__(self, state): - super().__setstate__(state) - self.clear = lib.thread.Thread(self.__do_clear__) - self.scan = lib.thread.Thread(self.__do_scan__) - - def __do_scan__(self): - for trak in Tracks.autoremove(self): - self.remove_track(trak) - - track_set = set([ t.filepath() for t in Tracks.list_tracks(self) ]) - for f in self.name.rglob("*"): - if f not in track_set and f.is_file(): - if (track := Tracks.allocate(self, f)) != None: - self.add_track(track) - - def __do_clear__(self): - for trak in self.tracks: - Tracks.remove(trak) - self.tracks.clear() - - def fix_tracks(self): - for (index, trak) in enumerate(self.tracks): - t = Tracks[trak] - if t is not None: - self.tracks[index] = t - - - -class LibraryStore(lib.tagstore.TagStore): - def __alloc_tag__(self, name, sort): - return LibraryTag(name) - - def add(self, name): - return super().__add_tag__(name, None, None) - - def remove(self, lib): - lib.clear() - super().remove(lib) - - def fix_tracks(self): - for (id, tag) in self.store.items(): - tag.fix_tracks() - -Library = LibraryStore() - - -def _do_save(): - try: - with lib.data.DataFile(File, lib.data.WRITE) as f: - f.pickle([ tags.get_state(), Tracks, Library, Stack ]) - except Exception as e: - return lib.bus.RETRY - -def save(*args): - Bus.board(_do_save) - -def load(): - global Library - global Tracks - global Stack - if not db.new_db(): - return - with lib.data.DataFile(File, lib.data.READ) as f: - if f.exists(): - (tagstate, Tracks, Library, Stack) = f.unpickle() - tags.set_state(*tagstate) - Tracks.load_tags() - Library.fix_tracks() - __register_callbacks() - scanner.Queue.push(scanner.task.CommitTask()) - -def __register_callbacks(): - for store in [ Library, tags.User, Tracks ]: - store.Added.register(save) - store.Removed.register(save) - Tracks.Updated.register(save) - Stack.PushPop.register(save) - Stack.NextTrack.register(save) -__register_callbacks() - -def reset(): - Tracks.reset() - Library.reset() - Stack.reset() - tags.reset() - Bus.clear() - - lib.data.DataFile(File, lib.data.READ).remove() - __register_callbacks() diff --git a/tagdb/allocator.py b/tagdb/allocator.py deleted file mode 100644 index f4c7d74..0000000 --- a/tagdb/allocator.py +++ /dev/null @@ -1,85 +0,0 @@ -# Copyright 2021 (c) Anna Schumaker. -from lib import publisher -from . import track -import threading - -class TrackAllocator: - def __init__(self): - self.tracks = dict() - self.nextid = 0 - self.lock = threading.Lock() - self.Added = publisher.Publisher() - self.Removed = publisher.Publisher() - self.Updated = publisher.Publisher() - - def __alloc_track__(self, lib, filepath): - with self.lock: - trak = track.Track(self.nextid, filepath, lib) - self.tracks[self.nextid] = trak - self.nextid += 1 - self.Added.publish(trak) - return trak - - def __getitem__(self, id): - with self.lock: - return self.tracks.get(id, None) - - def __getstate__(self): - with self.lock: - return { "tracks" : self.tracks, - "nextid" : self.nextid } - - def __len__(self): - with self.lock: - return len(self.tracks) - - def __setstate__(self, state): - self.__dict__.update(state) - self.lock = threading.Lock() - self.Added = publisher.Publisher() - self.Removed = publisher.Publisher() - self.Updated = publisher.Publisher() - - def allocate(self, lib, filepath): - try: - return self.__alloc_track__(lib, filepath) - except Exception as e: - pass - - def autoremove(self, lib): - with self.lock: - to_rm = [ trak for trak in self.tracks.values() \ - if trak.library == lib and not trak.filepath().exists() ] - for trak in to_rm: - trak.about_to_remove() - del self.tracks[trak.trackid] - self.Removed.publish(trak) - return to_rm - - def list_tracks(self, lib): - with self.lock: - for (id, track) in self.tracks.items(): - if track.library ==lib: - yield track - - def load_tags(self): - for (id, track) in self.tracks.items(): - track.__set_tags__() - - def played(self, track): - with self.lock: - track.played() - self.Updated.publish(track) - - def remove(self, track): - with self.lock: - track.about_to_remove() - del self.tracks[track.trackid] - self.Removed.publish(track) - - def reset(self): - with self.lock: - self.nextid = 0 - self.tracks.clear() - self.Added.reset() - self.Removed.reset() diff --git a/tagdb/stack.py b/tagdb/stack.py deleted file mode 100644 index fef94b7..0000000 --- a/tagdb/stack.py +++ /dev/null @@ -1,75 +0,0 @@ -# Copyright 2021 (c) Anna Schumaker. -from lib import counter -from lib import publisher -from . import tags - -class TagStack: - def __init__(self): - self.tags = [ ] - self.Counter = counter.Counter(-1, 99) - self.PushPop = publisher.Publisher() - self.NextTrack = publisher.Publisher() - - def __do_next__(self, tag): - if track := tag.next(): - track.add_to_playlist("Previous") - return track - - def __getstate__(self): - return { "tags" : self.tags } - - def __setstate__(self, state): - self.__dict__.update(state) - self.Counter = counter.Counter(-1, 99) - self.PushPop = publisher.Publisher() - self.NextTrack = publisher.Publisher() - - def current(self): - if len(self.tags) == 0: - return tags.User.store["Collection"] - return self.tags[0] - - def __next_track__(self): - if len(self.tags) == 0: - return self.__do_next__(tags.User["Collection"]) - if track := self.__do_next__(self.tags[0]): - return track - self.pop() - return self.__next_track__() - - def next(self): - ret = self.__next_track__() - count = self.Counter.decrement() - self.NextTrack.publish(ret) - return (ret, count != -1) - - def pop(self): - prev = self.tags.pop(0) - self.PushPop.publish(prev, self.current()) - - def previous(self): - return tags.User["Previous"].next() - - def push(self, tag): - prev = self.current() - if tag == tags.User["Previous"]: - return - if tag == tags.User["Collection"]: - self.tags.clear() - self.PushPop.publish(prev, tags.User["Collection"]) - return - if tag in self.tags: - self.tags.remove(tag) - - self.tags.insert(0, tag) - tag.stacked() - self.PushPop.publish(prev, tag) - - def queue(self, track): - track.add_to_playlist("Up Next") - self.push(tags.User["Up Next"]) - - def reset(self): - self.tags.clear() - self.count = None - self.PushPop.reset() diff --git a/tagdb/tags.py b/tagdb/tags.py deleted file mode 100644 index 9493403..0000000 --- a/tagdb/tags.py +++ /dev/null @@ -1,36 +0,0 @@ -# Copyright 2021 (c) Anna Schumaker. -from . import user -from lib import tagstore - -Artist = tagstore.TagStore() -Album = tagstore.TagSuperStore() -Genre = tagstore.TagStore() -Decade = tagstore.TagStore() -Year = tagstore.TagSuperStore() -User = user.UserTagStore() - -def get_state(): - return (Artist, Album, Genre, Decade, Year, User) - -def set_state(artist, album, genre, decade, year, user): - global Artist - global Album - global Genre - global Decade - global Year - global User - - Artist = artist - Album = album - Genre = genre - Decade = decade - Year = year - User = user - -def reset(): - Artist.reset() - Album.reset() - Genre.reset() - Decade.reset() - Year.reset() - User.reset() diff --git a/tagdb/test_allocator.py b/tagdb/test_allocator.py deleted file mode 100644 index 0344f34..0000000 --- a/tagdb/test_allocator.py +++ /dev/null @@ -1,110 +0,0 @@ -# Copyright 2021 (c) Anna Schumaker. -from lib import publisher -from . import allocator -import pathlib -import threading -import unittest - -test_tracks = pathlib.Path("./data/Test Album") - -class FakeLibrary: - def __init__(self): - self.name = test_tracks - -class TestTrackAllocator(unittest.TestCase): - def setUp(self): - self.lib = FakeLibrary() - self.added = None - self.removed = None - self.updated = None - - def on_track_added(self, track): - self.added = track - - def on_track_removed(self, track): - self.removed = track - - def on_track_updated(self, track): - self.updated = track - - def test_allocator_init(self): - alloc = allocator.TrackAllocator() - self.assertEqual(alloc.tracks, { }) - self.assertEqual(alloc.nextid, 0) - self.assertIsInstance(alloc.lock, type(threading.Lock())) - self.assertIsInstance(alloc.Added, publisher.Publisher) - self.assertIsInstance(alloc.Removed, publisher.Publisher) - self.assertIsInstance(alloc.Updated, publisher.Publisher) - - def test_allocator(self): - alloc = allocator.TrackAllocator() - alloc.Added.register(self.on_track_added) - alloc.Removed.register(self.on_track_removed) - - track = alloc.allocate(self.lib, test_tracks / "01 - Test Track.ogg") - self.assertEqual(alloc.tracks[0], track) - self.assertEqual(alloc[0], track) - self.assertEqual(len(alloc), 1) - self.assertEqual(alloc.nextid, 1) - self.assertEqual(track.trackid, 0) - self.assertEqual(self.added, track) - - track2 = alloc.allocate(self.lib, test_tracks / "02 - Test {Disc 2}.ogg") - self.assertEqual(alloc.tracks[1], track2) - self.assertEqual(alloc[1], track2) - self.assertEqual(len(alloc), 2) - - self.assertIsNone(alloc.allocate(self.lib, test_tracks / "No Such File")) - self.assertEqual(self.added, track2) - self.assertEqual([ t for t in alloc.list_tracks(self.lib) ], [ track, track2 ]) - track.library = None - self.assertEqual([ t for t in alloc.list_tracks(self.lib) ], [ track2 ]) - - alloc.remove(track) - self.assertEqual(alloc[0], None) - self.assertEqual(alloc.tracks, { 1 : track2 }) - self.assertEqual(alloc.nextid, 2) - self.assertEqual(self.removed, track) - - alloc.reset() - self.assertEqual(alloc.nextid, 0) - self.assertEqual(alloc.tracks, { }) - self.assertEqual(alloc.Added.subscribers, set()) - self.assertEqual(alloc.Removed.subscribers, set()) - - def test_allocator_autoremove(self): - alloc = allocator.TrackAllocator() - alloc.Removed.register(self.on_track_removed) - - track = alloc.allocate(self.lib, test_tracks / "01 - Test Track.ogg") - track2 = alloc.allocate(self.lib, test_tracks / "02 - Test {Disc 2}.ogg") - track2.path = pathlib.Path("No Such File") - - self.assertEqual(alloc.autoremove(self.lib), [ track2 ]) - self.assertEqual([ t for t in alloc.list_tracks(self.lib) ], [ track ]) - - def test_allocator_played(self): - alloc = allocator.TrackAllocator() - alloc.Updated.register(self.on_track_updated) - track = alloc.allocate(self.lib, test_tracks / "01 - Test Track.ogg") - - alloc.played(track) - self.assertEqual(track.playcount, 1) - self.assertEqual(self.updated, track) - - def test_allocator_state(self): - alloc = allocator.TrackAllocator() - track = alloc.allocate(self.lib, test_tracks / "01 - Test Track.ogg") - - state = alloc.__getstate__() - self.assertEqual(state, { "tracks" : { 0 : track }, - "nextid" : 1 }) - - alloc.__dict__.clear() - alloc.__setstate__(state) - self.assertEqual(alloc.tracks, { 0 : track }) - self.assertEqual(alloc.nextid, 1) - self.assertIsInstance(alloc.lock, type(threading.Lock())) - self.assertIsInstance(alloc.Added, publisher.Publisher) - self.assertIsInstance(alloc.Removed, publisher.Publisher) - self.assertIsInstance(alloc.Updated, publisher.Publisher) diff --git a/tagdb/test_stack.py b/tagdb/test_stack.py deleted file mode 100644 index 82cf128..0000000 --- a/tagdb/test_stack.py +++ /dev/null @@ -1,186 +0,0 @@ -# Copyright 2021 (c) Anna Schumaker. -from lib import counter -from lib import publisher -from lib import tag -from . import stack -from . import tags -import unittest - -class FakeTrack: - def __init__(self, n): - self.n = n - self.length = n - - def add_to_playlist(self, name): - tags.User.add(name, self) - - def remove_from_playlist(self, name): - tags.User[name].remove_track(self) - -class TestTagStack(unittest.TestCase): - def setUp(self): - self.pushpop = None - self.next_track = None - - def tearDown(self): - tags.reset() - - def on_next_track(self, track): - self.next_track = track - - def on_push_pop(self, prev, new): - self.pushpop = (prev, new) - - def test_tag_stack_init(self): - s = stack.TagStack() - self.assertIsInstance(s.Counter, counter.Counter) - self.assertIsInstance(s.PushPop, publisher.Publisher) - self.assertIsInstance(s.NextTrack, publisher.Publisher) - self.assertEqual(s.tags, [ ]) - - def test_tag_stack_next(self): - s = stack.TagStack() - t = tag.Tag("Test") - s.NextTrack.register(self.on_next_track) - s.push(t) - - t.tracks = [ FakeTrack(1), FakeTrack(2), FakeTrack(3) ] - tags.User["Collection"].tracks = [ FakeTrack(4), FakeTrack(5) ] - - (res, cont) = s.next() - self.assertEqual( (res.n, cont), (1, True) ) - self.assertEqual(self.next_track.n, 1) - self.assertEqual(s.tags, [ t ]) - - (res, cont) = s.next() - self.assertEqual( (res.n, cont), (2, True) ) - self.assertEqual(self.next_track.n, 2) - self.assertEqual(s.tags, [ t ]) - - (res, cont) = s.next() - self.assertEqual( (res.n, cont), (3, True) ) - self.assertEqual(self.next_track.n, 3) - self.assertEqual(s.tags, [ t ]) - - (res, cont) = s.next() - self.assertEqual( (res.n, cont), (4, True) ) - self.assertEqual(self.next_track.n, 4) - self.assertEqual(s.tags, [ ]) - - (res, cont) = s.next() - self.assertEqual( (res.n, cont), (5, True) ) - self.assertEqual(self.next_track.n, 5) - - self.assertEqual([ t.n for t in tags.User["Previous"].tracks ], - [ 5, 4, 3, 2, 1 ]) - - def test_tag_stack_autopause(self): - s = stack.TagStack() - t = tag.Tag("Test") - s.push(t) - - t.tracks = [ FakeTrack(1), FakeTrack(2), FakeTrack(3), FakeTrack(4) ] - s.Counter.set_value(2) - - (res, cont) = s.next() - self.assertEqual( (res.n, cont), (1, True) ) - self.assertEqual(s.tags, [ t ]) - self.assertEqual(s.Counter.get_value(), 1) - - (res, cont) = s.next() - self.assertEqual( (res.n, cont), (2, True) ) - self.assertEqual(s.tags, [ t ]) - self.assertEqual(s.Counter.get_value(), 0) - - (res, cont) = s.next() - self.assertEqual( (res.n, cont), (3, False) ) - self.assertEqual(s.tags, [ t ]) - self.assertEqual(s.Counter.get_value(), -1) - - (res, cont) = s.next() - self.assertEqual( (res.n, cont), (4, True) ) - self.assertEqual(s.tags, [ t ]) - self.assertEqual(s.Counter.get_value(), -1) - - def test_tag_stack_pop(self): - s = stack.TagStack() - t1 = tag.Tag("Test") - t2 = tag.Tag("Test Two") - s.tags = [ t1, t2 ] - s.PushPop.register(self.on_push_pop) - - s.pop() - self.assertEqual(s.tags, [ t2 ]) - self.assertEqual(self.pushpop, (t1, t2)) - - s.pop() - self.assertEqual(s.tags, [ ]) - self.assertEqual(self.pushpop, (t2, tags.User["Collection"])) - - def test_tag_stack_previous(self): - s = stack.TagStack() - for i in [ 1, 2, 3 ]: - tags.User["Previous"].add_track(i) - - self.assertEqual(s.previous(), 2) - self.assertEqual(s.previous(), 1) - self.assertIsNone(s.previous()) - - def test_tag_stack_push(self): - s = stack.TagStack() - t1 = tag.Tag("Test") - t2 = tag.Tag("Test Two") - t1.current = 3 - s.PushPop.register(self.on_push_pop) - - s.push(t1) - self.assertEqual(s.tags, [ t1 ]) - self.assertEqual(t1.current, -1) - self.assertEqual(self.pushpop, (tags.User["Collection"], t1)) - - s.push(t2) - self.assertEqual(s.tags, [ t2, t1 ]) - self.assertEqual(self.pushpop, (t1, t2)) - - s.push(t1) - self.assertEqual(s.tags, [ t1, t2 ]) - self.assertEqual(self.pushpop, (t2, t1)) - - s.push(tags.User["Previous"]) - self.assertEqual(s.tags, [ t1, t2 ]) - s.push(tags.User["Collection"]) - self.assertEqual(s.tags, [ ]) - self.assertEqual(self.pushpop, (t1, tags.User["Collection"])) - - def test_tag_stack_queue(self): - s = stack.TagStack() - s.queue(FakeTrack(1)) - self.assertEqual(s.tags, [ tags.User["Up Next"] ]) - - (res, cont) = s.next() - self.assertEqual( (res.n, cont), (1, True) ) - self.assertEqual(tags.User["Up Next"].tracks, [ ]) - - def test_tag_stack_state(self): - s = stack.TagStack() - t = tag.Tag("Test") - s.push(t) - - state = s.__getstate__() - self.assertEqual(state, { "tags" : [ t ] }) - - s.__dict__.clear() - s.__setstate__(state) - self.assertEqual(s.tags, [ t ]) - self.assertIsInstance(s.Counter, counter.Counter) - self.assertIsInstance(s.PushPop, publisher.Publisher) - self.assertIsInstance(s.NextTrack, publisher.Publisher) - - s.PushPop.register(self.on_push_pop) - s.PushPop.register(self.on_next_track) - s.count = 3 - s.reset() - self.assertEqual(s.tags, [ ]) - self.assertIsInstance(s.Counter, counter.Counter) - self.assertEqual(len(s.PushPop.subscribers), 0) - self.assertEqual(len(s.NextTrack.subscribers), 0) diff --git a/tagdb/test_tagdb.py b/tagdb/test_tagdb.py deleted file mode 100644 index dbda4e4..0000000 --- a/tagdb/test_tagdb.py +++ /dev/null @@ -1,174 +0,0 @@ -# Copyright 2020 (c) Anna Schumaker. -import db -import lib -import pathlib -import tagdb -import unittest - -test_tracks = pathlib.Path("./data/Test Album") - -class TestLibraryTag(unittest.TestCase): - def tearDown(self): - db.NewDatabase = True - tagdb.reset() - - def test_library_tag_init(self): - library = tagdb.LibraryTag(test_tracks) - self.assertIsInstance(library, lib.tag.Tag) - self.assertIsInstance(library.clear, lib.thread.Thread) - self.assertIsInstance(library.scan, lib.thread.Thread) - - def test_library_tag_state(self): - lib = tagdb.LibraryTag(test_tracks) - lib.scan().join() - - state = lib.__getstate__() - self.assertEqual(set(state.keys()), - set([ "name", "sort", "current", "loop", "random", "tracks" ])) - self.assertEqual(state["name"], test_tracks) - self.assertEqual(state["tracks"], [ i for i in range(12) ]) - - lib.__dict__.clear() - lib.__setstate__(state) - lib.fix_tracks() - self.assertEqual(lib.name, test_tracks) - self.assertEqual(lib.tracks, [ tagdb.Tracks[i] for i in range(12) ]) - self.assertEqual(lib.clear.func, lib.__do_clear__) - self.assertEqual(lib.scan.func, lib.__do_scan__) - - def test_library_tag_scan(self): - lib = tagdb.LibraryTag(test_tracks) - lib.scan().join() - for i in range(12): - self.assertEqual(lib.tracks[i], tagdb.Tracks[i]) - - def test_library_tag_scan_new(self): - lib = tagdb.LibraryTag(test_tracks) - lib.scan().join() - for trak in [ tagdb.Tracks[0], tagdb.Tracks[11] ]: - lib.remove_track(trak) - tagdb.Tracks.remove(trak) - lib.scan().join() - self.assertEqual(len(lib.tracks), 12) - - def test_library_tag_scan_remove(self): - lib = tagdb.LibraryTag(test_tracks) - lib.scan().join() - - trak = tagdb.track.Track(tagdb.Tracks.nextid, - test_tracks / "01 - Test Track.ogg", lib) - trak.path = pathlib.Path("No Such File") - lib.tracks.append(trak) - tagdb.Tracks.tracks[trak.trackid] = trak - lib.scan().join() - self.assertNotIn(trak, lib.tracks) - - def test_library_tag_clear(self): - lib = tagdb.LibraryTag(test_tracks) - lib.scan().join() - self.assertEqual(len(lib), 12) - - lib.clear().join() - self.assertEqual(len(lib), 0) - self.assertEqual(len(tagdb.Tracks), 0) - - -class TestLibraryStore(unittest.TestCase): - def test_library_store(self): - store = tagdb.LibraryStore() - lib = store.add(test_tracks) - self.assertIsInstance(lib, tagdb.LibraryTag) - - lib.scan().join() - self.assertEqual(len(lib), 12) - - store.remove(lib) - lib.clear.join() - self.assertEqual(len(lib), 0) - - -class TestTrackDB(unittest.TestCase): - def tearDown(self): - tagdb.reset() - - def test_tagdb_init(self): - self.assertIsInstance(tagdb.Bus, lib.bus.Bus) - self.assertIsInstance(tagdb.Tracks, tagdb.allocator.TrackAllocator) - self.assertIsInstance(tagdb.Stack, tagdb.stack.TagStack) - self.assertIsInstance(tagdb.Library, tagdb.LibraryStore) - - self.assertEqual(tagdb.File, "tagdb.pickle") - self.assertEqual(tagdb.Bus.timeout, 500) - - self.assertIn(tagdb.save, tagdb.Library.Added.subscribers) - self.assertIn(tagdb.save, tagdb.Library.Removed.subscribers) - self.assertIn(tagdb.save, tagdb.Tracks.Added.subscribers) - self.assertIn(tagdb.save, tagdb.Tracks.Removed.subscribers) - self.assertIn(tagdb.save, tagdb.Tracks.Updated.subscribers) - self.assertIn(tagdb.save, tagdb.tags.User.Added.subscribers) - self.assertIn(tagdb.save, tagdb.tags.User.Removed.subscribers) - self.assertIn(tagdb.save, tagdb.Stack.PushPop.subscribers) - self.assertIn(tagdb.save, tagdb.Stack.NextTrack.subscribers) - - def test_tagdb_save_load(self): - db_file = lib.data.DataFile(tagdb.File, lib.data.READ) - library = tagdb.Library.add(test_tracks) - library.scan() - tagdb.Stack.push(library) - - tagdb.save() - self.assertEqual(tagdb.Bus.passengers, [ (tagdb._do_save, ()) ]) - self.assertFalse(db_file.exists()) - - library.scan.join() - tagdb.Bus.complete() - self.assertTrue(db_file.exists()) - - db.reset() - tagdb.tags.reset() - tagdb.Library.reset() - tagdb.Tracks.reset() - tagdb.Stack.reset() - tagdb.load() - - self.assertEqual(len(tagdb.Tracks), 12) - self.assertEqual(len(tagdb.Library), 1) - self.assertEqual(len(tagdb.Library[test_tracks]), 12) - self.assertEqual(len(tagdb.tags.Artist), 3) - self.assertEqual(len(tagdb.tags.Album), 12) - self.assertEqual(len(tagdb.tags.Genre), 4) - self.assertEqual(len(tagdb.tags.Decade), 2) - self.assertEqual(len(tagdb.tags.Year), 2) - self.assertEqual(tagdb.Stack.tags, [ tagdb.Library[test_tracks] ]) - - self.assertIn(tagdb.save, tagdb.Library.Added.subscribers) - self.assertIn(tagdb.save, tagdb.Tracks.Added.subscribers) - self.assertIn(tagdb.save, tagdb.tags.User.Added.subscribers) - - def test_tagdb_stress(self): - lib = tagdb.Library.add(pathlib.Path("./trier/Test Library/")) - lib.scan().join() - tagdb.Bus.complete() - - tagdb.Library.remove(lib) - lib.clear.join() - tagdb.Bus.complete() - - def test_tagdb_reset(self): - tagdb.Tracks.Added.register(1) - tagdb.Tracks.Removed.register(1) - tagdb.Tracks.Updated.register(1) - tagdb.Library.Added.register(1) - tagdb.Library.Removed.register(1) - tagdb.Library.store = { "a" : 1, "b" : 2, "c" : 3 } - with lib.data.DataFile(tagdb.File, lib.data.WRITE) as f: - f.pickle([ 0, [] ]) - - tagdb.reset() - self.assertEqual(len(tagdb.Library), 0) - self.assertIn(tagdb.save, tagdb.Tracks.Added.subscribers) - self.assertIn(tagdb.save, tagdb.Tracks.Removed.subscribers) - self.assertIn(tagdb.save, tagdb.Tracks.Updated.subscribers) - self.assertIn(tagdb.save, tagdb.Library.Added.subscribers) - self.assertIn(tagdb.save, tagdb.Library.Removed.subscribers) - self.assertFalse(lib.data.DataFile(tagdb.File, lib.data.READ).exists()) diff --git a/tagdb/test_tags.py b/tagdb/test_tags.py deleted file mode 100644 index 8215e07..0000000 --- a/tagdb/test_tags.py +++ /dev/null @@ -1,50 +0,0 @@ -# Copyright 2021 (c) Anna Schumaker. -from lib import tagstore -from . import tags -from . import user -import unittest - -class TestTags(unittest.TestCase): - def tearDown(self): - tags.reset() - - def test_tags_init(self): - self.assertIsInstance(tags.Artist, tagstore.TagStore) - self.assertIsInstance(tags.Album, tagstore.TagSuperStore) - self.assertIsInstance(tags.Genre, tagstore.TagStore) - self.assertIsInstance(tags.Decade, tagstore.TagStore) - self.assertIsInstance(tags.Year, tagstore.TagSuperStore) - self.assertIsInstance(tags.User, user.UserTagStore) - - def test_tags_reset(self): - tags.Artist.store = {"a" : 1 } - tags.Album.store = {("a", "b") : 2 } - tags.Genre.store = {"c" : 3 } - tags.Decade.store = {"d" : 4 } - tags.User.store = {"e" : 5 } - - tags.reset() - self.assertEqual(tags.Artist.store, { }) - self.assertEqual(tags.Album.store, { }) - self.assertEqual(tags.Genre.store, { }) - self.assertEqual(tags.Decade.store, { }) - self.assertIsNotNone(tags.User["Collection"]) - self.assertIsNotNone(tags.User["Up Next"]) - self.assertIsNotNone(tags.User["Previous"]) - self.assertIsNotNone(tags.User["Favorites"]) - self.assertIsNotNone(tags.User["Up Next"]) - - def test_tags_state(self): - state = tags.get_state() - self.assertEqual(state, ( tags.Artist, tags.Album, tags.Genre, - tags.Decade, tags.Year, tags.User )) - - tags.set_state(*(1, 2, 3, 4, 5, 6)) - self.assertEqual(tags.Artist, 1) - self.assertEqual(tags.Album, 2) - self.assertEqual(tags.Genre, 3) - self.assertEqual(tags.Decade, 4) - self.assertEqual(tags.Year, 5) - self.assertEqual(tags.User, 6) - - tags.set_state(*state) diff --git a/tagdb/test_track.py b/tagdb/test_track.py deleted file mode 100644 index 101caac..0000000 --- a/tagdb/test_track.py +++ /dev/null @@ -1,173 +0,0 @@ -# Copyright 2020 (c) Anna Schumaker. -from lib import publisher -from gi.repository import GObject -from . import tags -from . import track -import datetime -import pathlib -import unittest - -test_tracks = pathlib.Path("./data/Test Album") - -class FakeLibrary: - def __init__(self): - self.name = test_tracks - -class TestTrack(unittest.TestCase): - def setUp(self): - tags.reset() - self.lib = FakeLibrary() - - def tearDown(self): - tags.reset() - - def test_track_init(self): - trak = track.Track(1, test_tracks / "01 - Test Track.ogg", self.lib) - self.assertEqual(trak.trackid, 1) - self.assertEqual(trak.filepath(), test_tracks / "01 - Test Track.ogg") - self.assertIsInstance(trak, GObject.Object) - - def test_track_album(self): - trak = track.Track(1, test_tracks / "01 - Test Track.ogg", self.lib) - artist = trak.artist - self.assertEqual(trak.album, tags.Album[artist, "Test Album"]) - self.assertEqual(trak["album"], "Test Album") - - def test_track_artist(self): - trak = track.Track(1, test_tracks / "01 - Test Track.ogg", self.lib) - self.assertEqual(trak.artist, tags.Artist["Test Artist"]) - self.assertEqual(trak["artist"], "Test Artist") - self.assertEqual(trak.artist.sort, "artist, test") - - trak2 = track.Track(2, test_tracks / "02 - Test {Disc 2}.ogg", self.lib) - self.assertEqual(trak2.artist, tags.Artist["Test Album Artist"]) - self.assertEqual(trak2.artist.sort, "album artist, test") - - def test_track_decade(self): - trak = track.Track(1, test_tracks / "01 - Test Track.ogg", self.lib) - self.assertEqual(trak.decade, tags.Decade["2010s"]) - self.assertEqual(trak["decade"], "2010s") - - def test_track_discnumber(self): - trak = track.Track(1, test_tracks / "01 - Test Track.ogg", self.lib) - self.assertEqual(trak.discnumber, 1) - self.assertEqual(trak["discnumber"], "01") - - def test_track_filepath(self): - trak = track.Track(1, test_tracks / "01 - Test Track.ogg", self.lib) - self.assertEqual(trak.path, pathlib.Path("01 - Test Track.ogg")) - self.assertEqual(trak.filepath(), test_tracks / "01 - Test Track.ogg") - - def test_track_genres(self): - trak = track.Track(1, test_tracks / "02 - Test {Disc 2}.ogg", self.lib) - genlist = [ tags.Genre["Test"], tags.Genre["Genre"], tags.Genre["List"] ] - self.assertEqual(trak.genres, genlist) - self.assertEqual(trak["genres"], "Test, Genre, List") - - def test_track_length(self): - trak = track.Track(1, test_tracks / "01 - Test Track.ogg", self.lib) - self.assertEqual(trak.length, 10) - self.assertEqual(trak["length"], "0:10") - trak.length = 61 - self.assertEqual(trak["length"], "1:01") - trak.length = 3 - self.assertEqual(trak["length"], "0:03") - - def test_track_played(self): - trak = track.Track(1, test_tracks / "01 - Test Track.ogg", self.lib) - trak.playcount = 0 - trak.lastplayed = None - self.assertEqual(trak["playcount"], "0") - self.assertEqual(trak["lastplayed"], "Never") - trak.played() - self.assertEqual(trak.playcount, 1) - self.assertEqual(trak.lastplayed.date(), datetime.date.today()) - self.assertEqual(trak["playcount"], "1") - #self.assertEqual(trak["lastplayed"], str(datetime.date.today())) - trak.played() - self.assertEqual(trak.playcount, 2) - self.assertEqual(trak["playcount"], "2") - - def test_track_title(self): - trak = track.Track(1, test_tracks / "01 - Test Track.ogg", self.lib) - self.assertEqual(trak.title, "Test Track") - self.assertEqual(trak["title"], "Test Track") - - def test_track_tracknumber(self): - trak = track.Track(1, test_tracks / "01 - Test Track.ogg", self.lib) - self.assertEqual(trak.tracknumber, 1) - self.assertEqual(trak["tracknumber"], "1-01") - trak.tracknumber = 10 - self.assertEqual(trak["tracknumber"], "1-10") - - def test_track_year(self): - trak = track.Track(1, test_tracks / "01 - Test Track.ogg", self.lib) - self.assertEqual(trak.year, tags.Year[trak.decade, "2019"]) - self.assertEqual(trak["year"], "2019") - trak2 = track.Track(2, test_tracks / "02 - Test {Disc 2}.ogg", self.lib) - self.assertEqual(trak2.year, tags.Year[trak2.decade, "2019"]) - self.assertEqual(trak2["year"], "2019") - - def test_track_playlists(self): - trak = track.Track(1, test_tracks / "01 - Test Track.ogg", self.lib) - default = [ tags.User["Collection"], tags.User["New Tracks"] ] - self.assertEqual(trak.playlists, default) - - trak.add_to_playlist("Test") - self.assertEqual(trak.playlists, default + [ tags.User["Test"] ]) - - trak.remove_from_playlist("Test") - self.assertEqual(trak.playlists, default) - self.assertEqual(len(tags.User["Test"]), 0) - - def test_track_about_to_remove(self): - trak = track.Track(1, test_tracks / "01 - Test Track.ogg", self.lib) - trak.about_to_remove() - - self.assertEqual(len(tags.Artist), 0) - self.assertEqual(len(tags.Album), 0) - self.assertEqual(len(tags.Genre), 0) - self.assertEqual(len(tags.Decade), 0) - self.assertEqual(len(tags.Year), 0) - self.assertEqual(len(tags.User["Collection"]), 0) - self.assertEqual(len(tags.User["New Tracks"]), 0) - - def test_track_state(self): - trak = track.Track(1, test_tracks / "01 - Test Track.ogg", self.lib) - trak.add_to_playlist("Starred") - trak.add_to_playlist("Previous") - - state = trak.__getstate__() - self.assertEqual(state["artist"], "Test Artist") - self.assertEqual(state["album"], "Test Album") - self.assertEqual(state["genres"], [ "Test" ]) - self.assertEqual(state["decade"], "2010s") - self.assertEqual(state["year"], "2019") - self.assertEqual(state["playlists"], [ "Collection", "Starred" ]) - - tags.Artist["Test Artist"].tracks = [ 1 ] - tags.Album[trak.artist, "Test Album"].tracks = [ 1 ] - tags.Genre["Test"].tracks = [ 1 ] - tags.Decade["2010s"].tracks = [ 1 ] - tags.Year[trak.decade, "2019"].tracks = [ 1 ] - tags.User["Collection"].tracks = [ 1 ] - tags.User["Starred"].tracks = [ 1 ] - - trak.__dict__.clear() - trak.__setstate__(state) - trak.__set_tags__() - self.assertEqual(trak.artist, tags.Artist["Test Artist"]) - self.assertEqual(trak.album, tags.Album[trak.artist, "Test Album"]) - self.assertEqual(trak.genres, [ tags.Genre["Test"] ]) - self.assertEqual(trak.decade, tags.Decade["2010s"]) - self.assertEqual(trak.year, tags.Year[trak.decade, "2019"]) - self.assertEqual(trak.playlists, [ tags.User["Collection"], - tags.User["Starred"] ]) - - self.assertEqual(tags.Artist["Test Artist"].tracks, [ trak ]) - self.assertEqual(tags.Album[trak.artist, "Test Album"].tracks, [ trak ]) - self.assertEqual(tags.Genre["Test"].tracks, [ trak ]) - self.assertEqual(tags.Decade["2010s"].tracks, [ trak ]) - self.assertEqual(tags.Year[trak.decade, "2019"].tracks, [ trak ]) - self.assertEqual(tags.User["Collection"].tracks, [ trak ]) - self.assertEqual(tags.User["Starred"].tracks, [ trak ]) diff --git a/tagdb/test_user.py b/tagdb/test_user.py deleted file mode 100644 index fc9342a..0000000 --- a/tagdb/test_user.py +++ /dev/null @@ -1,99 +0,0 @@ -# Copyright 2021 (c) Anna Schumaker. -from lib import fake -from lib import tag -from lib import tagstore -from . import user -import unittest - -class TestUserTags(unittest.TestCase): - def test_collection_tag(self): - c = user.CollectionTag() - self.assertIsInstance(c, tag.Tag) - self.assertTrue(c.loop) - self.assertFalse(c.can_loop()) - - def test_new_tracks_tag(self): - n = user.NewTracksTag() - self.assertIsInstance(n, tag.Tag) - self.assertTrue(n.can_loop()) - self.assertTrue(n.can_random()) - - n.tracks = [ fake.Track(1), fake.Track(2), fake.Track(3) ] - state = n.__getstate__() - self.assertEqual(state["name"], "New Tracks") - self.assertEqual(state["sort"], "new tracks") - self.assertEqual(state["current"], -1) - self.assertEqual(state["tracks"], [ ]) - self.assertFalse(state["loop"]) - self.assertFalse(state["random"]) - - def test_previous_tag(self): - p = user.PreviousTag() - self.assertIsInstance(p, tag.Tag) - self.assertFalse(p.can_loop()) - self.assertFalse(p.can_random()) - - p.add_track(fake.Track(1)) - self.assertEqual(p.tracks, [ fake.Track(1) ]) - self.assertIsNone(p.next()) - - p.add_track(fake.Track(2)) - self.assertEqual(p.tracks, [ fake.Track(2), fake.Track(1) ]) - self.assertEqual(p.next(), fake.Track(1)) - - p.add_track(fake.Track(3)) - self.assertEqual(p.tracks, [ fake.Track(3), fake.Track(2), fake.Track(1) ]) - self.assertEqual(p.next(), fake.Track(2)) - self.assertEqual(p.next(), fake.Track(1)) - - state = p.__getstate__() - self.assertEqual(state["name"], "Previous") - self.assertEqual(state["sort"], "previous") - self.assertEqual(state["current"], 2) - self.assertEqual(state["tracks"], [ ]) - self.assertFalse(state["loop"]) - self.assertFalse(state["random"]) - - def test_up_next_tag(self): - u = user.UpNextTag() - self.assertIsInstance(u, tag.Tag) - self.assertFalse(u.loop) - self.assertFalse(u.can_loop()) - self.assertIsNone(u.next()) - - u.tracks = [ fake.Track(1, u), fake.Track(2, u), fake.Track(3, u) ] - self.assertEqual(u.next(), fake.Track(1, u)) - self.assertEqual(u.tracks, [ fake.Track(2, u), fake.Track(3, u) ]) - - self.assertEqual(u.next(), fake.Track(2, u)) - self.assertEqual(u.tracks, [ fake.Track(3, u) ]) - - u.random = True - self.assertEqual(u.next(), fake.Track(3, u)) - self.assertEqual(u.tracks, [ ]) - self.assertFalse(u.random) - - -class TestUserTagStore(unittest.TestCase): - def test_user_init(self): - store = user.UserTagStore() - self.assertIsInstance(store, tagstore.TagStore) - - self.assertIsInstance(store["Collection"], user.CollectionTag) - self.assertIsInstance(store["New Tracks"], user.NewTracksTag) - self.assertIsInstance(store["Previous"], user.PreviousTag) - self.assertIsInstance(store["Favorites"], tag.Tag) - self.assertIsInstance(store["Up Next"], user.UpNextTag) - - def test_user_reset(self): - store = user.UserTagStore() - store.add("Playlist") - self.assertEqual(len(store), 6) - - store.reset() - self.assertEqual(len(store), 5) - self.assertIsInstance(store["Collection"], user.CollectionTag) - self.assertIsInstance(store["New Tracks"], user.NewTracksTag) - self.assertIsInstance(store["Previous"], user.PreviousTag) - self.assertIsInstance(store["Favorites"], tag.Tag) - self.assertIsInstance(store["Up Next"], user.UpNextTag) diff --git a/tagdb/track.py b/tagdb/track.py deleted file mode 100644 index 6bf3bb4..0000000 --- a/tagdb/track.py +++ /dev/null @@ -1,98 +0,0 @@ -# Copyright 2020 (c) Anna Schumaker. -from lib import metadata -from lib import publisher -from . import tags -from gi.repository import GObject -import datetime -import db -import scanner - -class Track(GObject.Object): - def __init__(self, trackid, filepath, library): - GObject.Object.__init__(self) - self.trackid = trackid - self.path = filepath.relative_to(library.name) - self.library = library - self.lastplayed = None - self.playcount = 0 - - with metadata.Metadata(filepath) as meta: - self.title = meta.title() - self.length = meta.length() - self.discnumber = meta.discnumber() - self.tracknumber = meta.tracknumber() - - self.artist = tags.Artist.add(meta.artist(), self, sort=meta.artistsort()) - self.album = tags.Album.add(self.artist, meta.album(), self) - self.genres = [ tags.Genre.add(g, self) for g in meta.genres() ] - self.decade = tags.Decade.add(f"{meta.decade()}s", self) - self.year = tags.Year.add(self.decade, str(meta.year()), self) - - self.playlists = [ tags.User.add("Collection", self), - tags.User.add("New Tracks", self) ] - - def __getitem__(self, item): - tag = self.__dict__.get(item, None) - if item == "length": - (m, s) = divmod(tag, 60) - return f"{m}:{s:02}" - elif item == "discnumber": - return f"{tag:02}" - elif item == "tracknumber": - return f"{self.discnumber}-{tag:02}" - elif item == "lastplayed": - return "Never" if tag == None else str(tag) - elif item == "genres": - return ", ".join([ str(g) for g in self.genres ]) - return None if tag == None else str(tag) - - def __getstate__(self): - state = self.__dict__.copy() - state["artist"] = str(self.artist) - state["album"] = str(self.album) - state["genres"] = [ str(g) for g in self.genres ] - state["decade"] = str(self.decade) - state["year"] = str(self.year) - state["playlists" ] = [ str(p) for p in self.playlists \ - if str(p) not in ("New Tracks", "Previous") ] - return state - - def __setstate__(self, state): - GObject.Object.__init__(self) - self.__dict__.update(state) - - def __set_tags__(self): - self.artist = tags.Artist.init_track(self.artist, self) - self.album = tags.Album.init_track(self.artist, self.album, self) - self.genres = [ tags.Genre.init_track(g, self) for g in self.genres ] - self.decade = tags.Decade.init_track(self.decade, self) - self.year = tags.Year.init_track(self.decade, self.year, self) - self.playlists = [ tags.User.init_track(p, self) for p in self.playlists ] - scanner.import_track(db.library.Table.find(self.library.name), - self.filepath(), self.playcount, self.lastplayed, - [ p.name for p in self.playlists ]) - - def about_to_remove(self): - tags.Artist.remove(self.artist, self) - tags.Album.remove(self.album, self) - for genre in self.genres: - tags.Genre.remove(genre, self) - tags.Decade.remove(self.decade, self) - tags.Year.remove(self.year, self) - for tag in self.playlists: - tag.remove_track(self) - - def add_to_playlist(self, name): - self.playlists.append(tags.User.add(name, self)) - - def filepath(self): - return self.library.name / self.path - - def played(self): - self.playcount += 1 - self.lastplayed = datetime.datetime.now() - - def remove_from_playlist(self, name): - tag = tags.User[name] - tag.remove_track(self) - self.playlists.remove(tag) diff --git a/tagdb/user.py b/tagdb/user.py deleted file mode 100644 index 411918b..0000000 --- a/tagdb/user.py +++ /dev/null @@ -1,70 +0,0 @@ -# Copyright 2021 (c) Anna Schumaker. -from lib import tag -from lib import tagstore - -class CollectionTag(tag.Tag): - def __init__(self): - tag.Tag.__init__(self, "Collection") - self.loop = True - - def can_loop(self): return False - - -class NewTracksTag(tag.Tag): - def __init__(self): - tag.Tag.__init__(self, "New Tracks") - - def __getstate__(self): - state = super().__getstate__() - state["tracks"].clear() - return state - - -class PreviousTag(tag.Tag): - def __init__(self): - tag.Tag.__init__(self, "Previous") - - def __getstate__(self): - state = super().__getstate__() - state["tracks"].clear() - return state - - def can_random(self): return False - def can_loop(self): return False - - def add_track(self, track): - with self.lock: - self.tracks.insert(0, track) - self.current = 0 - self.TrackAdded.publish(self, track, 0) - - -class UpNextTag(tag.Tag): - def __init__(self): - tag.Tag.__init__(self, "Up Next") - - def can_loop(self): return False - - def next(self): - track = super().next() - if track is not None: - track.remove_from_playlist("Up Next") - with self.lock: - self.current -= 1 - if len(self.tracks) == 0: - self.random = False - return track - - -class UserTagStore(tagstore.TagStore): - def __init__(self): - tagstore.TagStore.__init__(self) - self.reset() - - def reset(self): - super().reset() - self.store["Collection"] = CollectionTag() - self.store["Favorites"] = tag.Tag("Favorites") - self.store["New Tracks"] = NewTracksTag() - self.store["Previous"] = PreviousTag() - self.store["Up Next"] = UpNextTag() diff --git a/ui/keyboard.py b/ui/keyboard.py index b5f8d05..0ceaeb6 100644 --- a/ui/keyboard.py +++ b/ui/keyboard.py @@ -1,7 +1,6 @@ # Copyright 2021 (c) Anna Schumaker. from gi.repository import Gtk, Gdk import audio -import tagdb Event = Gtk.EventControllerKey() Event.set_propagation_phase(Gtk.PropagationPhase.CAPTURE)