Remove unused tagdb module

Implements #23 (Remove tagdb/ code)
Signed-off-by: Anna Schumaker <Anna@NoWheyCreamery.com>
This commit is contained in:
Anna Schumaker 2021-12-26 10:03:37 -05:00
parent 04dc67a097
commit 915e3c8340
17 changed files with 0 additions and 1295 deletions

View File

@ -1,5 +1,4 @@
# Copyright 2021 (c) Anna Schumaker.
import tagdb
from gi.repository import Gtk
from . import artwork
from . import controls

View File

@ -1,13 +0,0 @@
# Copyright 2021 (c) Anna Schumaker.
import tagdb
from gi.repository import GObject
class Selector(GObject.GObject):
def __init__(self): GObject.GObject.__init__(self)
def next(self): return None
def previous(self): return None
class TagdbSelector(Selector):
def next(self): return tagdb.Stack.next()[0]
def previous(self): return tagdb.Stack.previous()

View File

@ -1,11 +0,0 @@
# Copyright 2021 (c) Anna Schumaker.
import unittest
from gi.repository import GObject
from . import selector
class TestAudioSelector(unittest.TestCase):
def test_audio_selector_init(self):
select = selector.Selector()
self.assertIsInstance(select, GObject.GObject)
self.assertIsNone(select.next())
self.assertIsNone(select.previous())

View File

@ -1,9 +1,7 @@
#!/usr/bin/python
# Copyright 2021 (c) Anna Schumaker.
import lib
import tagdb
lib.settings.load()
tagdb.load()
import ui
ui.Application.run()

View File

@ -1,111 +0,0 @@
# Copyright 2020 (c) Anna Schumaker.
from . import allocator
from . import stack
from . import tags
from . import track
import db
import lib
import pathlib
import scanner
import threading
File = "tagdb.pickle"
Bus = lib.bus.Bus(500)
Tracks = allocator.TrackAllocator()
Stack = stack.TagStack()
class LibraryTag(lib.tag.Tag):
def __init__(self, path):
super().__init__(path)
self.clear = lib.thread.Thread(self.__do_clear__)
self.scan = lib.thread.Thread(self.__do_scan__)
def __setstate__(self, state):
super().__setstate__(state)
self.clear = lib.thread.Thread(self.__do_clear__)
self.scan = lib.thread.Thread(self.__do_scan__)
def __do_scan__(self):
for trak in Tracks.autoremove(self):
self.remove_track(trak)
track_set = set([ t.filepath() for t in Tracks.list_tracks(self) ])
for f in self.name.rglob("*"):
if f not in track_set and f.is_file():
if (track := Tracks.allocate(self, f)) != None:
self.add_track(track)
def __do_clear__(self):
for trak in self.tracks:
Tracks.remove(trak)
self.tracks.clear()
def fix_tracks(self):
for (index, trak) in enumerate(self.tracks):
t = Tracks[trak]
if t is not None:
self.tracks[index] = t
class LibraryStore(lib.tagstore.TagStore):
def __alloc_tag__(self, name, sort):
return LibraryTag(name)
def add(self, name):
return super().__add_tag__(name, None, None)
def remove(self, lib):
lib.clear()
super().remove(lib)
def fix_tracks(self):
for (id, tag) in self.store.items():
tag.fix_tracks()
Library = LibraryStore()
def _do_save():
try:
with lib.data.DataFile(File, lib.data.WRITE) as f:
f.pickle([ tags.get_state(), Tracks, Library, Stack ])
except Exception as e:
return lib.bus.RETRY
def save(*args):
Bus.board(_do_save)
def load():
global Library
global Tracks
global Stack
if not db.new_db():
return
with lib.data.DataFile(File, lib.data.READ) as f:
if f.exists():
(tagstate, Tracks, Library, Stack) = f.unpickle()
tags.set_state(*tagstate)
Tracks.load_tags()
Library.fix_tracks()
__register_callbacks()
scanner.Queue.push(scanner.task.CommitTask())
def __register_callbacks():
for store in [ Library, tags.User, Tracks ]:
store.Added.register(save)
store.Removed.register(save)
Tracks.Updated.register(save)
Stack.PushPop.register(save)
Stack.NextTrack.register(save)
__register_callbacks()
def reset():
Tracks.reset()
Library.reset()
Stack.reset()
tags.reset()
Bus.clear()
lib.data.DataFile(File, lib.data.READ).remove()
__register_callbacks()

View File

@ -1,85 +0,0 @@
# Copyright 2021 (c) Anna Schumaker.
from lib import publisher
from . import track
import threading
class TrackAllocator:
def __init__(self):
self.tracks = dict()
self.nextid = 0
self.lock = threading.Lock()
self.Added = publisher.Publisher()
self.Removed = publisher.Publisher()
self.Updated = publisher.Publisher()
def __alloc_track__(self, lib, filepath):
with self.lock:
trak = track.Track(self.nextid, filepath, lib)
self.tracks[self.nextid] = trak
self.nextid += 1
self.Added.publish(trak)
return trak
def __getitem__(self, id):
with self.lock:
return self.tracks.get(id, None)
def __getstate__(self):
with self.lock:
return { "tracks" : self.tracks,
"nextid" : self.nextid }
def __len__(self):
with self.lock:
return len(self.tracks)
def __setstate__(self, state):
self.__dict__.update(state)
self.lock = threading.Lock()
self.Added = publisher.Publisher()
self.Removed = publisher.Publisher()
self.Updated = publisher.Publisher()
def allocate(self, lib, filepath):
try:
return self.__alloc_track__(lib, filepath)
except Exception as e:
pass
def autoremove(self, lib):
with self.lock:
to_rm = [ trak for trak in self.tracks.values() \
if trak.library == lib and not trak.filepath().exists() ]
for trak in to_rm:
trak.about_to_remove()
del self.tracks[trak.trackid]
self.Removed.publish(trak)
return to_rm
def list_tracks(self, lib):
with self.lock:
for (id, track) in self.tracks.items():
if track.library ==lib:
yield track
def load_tags(self):
for (id, track) in self.tracks.items():
track.__set_tags__()
def played(self, track):
with self.lock:
track.played()
self.Updated.publish(track)
def remove(self, track):
with self.lock:
track.about_to_remove()
del self.tracks[track.trackid]
self.Removed.publish(track)
def reset(self):
with self.lock:
self.nextid = 0
self.tracks.clear()
self.Added.reset()
self.Removed.reset()

View File

@ -1,75 +0,0 @@
# Copyright 2021 (c) Anna Schumaker.
from lib import counter
from lib import publisher
from . import tags
class TagStack:
def __init__(self):
self.tags = [ ]
self.Counter = counter.Counter(-1, 99)
self.PushPop = publisher.Publisher()
self.NextTrack = publisher.Publisher()
def __do_next__(self, tag):
if track := tag.next():
track.add_to_playlist("Previous")
return track
def __getstate__(self):
return { "tags" : self.tags }
def __setstate__(self, state):
self.__dict__.update(state)
self.Counter = counter.Counter(-1, 99)
self.PushPop = publisher.Publisher()
self.NextTrack = publisher.Publisher()
def current(self):
if len(self.tags) == 0:
return tags.User.store["Collection"]
return self.tags[0]
def __next_track__(self):
if len(self.tags) == 0:
return self.__do_next__(tags.User["Collection"])
if track := self.__do_next__(self.tags[0]):
return track
self.pop()
return self.__next_track__()
def next(self):
ret = self.__next_track__()
count = self.Counter.decrement()
self.NextTrack.publish(ret)
return (ret, count != -1)
def pop(self):
prev = self.tags.pop(0)
self.PushPop.publish(prev, self.current())
def previous(self):
return tags.User["Previous"].next()
def push(self, tag):
prev = self.current()
if tag == tags.User["Previous"]:
return
if tag == tags.User["Collection"]:
self.tags.clear()
self.PushPop.publish(prev, tags.User["Collection"])
return
if tag in self.tags:
self.tags.remove(tag)
self.tags.insert(0, tag)
tag.stacked()
self.PushPop.publish(prev, tag)
def queue(self, track):
track.add_to_playlist("Up Next")
self.push(tags.User["Up Next"])
def reset(self):
self.tags.clear()
self.count = None
self.PushPop.reset()

View File

@ -1,36 +0,0 @@
# Copyright 2021 (c) Anna Schumaker.
from . import user
from lib import tagstore
Artist = tagstore.TagStore()
Album = tagstore.TagSuperStore()
Genre = tagstore.TagStore()
Decade = tagstore.TagStore()
Year = tagstore.TagSuperStore()
User = user.UserTagStore()
def get_state():
return (Artist, Album, Genre, Decade, Year, User)
def set_state(artist, album, genre, decade, year, user):
global Artist
global Album
global Genre
global Decade
global Year
global User
Artist = artist
Album = album
Genre = genre
Decade = decade
Year = year
User = user
def reset():
Artist.reset()
Album.reset()
Genre.reset()
Decade.reset()
Year.reset()
User.reset()

View File

@ -1,110 +0,0 @@
# Copyright 2021 (c) Anna Schumaker.
from lib import publisher
from . import allocator
import pathlib
import threading
import unittest
test_tracks = pathlib.Path("./data/Test Album")
class FakeLibrary:
def __init__(self):
self.name = test_tracks
class TestTrackAllocator(unittest.TestCase):
def setUp(self):
self.lib = FakeLibrary()
self.added = None
self.removed = None
self.updated = None
def on_track_added(self, track):
self.added = track
def on_track_removed(self, track):
self.removed = track
def on_track_updated(self, track):
self.updated = track
def test_allocator_init(self):
alloc = allocator.TrackAllocator()
self.assertEqual(alloc.tracks, { })
self.assertEqual(alloc.nextid, 0)
self.assertIsInstance(alloc.lock, type(threading.Lock()))
self.assertIsInstance(alloc.Added, publisher.Publisher)
self.assertIsInstance(alloc.Removed, publisher.Publisher)
self.assertIsInstance(alloc.Updated, publisher.Publisher)
def test_allocator(self):
alloc = allocator.TrackAllocator()
alloc.Added.register(self.on_track_added)
alloc.Removed.register(self.on_track_removed)
track = alloc.allocate(self.lib, test_tracks / "01 - Test Track.ogg")
self.assertEqual(alloc.tracks[0], track)
self.assertEqual(alloc[0], track)
self.assertEqual(len(alloc), 1)
self.assertEqual(alloc.nextid, 1)
self.assertEqual(track.trackid, 0)
self.assertEqual(self.added, track)
track2 = alloc.allocate(self.lib, test_tracks / "02 - Test {Disc 2}.ogg")
self.assertEqual(alloc.tracks[1], track2)
self.assertEqual(alloc[1], track2)
self.assertEqual(len(alloc), 2)
self.assertIsNone(alloc.allocate(self.lib, test_tracks / "No Such File"))
self.assertEqual(self.added, track2)
self.assertEqual([ t for t in alloc.list_tracks(self.lib) ], [ track, track2 ])
track.library = None
self.assertEqual([ t for t in alloc.list_tracks(self.lib) ], [ track2 ])
alloc.remove(track)
self.assertEqual(alloc[0], None)
self.assertEqual(alloc.tracks, { 1 : track2 })
self.assertEqual(alloc.nextid, 2)
self.assertEqual(self.removed, track)
alloc.reset()
self.assertEqual(alloc.nextid, 0)
self.assertEqual(alloc.tracks, { })
self.assertEqual(alloc.Added.subscribers, set())
self.assertEqual(alloc.Removed.subscribers, set())
def test_allocator_autoremove(self):
alloc = allocator.TrackAllocator()
alloc.Removed.register(self.on_track_removed)
track = alloc.allocate(self.lib, test_tracks / "01 - Test Track.ogg")
track2 = alloc.allocate(self.lib, test_tracks / "02 - Test {Disc 2}.ogg")
track2.path = pathlib.Path("No Such File")
self.assertEqual(alloc.autoremove(self.lib), [ track2 ])
self.assertEqual([ t for t in alloc.list_tracks(self.lib) ], [ track ])
def test_allocator_played(self):
alloc = allocator.TrackAllocator()
alloc.Updated.register(self.on_track_updated)
track = alloc.allocate(self.lib, test_tracks / "01 - Test Track.ogg")
alloc.played(track)
self.assertEqual(track.playcount, 1)
self.assertEqual(self.updated, track)
def test_allocator_state(self):
alloc = allocator.TrackAllocator()
track = alloc.allocate(self.lib, test_tracks / "01 - Test Track.ogg")
state = alloc.__getstate__()
self.assertEqual(state, { "tracks" : { 0 : track },
"nextid" : 1 })
alloc.__dict__.clear()
alloc.__setstate__(state)
self.assertEqual(alloc.tracks, { 0 : track })
self.assertEqual(alloc.nextid, 1)
self.assertIsInstance(alloc.lock, type(threading.Lock()))
self.assertIsInstance(alloc.Added, publisher.Publisher)
self.assertIsInstance(alloc.Removed, publisher.Publisher)
self.assertIsInstance(alloc.Updated, publisher.Publisher)

View File

@ -1,186 +0,0 @@
# Copyright 2021 (c) Anna Schumaker.
from lib import counter
from lib import publisher
from lib import tag
from . import stack
from . import tags
import unittest
class FakeTrack:
def __init__(self, n):
self.n = n
self.length = n
def add_to_playlist(self, name):
tags.User.add(name, self)
def remove_from_playlist(self, name):
tags.User[name].remove_track(self)
class TestTagStack(unittest.TestCase):
def setUp(self):
self.pushpop = None
self.next_track = None
def tearDown(self):
tags.reset()
def on_next_track(self, track):
self.next_track = track
def on_push_pop(self, prev, new):
self.pushpop = (prev, new)
def test_tag_stack_init(self):
s = stack.TagStack()
self.assertIsInstance(s.Counter, counter.Counter)
self.assertIsInstance(s.PushPop, publisher.Publisher)
self.assertIsInstance(s.NextTrack, publisher.Publisher)
self.assertEqual(s.tags, [ ])
def test_tag_stack_next(self):
s = stack.TagStack()
t = tag.Tag("Test")
s.NextTrack.register(self.on_next_track)
s.push(t)
t.tracks = [ FakeTrack(1), FakeTrack(2), FakeTrack(3) ]
tags.User["Collection"].tracks = [ FakeTrack(4), FakeTrack(5) ]
(res, cont) = s.next()
self.assertEqual( (res.n, cont), (1, True) )
self.assertEqual(self.next_track.n, 1)
self.assertEqual(s.tags, [ t ])
(res, cont) = s.next()
self.assertEqual( (res.n, cont), (2, True) )
self.assertEqual(self.next_track.n, 2)
self.assertEqual(s.tags, [ t ])
(res, cont) = s.next()
self.assertEqual( (res.n, cont), (3, True) )
self.assertEqual(self.next_track.n, 3)
self.assertEqual(s.tags, [ t ])
(res, cont) = s.next()
self.assertEqual( (res.n, cont), (4, True) )
self.assertEqual(self.next_track.n, 4)
self.assertEqual(s.tags, [ ])
(res, cont) = s.next()
self.assertEqual( (res.n, cont), (5, True) )
self.assertEqual(self.next_track.n, 5)
self.assertEqual([ t.n for t in tags.User["Previous"].tracks ],
[ 5, 4, 3, 2, 1 ])
def test_tag_stack_autopause(self):
s = stack.TagStack()
t = tag.Tag("Test")
s.push(t)
t.tracks = [ FakeTrack(1), FakeTrack(2), FakeTrack(3), FakeTrack(4) ]
s.Counter.set_value(2)
(res, cont) = s.next()
self.assertEqual( (res.n, cont), (1, True) )
self.assertEqual(s.tags, [ t ])
self.assertEqual(s.Counter.get_value(), 1)
(res, cont) = s.next()
self.assertEqual( (res.n, cont), (2, True) )
self.assertEqual(s.tags, [ t ])
self.assertEqual(s.Counter.get_value(), 0)
(res, cont) = s.next()
self.assertEqual( (res.n, cont), (3, False) )
self.assertEqual(s.tags, [ t ])
self.assertEqual(s.Counter.get_value(), -1)
(res, cont) = s.next()
self.assertEqual( (res.n, cont), (4, True) )
self.assertEqual(s.tags, [ t ])
self.assertEqual(s.Counter.get_value(), -1)
def test_tag_stack_pop(self):
s = stack.TagStack()
t1 = tag.Tag("Test")
t2 = tag.Tag("Test Two")
s.tags = [ t1, t2 ]
s.PushPop.register(self.on_push_pop)
s.pop()
self.assertEqual(s.tags, [ t2 ])
self.assertEqual(self.pushpop, (t1, t2))
s.pop()
self.assertEqual(s.tags, [ ])
self.assertEqual(self.pushpop, (t2, tags.User["Collection"]))
def test_tag_stack_previous(self):
s = stack.TagStack()
for i in [ 1, 2, 3 ]:
tags.User["Previous"].add_track(i)
self.assertEqual(s.previous(), 2)
self.assertEqual(s.previous(), 1)
self.assertIsNone(s.previous())
def test_tag_stack_push(self):
s = stack.TagStack()
t1 = tag.Tag("Test")
t2 = tag.Tag("Test Two")
t1.current = 3
s.PushPop.register(self.on_push_pop)
s.push(t1)
self.assertEqual(s.tags, [ t1 ])
self.assertEqual(t1.current, -1)
self.assertEqual(self.pushpop, (tags.User["Collection"], t1))
s.push(t2)
self.assertEqual(s.tags, [ t2, t1 ])
self.assertEqual(self.pushpop, (t1, t2))
s.push(t1)
self.assertEqual(s.tags, [ t1, t2 ])
self.assertEqual(self.pushpop, (t2, t1))
s.push(tags.User["Previous"])
self.assertEqual(s.tags, [ t1, t2 ])
s.push(tags.User["Collection"])
self.assertEqual(s.tags, [ ])
self.assertEqual(self.pushpop, (t1, tags.User["Collection"]))
def test_tag_stack_queue(self):
s = stack.TagStack()
s.queue(FakeTrack(1))
self.assertEqual(s.tags, [ tags.User["Up Next"] ])
(res, cont) = s.next()
self.assertEqual( (res.n, cont), (1, True) )
self.assertEqual(tags.User["Up Next"].tracks, [ ])
def test_tag_stack_state(self):
s = stack.TagStack()
t = tag.Tag("Test")
s.push(t)
state = s.__getstate__()
self.assertEqual(state, { "tags" : [ t ] })
s.__dict__.clear()
s.__setstate__(state)
self.assertEqual(s.tags, [ t ])
self.assertIsInstance(s.Counter, counter.Counter)
self.assertIsInstance(s.PushPop, publisher.Publisher)
self.assertIsInstance(s.NextTrack, publisher.Publisher)
s.PushPop.register(self.on_push_pop)
s.PushPop.register(self.on_next_track)
s.count = 3
s.reset()
self.assertEqual(s.tags, [ ])
self.assertIsInstance(s.Counter, counter.Counter)
self.assertEqual(len(s.PushPop.subscribers), 0)
self.assertEqual(len(s.NextTrack.subscribers), 0)

View File

@ -1,174 +0,0 @@
# Copyright 2020 (c) Anna Schumaker.
import db
import lib
import pathlib
import tagdb
import unittest
test_tracks = pathlib.Path("./data/Test Album")
class TestLibraryTag(unittest.TestCase):
def tearDown(self):
db.NewDatabase = True
tagdb.reset()
def test_library_tag_init(self):
library = tagdb.LibraryTag(test_tracks)
self.assertIsInstance(library, lib.tag.Tag)
self.assertIsInstance(library.clear, lib.thread.Thread)
self.assertIsInstance(library.scan, lib.thread.Thread)
def test_library_tag_state(self):
lib = tagdb.LibraryTag(test_tracks)
lib.scan().join()
state = lib.__getstate__()
self.assertEqual(set(state.keys()),
set([ "name", "sort", "current", "loop", "random", "tracks" ]))
self.assertEqual(state["name"], test_tracks)
self.assertEqual(state["tracks"], [ i for i in range(12) ])
lib.__dict__.clear()
lib.__setstate__(state)
lib.fix_tracks()
self.assertEqual(lib.name, test_tracks)
self.assertEqual(lib.tracks, [ tagdb.Tracks[i] for i in range(12) ])
self.assertEqual(lib.clear.func, lib.__do_clear__)
self.assertEqual(lib.scan.func, lib.__do_scan__)
def test_library_tag_scan(self):
lib = tagdb.LibraryTag(test_tracks)
lib.scan().join()
for i in range(12):
self.assertEqual(lib.tracks[i], tagdb.Tracks[i])
def test_library_tag_scan_new(self):
lib = tagdb.LibraryTag(test_tracks)
lib.scan().join()
for trak in [ tagdb.Tracks[0], tagdb.Tracks[11] ]:
lib.remove_track(trak)
tagdb.Tracks.remove(trak)
lib.scan().join()
self.assertEqual(len(lib.tracks), 12)
def test_library_tag_scan_remove(self):
lib = tagdb.LibraryTag(test_tracks)
lib.scan().join()
trak = tagdb.track.Track(tagdb.Tracks.nextid,
test_tracks / "01 - Test Track.ogg", lib)
trak.path = pathlib.Path("No Such File")
lib.tracks.append(trak)
tagdb.Tracks.tracks[trak.trackid] = trak
lib.scan().join()
self.assertNotIn(trak, lib.tracks)
def test_library_tag_clear(self):
lib = tagdb.LibraryTag(test_tracks)
lib.scan().join()
self.assertEqual(len(lib), 12)
lib.clear().join()
self.assertEqual(len(lib), 0)
self.assertEqual(len(tagdb.Tracks), 0)
class TestLibraryStore(unittest.TestCase):
def test_library_store(self):
store = tagdb.LibraryStore()
lib = store.add(test_tracks)
self.assertIsInstance(lib, tagdb.LibraryTag)
lib.scan().join()
self.assertEqual(len(lib), 12)
store.remove(lib)
lib.clear.join()
self.assertEqual(len(lib), 0)
class TestTrackDB(unittest.TestCase):
def tearDown(self):
tagdb.reset()
def test_tagdb_init(self):
self.assertIsInstance(tagdb.Bus, lib.bus.Bus)
self.assertIsInstance(tagdb.Tracks, tagdb.allocator.TrackAllocator)
self.assertIsInstance(tagdb.Stack, tagdb.stack.TagStack)
self.assertIsInstance(tagdb.Library, tagdb.LibraryStore)
self.assertEqual(tagdb.File, "tagdb.pickle")
self.assertEqual(tagdb.Bus.timeout, 500)
self.assertIn(tagdb.save, tagdb.Library.Added.subscribers)
self.assertIn(tagdb.save, tagdb.Library.Removed.subscribers)
self.assertIn(tagdb.save, tagdb.Tracks.Added.subscribers)
self.assertIn(tagdb.save, tagdb.Tracks.Removed.subscribers)
self.assertIn(tagdb.save, tagdb.Tracks.Updated.subscribers)
self.assertIn(tagdb.save, tagdb.tags.User.Added.subscribers)
self.assertIn(tagdb.save, tagdb.tags.User.Removed.subscribers)
self.assertIn(tagdb.save, tagdb.Stack.PushPop.subscribers)
self.assertIn(tagdb.save, tagdb.Stack.NextTrack.subscribers)
def test_tagdb_save_load(self):
db_file = lib.data.DataFile(tagdb.File, lib.data.READ)
library = tagdb.Library.add(test_tracks)
library.scan()
tagdb.Stack.push(library)
tagdb.save()
self.assertEqual(tagdb.Bus.passengers, [ (tagdb._do_save, ()) ])
self.assertFalse(db_file.exists())
library.scan.join()
tagdb.Bus.complete()
self.assertTrue(db_file.exists())
db.reset()
tagdb.tags.reset()
tagdb.Library.reset()
tagdb.Tracks.reset()
tagdb.Stack.reset()
tagdb.load()
self.assertEqual(len(tagdb.Tracks), 12)
self.assertEqual(len(tagdb.Library), 1)
self.assertEqual(len(tagdb.Library[test_tracks]), 12)
self.assertEqual(len(tagdb.tags.Artist), 3)
self.assertEqual(len(tagdb.tags.Album), 12)
self.assertEqual(len(tagdb.tags.Genre), 4)
self.assertEqual(len(tagdb.tags.Decade), 2)
self.assertEqual(len(tagdb.tags.Year), 2)
self.assertEqual(tagdb.Stack.tags, [ tagdb.Library[test_tracks] ])
self.assertIn(tagdb.save, tagdb.Library.Added.subscribers)
self.assertIn(tagdb.save, tagdb.Tracks.Added.subscribers)
self.assertIn(tagdb.save, tagdb.tags.User.Added.subscribers)
def test_tagdb_stress(self):
lib = tagdb.Library.add(pathlib.Path("./trier/Test Library/"))
lib.scan().join()
tagdb.Bus.complete()
tagdb.Library.remove(lib)
lib.clear.join()
tagdb.Bus.complete()
def test_tagdb_reset(self):
tagdb.Tracks.Added.register(1)
tagdb.Tracks.Removed.register(1)
tagdb.Tracks.Updated.register(1)
tagdb.Library.Added.register(1)
tagdb.Library.Removed.register(1)
tagdb.Library.store = { "a" : 1, "b" : 2, "c" : 3 }
with lib.data.DataFile(tagdb.File, lib.data.WRITE) as f:
f.pickle([ 0, [] ])
tagdb.reset()
self.assertEqual(len(tagdb.Library), 0)
self.assertIn(tagdb.save, tagdb.Tracks.Added.subscribers)
self.assertIn(tagdb.save, tagdb.Tracks.Removed.subscribers)
self.assertIn(tagdb.save, tagdb.Tracks.Updated.subscribers)
self.assertIn(tagdb.save, tagdb.Library.Added.subscribers)
self.assertIn(tagdb.save, tagdb.Library.Removed.subscribers)
self.assertFalse(lib.data.DataFile(tagdb.File, lib.data.READ).exists())

View File

@ -1,50 +0,0 @@
# Copyright 2021 (c) Anna Schumaker.
from lib import tagstore
from . import tags
from . import user
import unittest
class TestTags(unittest.TestCase):
def tearDown(self):
tags.reset()
def test_tags_init(self):
self.assertIsInstance(tags.Artist, tagstore.TagStore)
self.assertIsInstance(tags.Album, tagstore.TagSuperStore)
self.assertIsInstance(tags.Genre, tagstore.TagStore)
self.assertIsInstance(tags.Decade, tagstore.TagStore)
self.assertIsInstance(tags.Year, tagstore.TagSuperStore)
self.assertIsInstance(tags.User, user.UserTagStore)
def test_tags_reset(self):
tags.Artist.store = {"a" : 1 }
tags.Album.store = {("a", "b") : 2 }
tags.Genre.store = {"c" : 3 }
tags.Decade.store = {"d" : 4 }
tags.User.store = {"e" : 5 }
tags.reset()
self.assertEqual(tags.Artist.store, { })
self.assertEqual(tags.Album.store, { })
self.assertEqual(tags.Genre.store, { })
self.assertEqual(tags.Decade.store, { })
self.assertIsNotNone(tags.User["Collection"])
self.assertIsNotNone(tags.User["Up Next"])
self.assertIsNotNone(tags.User["Previous"])
self.assertIsNotNone(tags.User["Favorites"])
self.assertIsNotNone(tags.User["Up Next"])
def test_tags_state(self):
state = tags.get_state()
self.assertEqual(state, ( tags.Artist, tags.Album, tags.Genre,
tags.Decade, tags.Year, tags.User ))
tags.set_state(*(1, 2, 3, 4, 5, 6))
self.assertEqual(tags.Artist, 1)
self.assertEqual(tags.Album, 2)
self.assertEqual(tags.Genre, 3)
self.assertEqual(tags.Decade, 4)
self.assertEqual(tags.Year, 5)
self.assertEqual(tags.User, 6)
tags.set_state(*state)

View File

@ -1,173 +0,0 @@
# Copyright 2020 (c) Anna Schumaker.
from lib import publisher
from gi.repository import GObject
from . import tags
from . import track
import datetime
import pathlib
import unittest
test_tracks = pathlib.Path("./data/Test Album")
class FakeLibrary:
def __init__(self):
self.name = test_tracks
class TestTrack(unittest.TestCase):
def setUp(self):
tags.reset()
self.lib = FakeLibrary()
def tearDown(self):
tags.reset()
def test_track_init(self):
trak = track.Track(1, test_tracks / "01 - Test Track.ogg", self.lib)
self.assertEqual(trak.trackid, 1)
self.assertEqual(trak.filepath(), test_tracks / "01 - Test Track.ogg")
self.assertIsInstance(trak, GObject.Object)
def test_track_album(self):
trak = track.Track(1, test_tracks / "01 - Test Track.ogg", self.lib)
artist = trak.artist
self.assertEqual(trak.album, tags.Album[artist, "Test Album"])
self.assertEqual(trak["album"], "Test Album")
def test_track_artist(self):
trak = track.Track(1, test_tracks / "01 - Test Track.ogg", self.lib)
self.assertEqual(trak.artist, tags.Artist["Test Artist"])
self.assertEqual(trak["artist"], "Test Artist")
self.assertEqual(trak.artist.sort, "artist, test")
trak2 = track.Track(2, test_tracks / "02 - Test {Disc 2}.ogg", self.lib)
self.assertEqual(trak2.artist, tags.Artist["Test Album Artist"])
self.assertEqual(trak2.artist.sort, "album artist, test")
def test_track_decade(self):
trak = track.Track(1, test_tracks / "01 - Test Track.ogg", self.lib)
self.assertEqual(trak.decade, tags.Decade["2010s"])
self.assertEqual(trak["decade"], "2010s")
def test_track_discnumber(self):
trak = track.Track(1, test_tracks / "01 - Test Track.ogg", self.lib)
self.assertEqual(trak.discnumber, 1)
self.assertEqual(trak["discnumber"], "01")
def test_track_filepath(self):
trak = track.Track(1, test_tracks / "01 - Test Track.ogg", self.lib)
self.assertEqual(trak.path, pathlib.Path("01 - Test Track.ogg"))
self.assertEqual(trak.filepath(), test_tracks / "01 - Test Track.ogg")
def test_track_genres(self):
trak = track.Track(1, test_tracks / "02 - Test {Disc 2}.ogg", self.lib)
genlist = [ tags.Genre["Test"], tags.Genre["Genre"], tags.Genre["List"] ]
self.assertEqual(trak.genres, genlist)
self.assertEqual(trak["genres"], "Test, Genre, List")
def test_track_length(self):
trak = track.Track(1, test_tracks / "01 - Test Track.ogg", self.lib)
self.assertEqual(trak.length, 10)
self.assertEqual(trak["length"], "0:10")
trak.length = 61
self.assertEqual(trak["length"], "1:01")
trak.length = 3
self.assertEqual(trak["length"], "0:03")
def test_track_played(self):
trak = track.Track(1, test_tracks / "01 - Test Track.ogg", self.lib)
trak.playcount = 0
trak.lastplayed = None
self.assertEqual(trak["playcount"], "0")
self.assertEqual(trak["lastplayed"], "Never")
trak.played()
self.assertEqual(trak.playcount, 1)
self.assertEqual(trak.lastplayed.date(), datetime.date.today())
self.assertEqual(trak["playcount"], "1")
#self.assertEqual(trak["lastplayed"], str(datetime.date.today()))
trak.played()
self.assertEqual(trak.playcount, 2)
self.assertEqual(trak["playcount"], "2")
def test_track_title(self):
trak = track.Track(1, test_tracks / "01 - Test Track.ogg", self.lib)
self.assertEqual(trak.title, "Test Track")
self.assertEqual(trak["title"], "Test Track")
def test_track_tracknumber(self):
trak = track.Track(1, test_tracks / "01 - Test Track.ogg", self.lib)
self.assertEqual(trak.tracknumber, 1)
self.assertEqual(trak["tracknumber"], "1-01")
trak.tracknumber = 10
self.assertEqual(trak["tracknumber"], "1-10")
def test_track_year(self):
trak = track.Track(1, test_tracks / "01 - Test Track.ogg", self.lib)
self.assertEqual(trak.year, tags.Year[trak.decade, "2019"])
self.assertEqual(trak["year"], "2019")
trak2 = track.Track(2, test_tracks / "02 - Test {Disc 2}.ogg", self.lib)
self.assertEqual(trak2.year, tags.Year[trak2.decade, "2019"])
self.assertEqual(trak2["year"], "2019")
def test_track_playlists(self):
trak = track.Track(1, test_tracks / "01 - Test Track.ogg", self.lib)
default = [ tags.User["Collection"], tags.User["New Tracks"] ]
self.assertEqual(trak.playlists, default)
trak.add_to_playlist("Test")
self.assertEqual(trak.playlists, default + [ tags.User["Test"] ])
trak.remove_from_playlist("Test")
self.assertEqual(trak.playlists, default)
self.assertEqual(len(tags.User["Test"]), 0)
def test_track_about_to_remove(self):
trak = track.Track(1, test_tracks / "01 - Test Track.ogg", self.lib)
trak.about_to_remove()
self.assertEqual(len(tags.Artist), 0)
self.assertEqual(len(tags.Album), 0)
self.assertEqual(len(tags.Genre), 0)
self.assertEqual(len(tags.Decade), 0)
self.assertEqual(len(tags.Year), 0)
self.assertEqual(len(tags.User["Collection"]), 0)
self.assertEqual(len(tags.User["New Tracks"]), 0)
def test_track_state(self):
trak = track.Track(1, test_tracks / "01 - Test Track.ogg", self.lib)
trak.add_to_playlist("Starred")
trak.add_to_playlist("Previous")
state = trak.__getstate__()
self.assertEqual(state["artist"], "Test Artist")
self.assertEqual(state["album"], "Test Album")
self.assertEqual(state["genres"], [ "Test" ])
self.assertEqual(state["decade"], "2010s")
self.assertEqual(state["year"], "2019")
self.assertEqual(state["playlists"], [ "Collection", "Starred" ])
tags.Artist["Test Artist"].tracks = [ 1 ]
tags.Album[trak.artist, "Test Album"].tracks = [ 1 ]
tags.Genre["Test"].tracks = [ 1 ]
tags.Decade["2010s"].tracks = [ 1 ]
tags.Year[trak.decade, "2019"].tracks = [ 1 ]
tags.User["Collection"].tracks = [ 1 ]
tags.User["Starred"].tracks = [ 1 ]
trak.__dict__.clear()
trak.__setstate__(state)
trak.__set_tags__()
self.assertEqual(trak.artist, tags.Artist["Test Artist"])
self.assertEqual(trak.album, tags.Album[trak.artist, "Test Album"])
self.assertEqual(trak.genres, [ tags.Genre["Test"] ])
self.assertEqual(trak.decade, tags.Decade["2010s"])
self.assertEqual(trak.year, tags.Year[trak.decade, "2019"])
self.assertEqual(trak.playlists, [ tags.User["Collection"],
tags.User["Starred"] ])
self.assertEqual(tags.Artist["Test Artist"].tracks, [ trak ])
self.assertEqual(tags.Album[trak.artist, "Test Album"].tracks, [ trak ])
self.assertEqual(tags.Genre["Test"].tracks, [ trak ])
self.assertEqual(tags.Decade["2010s"].tracks, [ trak ])
self.assertEqual(tags.Year[trak.decade, "2019"].tracks, [ trak ])
self.assertEqual(tags.User["Collection"].tracks, [ trak ])
self.assertEqual(tags.User["Starred"].tracks, [ trak ])

View File

@ -1,99 +0,0 @@
# Copyright 2021 (c) Anna Schumaker.
from lib import fake
from lib import tag
from lib import tagstore
from . import user
import unittest
class TestUserTags(unittest.TestCase):
def test_collection_tag(self):
c = user.CollectionTag()
self.assertIsInstance(c, tag.Tag)
self.assertTrue(c.loop)
self.assertFalse(c.can_loop())
def test_new_tracks_tag(self):
n = user.NewTracksTag()
self.assertIsInstance(n, tag.Tag)
self.assertTrue(n.can_loop())
self.assertTrue(n.can_random())
n.tracks = [ fake.Track(1), fake.Track(2), fake.Track(3) ]
state = n.__getstate__()
self.assertEqual(state["name"], "New Tracks")
self.assertEqual(state["sort"], "new tracks")
self.assertEqual(state["current"], -1)
self.assertEqual(state["tracks"], [ ])
self.assertFalse(state["loop"])
self.assertFalse(state["random"])
def test_previous_tag(self):
p = user.PreviousTag()
self.assertIsInstance(p, tag.Tag)
self.assertFalse(p.can_loop())
self.assertFalse(p.can_random())
p.add_track(fake.Track(1))
self.assertEqual(p.tracks, [ fake.Track(1) ])
self.assertIsNone(p.next())
p.add_track(fake.Track(2))
self.assertEqual(p.tracks, [ fake.Track(2), fake.Track(1) ])
self.assertEqual(p.next(), fake.Track(1))
p.add_track(fake.Track(3))
self.assertEqual(p.tracks, [ fake.Track(3), fake.Track(2), fake.Track(1) ])
self.assertEqual(p.next(), fake.Track(2))
self.assertEqual(p.next(), fake.Track(1))
state = p.__getstate__()
self.assertEqual(state["name"], "Previous")
self.assertEqual(state["sort"], "previous")
self.assertEqual(state["current"], 2)
self.assertEqual(state["tracks"], [ ])
self.assertFalse(state["loop"])
self.assertFalse(state["random"])
def test_up_next_tag(self):
u = user.UpNextTag()
self.assertIsInstance(u, tag.Tag)
self.assertFalse(u.loop)
self.assertFalse(u.can_loop())
self.assertIsNone(u.next())
u.tracks = [ fake.Track(1, u), fake.Track(2, u), fake.Track(3, u) ]
self.assertEqual(u.next(), fake.Track(1, u))
self.assertEqual(u.tracks, [ fake.Track(2, u), fake.Track(3, u) ])
self.assertEqual(u.next(), fake.Track(2, u))
self.assertEqual(u.tracks, [ fake.Track(3, u) ])
u.random = True
self.assertEqual(u.next(), fake.Track(3, u))
self.assertEqual(u.tracks, [ ])
self.assertFalse(u.random)
class TestUserTagStore(unittest.TestCase):
def test_user_init(self):
store = user.UserTagStore()
self.assertIsInstance(store, tagstore.TagStore)
self.assertIsInstance(store["Collection"], user.CollectionTag)
self.assertIsInstance(store["New Tracks"], user.NewTracksTag)
self.assertIsInstance(store["Previous"], user.PreviousTag)
self.assertIsInstance(store["Favorites"], tag.Tag)
self.assertIsInstance(store["Up Next"], user.UpNextTag)
def test_user_reset(self):
store = user.UserTagStore()
store.add("Playlist")
self.assertEqual(len(store), 6)
store.reset()
self.assertEqual(len(store), 5)
self.assertIsInstance(store["Collection"], user.CollectionTag)
self.assertIsInstance(store["New Tracks"], user.NewTracksTag)
self.assertIsInstance(store["Previous"], user.PreviousTag)
self.assertIsInstance(store["Favorites"], tag.Tag)
self.assertIsInstance(store["Up Next"], user.UpNextTag)

View File

@ -1,98 +0,0 @@
# Copyright 2020 (c) Anna Schumaker.
from lib import metadata
from lib import publisher
from . import tags
from gi.repository import GObject
import datetime
import db
import scanner
class Track(GObject.Object):
def __init__(self, trackid, filepath, library):
GObject.Object.__init__(self)
self.trackid = trackid
self.path = filepath.relative_to(library.name)
self.library = library
self.lastplayed = None
self.playcount = 0
with metadata.Metadata(filepath) as meta:
self.title = meta.title()
self.length = meta.length()
self.discnumber = meta.discnumber()
self.tracknumber = meta.tracknumber()
self.artist = tags.Artist.add(meta.artist(), self, sort=meta.artistsort())
self.album = tags.Album.add(self.artist, meta.album(), self)
self.genres = [ tags.Genre.add(g, self) for g in meta.genres() ]
self.decade = tags.Decade.add(f"{meta.decade()}s", self)
self.year = tags.Year.add(self.decade, str(meta.year()), self)
self.playlists = [ tags.User.add("Collection", self),
tags.User.add("New Tracks", self) ]
def __getitem__(self, item):
tag = self.__dict__.get(item, None)
if item == "length":
(m, s) = divmod(tag, 60)
return f"{m}:{s:02}"
elif item == "discnumber":
return f"{tag:02}"
elif item == "tracknumber":
return f"{self.discnumber}-{tag:02}"
elif item == "lastplayed":
return "Never" if tag == None else str(tag)
elif item == "genres":
return ", ".join([ str(g) for g in self.genres ])
return None if tag == None else str(tag)
def __getstate__(self):
state = self.__dict__.copy()
state["artist"] = str(self.artist)
state["album"] = str(self.album)
state["genres"] = [ str(g) for g in self.genres ]
state["decade"] = str(self.decade)
state["year"] = str(self.year)
state["playlists" ] = [ str(p) for p in self.playlists \
if str(p) not in ("New Tracks", "Previous") ]
return state
def __setstate__(self, state):
GObject.Object.__init__(self)
self.__dict__.update(state)
def __set_tags__(self):
self.artist = tags.Artist.init_track(self.artist, self)
self.album = tags.Album.init_track(self.artist, self.album, self)
self.genres = [ tags.Genre.init_track(g, self) for g in self.genres ]
self.decade = tags.Decade.init_track(self.decade, self)
self.year = tags.Year.init_track(self.decade, self.year, self)
self.playlists = [ tags.User.init_track(p, self) for p in self.playlists ]
scanner.import_track(db.library.Table.find(self.library.name),
self.filepath(), self.playcount, self.lastplayed,
[ p.name for p in self.playlists ])
def about_to_remove(self):
tags.Artist.remove(self.artist, self)
tags.Album.remove(self.album, self)
for genre in self.genres:
tags.Genre.remove(genre, self)
tags.Decade.remove(self.decade, self)
tags.Year.remove(self.year, self)
for tag in self.playlists:
tag.remove_track(self)
def add_to_playlist(self, name):
self.playlists.append(tags.User.add(name, self))
def filepath(self):
return self.library.name / self.path
def played(self):
self.playcount += 1
self.lastplayed = datetime.datetime.now()
def remove_from_playlist(self, name):
tag = tags.User[name]
tag.remove_track(self)
self.playlists.remove(tag)

View File

@ -1,70 +0,0 @@
# Copyright 2021 (c) Anna Schumaker.
from lib import tag
from lib import tagstore
class CollectionTag(tag.Tag):
def __init__(self):
tag.Tag.__init__(self, "Collection")
self.loop = True
def can_loop(self): return False
class NewTracksTag(tag.Tag):
def __init__(self):
tag.Tag.__init__(self, "New Tracks")
def __getstate__(self):
state = super().__getstate__()
state["tracks"].clear()
return state
class PreviousTag(tag.Tag):
def __init__(self):
tag.Tag.__init__(self, "Previous")
def __getstate__(self):
state = super().__getstate__()
state["tracks"].clear()
return state
def can_random(self): return False
def can_loop(self): return False
def add_track(self, track):
with self.lock:
self.tracks.insert(0, track)
self.current = 0
self.TrackAdded.publish(self, track, 0)
class UpNextTag(tag.Tag):
def __init__(self):
tag.Tag.__init__(self, "Up Next")
def can_loop(self): return False
def next(self):
track = super().next()
if track is not None:
track.remove_from_playlist("Up Next")
with self.lock:
self.current -= 1
if len(self.tracks) == 0:
self.random = False
return track
class UserTagStore(tagstore.TagStore):
def __init__(self):
tagstore.TagStore.__init__(self)
self.reset()
def reset(self):
super().reset()
self.store["Collection"] = CollectionTag()
self.store["Favorites"] = tag.Tag("Favorites")
self.store["New Tracks"] = NewTracksTag()
self.store["Previous"] = PreviousTag()
self.store["Up Next"] = UpNextTag()

View File

@ -1,7 +1,6 @@
# Copyright 2021 (c) Anna Schumaker.
from gi.repository import Gtk, Gdk
import audio
import tagdb
Event = Gtk.EventControllerKey()
Event.set_propagation_phase(Gtk.PropagationPhase.CAPTURE)