lib: Remove unused files

These are no longer needed now that tagdb has been removed

Implements #24 (Clean up lib/)
Signed-off-by: Anna Schumaker <Anna@NoWheyCreamery.com>
This commit is contained in:
Anna Schumaker 2021-12-26 10:18:09 -05:00
parent 915e3c8340
commit 2daefa932c
16 changed files with 1 additions and 1094 deletions

View File

@ -3,11 +3,7 @@ import gi
gi.require_version("Gtk", "4.0")
gi.require_version("Gst", "1.0")
from . import bus
from . import data
from . import filter
from . import publisher
from . import settings
from . import tag
from . import tagstore
from . import thread
from . import version

View File

@ -1,59 +0,0 @@
# Copyright 2021 (c) Anna Schumaker.
from . import publisher
import threading
from gi.repository import GLib
Start = publisher.Publisher()
RETRY = GLib.SOURCE_CONTINUE
class Bus:
def __init__(self, milliseconds):
self.timeout = milliseconds
self.timeout_id = None
self.passengers = [ ]
self.lock = threading.Lock()
def __do_board(self, func, *args):
with self.lock:
if (func, args) not in self.passengers:
self.passengers.append( (func, args) )
if self.timeout_id == None:
self.timeout_id = GLib.timeout_add(self.timeout, self.run)
return True
return False
def board(self, func, *args):
if self.__do_board(func, *args):
Start.publish(self)
def clear(self):
with self.lock:
if self.timeout_id:
GLib.source_remove(self.timeout_id)
self.timeout_id = None
self.passengers.clear()
def complete(self):
with self.lock:
for (func, args) in self.passengers:
func(*args)
GLib.source_remove(self.timeout_id)
self.timeout_id = None
self.passengers.clear()
def running(self):
with self.lock:
return self.timeout_id != None
def run(self):
with self.lock:
(func, args) = self.passengers[0]
if func(*args) == RETRY:
return GLib.SOURCE_CONTINUE
self.passengers.pop(0)
if len(self.passengers) == 0:
self.timeout_id = None
return GLib.SOURCE_REMOVE
return GLib.SOURCE_CONTINUE

View File

@ -1,22 +0,0 @@
# Copyright 2021 (c) Anna Schumaker.
from gi.repository import Gtk
class Counter(Gtk.Adjustment):
def __init__(self, min, max):
Gtk.Adjustment.__init__(self)
self.configure(value=min, lower=min, upper=max + 1, step_increment=1,
page_increment=1, page_size=1)
def __change_value__(self, n):
value = self.get_value()
self.set_value(value + n)
if self.get_value() == value:
return None
return self.get_value()
def increment(self):
return self.__change_value__(1)
def decrement(self):
return self.__change_value__(-1)

View File

@ -1,24 +0,0 @@
# Copyright 2021 (c) Anna Schumaker.
from gi.repository import GObject
tracks = { }
class FakeTrack(GObject.GObject):
def __init__(self, n, tag=None):
GObject.GObject.__init__(self)
self.trackid = n
self.length = n
self.tag = tag
def __int__(self):
return self.trackid
def add_to_playlist(self, name):
self.tag.add_track(self)
def remove_from_playlist(self, name):
self.tag.remove_track(self)
def Track(n, tag=None):
return tracks.setdefault((n,tag), FakeTrack(n, tag=tag))

View File

@ -1,51 +0,0 @@
# Copyright 2021 (c) Anna Schumaker.
import mutagen
import re
class Metadata:
def __init__(self, filepath):
self.path = filepath
self.file = None
def __enter__(self):
self.file = mutagen.File(self.path)
return self
def __exit__(self, exp_type, exp_value, traceback):
self.file = None
return exp_type == None
def album(self):
return self.file.get("album", [ "Unknown Album" ])[0]
def artist(self):
artist = self.file.get("artist", [ "Unknown Artist" ])
return self.file.get("albumartist", artist)[0]
def artistsort(self):
sort = self.file.get("artistsort", [ None ])
return self.file.get("albumartistsort", sort)[0]
def decade(self):
return (self.year() // 10) * 10
def discnumber(self):
return int(self.file.get("discnumber", [ 1 ])[0])
def genres(self):
genre = self.file.get("genre", [ "" ])[0]
return [ g.strip() for g in re.split(",|;|/|:", genre) ]
def length(self):
return int(self.file.info.length)
def title(self):
return self.file.get("title", [ "" ])[0]
def tracknumber(self):
return int(self.file.get("tracknumber", [ 0 ])[0])
def year(self):
year = self.file.get("date", [ "0" ])
year = self.file.get("originalyear", year)[0]
return int(re.match("\d+", year).group(0))

View File

@ -1,19 +0,0 @@
# Copyright 2020 (c) Anna Schumaker.
class Publisher:
def __init__(self):
self.subscribers = set()
def publish(self, *args):
funcs = self.subscribers.copy()
for func in funcs:
func(*args)
def register(self, func):
self.subscribers.add(func)
def reset(self):
self.subscribers.clear()
def unregister(self, func):
self.subscribers.discard(func)

View File

@ -1,146 +0,0 @@
# Copyright 2021 (c) Anna Schumaker.
from . import publisher
import random
import threading
class Tag:
def __init__(self, name, sort=None):
self.name = name
self.sort = sort.lower() if sort else str(name).lower()
self.current = -1
self.runtime = 0
self.loop = False
self.random = False
self.tracks = [ ]
self.widgets = None
self.lock = threading.Lock()
self.TrackAdded = publisher.Publisher()
self.TrackRemoved = publisher.Publisher()
def __getitem__(self, n):
with self.lock:
if n < len(self.tracks):
return self.tracks[n]
return None
def __getstate__(self):
with self.lock:
return { "name" : self.name,
"sort" : self.sort,
"current" : self.current,
"loop" : self.loop,
"random" : self.random,
"tracks" : [ t.trackid for t in self.tracks ] }
def __len__(self):
with self.lock:
return len(self.tracks)
def __lt__(self, rhs):
if not isinstance(rhs, SuperTag):
return self.sort < rhs.sort
if self == rhs.parent:
return True
return self.sort < rhs.parent.sort
def __next_track__(self):
if self.loop == True and self.current >= len(self.tracks):
return 0
return self.current + 1
def __random_track__(self):
i = 1
length = len(self.tracks)
if len(self.tracks) >= 3:
i = random.randint(1, length - 1)
return (self.current + i) % (1 if length == 0 else length)
def __setstate__(self, state):
self.name = state["name"]
self.sort = state["sort"]
self.current = state["current"]
self.loop = state["loop"]
self.random = state["random"]
self.tracks = state["tracks"]
self.runtime = 0
self.widgets = None
self.lock = threading.Lock()
self.TrackAdded = publisher.Publisher()
self.TrackRemoved = publisher.Publisher()
def __str__(self):
return self.name
def add_track(self, track):
with self.lock:
if track in self.tracks:
return
pos = len(self.tracks)
self.tracks.append(track)
self.runtime += track.length
self.TrackAdded.publish(self, track, pos)
def can_loop(self): return True
def can_random(self): return True
def get_header(self):
return self.sort[0].upper() if len(self.sort) > 0 else ""
def init_track(self, track):
with self.lock:
try:
i = self.tracks.index(track.trackid)
self.tracks[i] = track
self.runtime += track.length
except Exception as e:
pass
def next(self):
with self.lock:
if self.random == True:
self.current = self.__random_track__()
else:
self.current = self.__next_track__()
if self.current < len(self.tracks):
return self.tracks[self.current]
return None
def remove_track(self, track):
with self.lock:
pos = self.tracks.index(track)
self.tracks.remove(track)
self.runtime -= track.length
self.TrackRemoved.publish(self, track, pos)
def stacked(self):
with self.lock:
if self.current >= len(self.tracks):
self.current = -1
def track_selected(self, track):
with self.lock:
self.current = self.tracks.index(track)
track.add_to_playlist("Previous")
class SuperTag(Tag):
def __init__(self, parent, name, sort=None):
Tag.__init__(self, name, sort)
self.parent = parent
def __getstate__(self):
state = Tag.__getstate__(self)
state["parent"] = self.parent
return state
def __lt__(self, rhs):
if not isinstance(rhs, SuperTag):
return self.parent.sort < rhs.sort
if self.parent != rhs.parent:
return self.parent < rhs.parent
return self.sort < rhs.sort
def __setstate__(self, state):
Tag.__setstate__(self, state)
self.parent = state["parent"]

View File

@ -1,98 +0,0 @@
# Copyright 2021 (c) Anna Schumaker.
from . import publisher
from . import tag
import threading
class TagStore:
def __init__(self):
self.store = dict()
self.lock = threading.Lock()
self.Added = publisher.Publisher()
self.Removed = publisher.Publisher()
def __add_tag__(self, name, sort, track):
t = self.__get_tag__(name, sort)
if track:
t.add_track(track)
return t
def __alloc_tag__(self, name, sort):
return tag.Tag(name, sort)
def __getitem__(self, name):
with self.lock:
return self.store.get(name)
def __getstate__(self):
with self.lock:
return { "store" : self.store }
def __get_tag__(self, name, sort):
with self.lock:
if (t := self.store.get(name)) != None:
return t
t = self.__alloc_tag__(name, sort)
self.store[name] = t
self.Added.publish(t)
return t
def __len__(self):
with self.lock:
return len(self.store)
def __pop_tag__(self, t):
self.store.pop(t.name)
def __setstate__(self, state):
self.store = state["store"]
self.lock = threading.Lock()
self.Added = publisher.Publisher()
self.Removed = publisher.Publisher()
def add(self, name, track=None, sort=None):
return self.__add_tag__(name.strip(), sort, track)
def init_track(self, name, track):
with self.lock:
if (t := self.store.get(name)) != None:
t.init_track(track)
return t
def remove(self, t, track=None):
if track:
t.remove_track(track)
if (track == None or len(t) == 0) and t in self.store.values():
with self.lock:
self.__pop_tag__(t)
self.Removed.publish(t)
def reset(self):
with self.lock:
self.store.clear()
self.Added.reset()
self.Removed.reset()
def tags(self):
with self.lock:
for (name, tag) in self.store.items():
yield tag
class TagSuperStore(TagStore):
def __alloc_tag__(self, key, sort):
return tag.SuperTag(key[0], key[1], sort)
def __pop_tag__(self, t):
self.store.pop((t.parent, t.name))
def add(self, parent, name, track, sort=None):
return super().__add_tag__((parent, name.strip()), sort, track)
def init_track(self, parent, name, track):
return super().init_track((parent, name), track)
def tags(self, parent=None):
with self.lock:
for (name, tag) in self.store.items():
if parent == None or tag.parent == parent:
yield tag

View File

@ -1,93 +0,0 @@
# Copyright 2021 (c) Anna Schumaker.
from . import bus
from . import publisher
from gi.repository import GLib
import threading
import time
import unittest
main_context = GLib.main_context_default()
class TestBus(unittest.TestCase):
def setUp(self):
self.count = 0
self.start = 0
self.retry = False
bus.Start.register(self.cb_start)
def tearDown(self):
bus.Start.unregister(self.cb_start)
def cb_start(self, bus):
self.start += 1
def cb_one(self):
self.count += 1
def cb_two(self, arg):
self.count = arg
def cb_retry(self, arg):
self.count = arg
return bus.RETRY if self.retry == True else None
def test_bus_init(self):
self.assertEqual(bus.RETRY, GLib.SOURCE_CONTINUE)
b = bus.Bus(100)
self.assertEqual(b.timeout, 100)
self.assertEqual(b.passengers, [ ])
self.assertIsNone(b.timeout_id)
self.assertIsInstance(b.lock, type(threading.Lock()))
self.assertIsInstance(bus.Start, publisher.Publisher)
def test_bus_board(self):
b = bus.Bus(100)
b.board(self.cb_one)
b.board(self.cb_one)
self.assertEqual(b.passengers, [ (self.cb_one,()) ])
self.assertIsNotNone(b.timeout_id)
self.assertTrue(b.running())
self.assertEqual(self.start, 1)
time.sleep(0.1)
while main_context.iteration(may_block=False): pass
self.assertEqual(self.count, 1)
self.assertIsNone(b.timeout_id)
self.assertFalse(b.running())
def test_bus_clear(self):
b = bus.Bus(100)
b.clear()
for i in range(100):
b.board(self.cb_one)
b.clear()
self.assertEqual(b.passengers, [ ])
self.assertIsNone(b.timeout_id)
def test_bus_complete(self):
b = bus.Bus(100)
for i in range(100):
b.board(self.cb_two, i)
b.complete()
self.assertEqual(b.passengers, [ ])
self.assertEqual(self.count, i)
self.assertIsNone(b.timeout_id)
def test_bus_retry(self):
b = bus.Bus(10)
b.board(self.cb_retry, 1)
b.board(self.cb_retry, 2)
self.retry = True
b.run()
self.assertEqual(b.passengers, [ (self.cb_retry, (1,)), (self.cb_retry, (2,)) ])
b.complete()
self.assertEqual(b.passengers, [ ])

View File

@ -1,20 +0,0 @@
# Copyright 2021 (c) Anna Schumaker.
from . import counter
from gi.repository import Gtk
import unittest
class TestCounter(unittest.TestCase):
def test_counter(self):
c = counter.Counter(1, 10)
self.assertIsInstance(c, Gtk.Adjustment)
self.assertEqual(c.get_lower(), 1)
self.assertEqual(c.get_upper(), 11)
self.assertEqual(c.get_value(), 1)
for i in [ 2, 3, 4, 5, 6, 7, 8, 9, 10, None ]:
self.assertEqual(c.increment(), i)
for i in [ 9, 8, 7, 6, 5, 4, 3, 2, 1, None ]:
self.assertEqual(c.decrement(), i)

View File

@ -1,34 +0,0 @@
# Copyright 2021 (c) Anna Schumaker.
from . import metadata
import pathlib
import unittest
test_tracks = pathlib.Path("./data/Test Album")
track_01 = test_tracks / "01 - Test Track.ogg"
track_02 = test_tracks / "02 - Test {Disc 2}.ogg"
class TestMetadata(unittest.TestCase):
def test_metadata_init(self):
mdf = metadata.Metadata(track_01)
self.assertEqual(mdf.path, track_01)
self.assertIsNone(mdf.file)
def test_metadata_track_01(self):
with metadata.Metadata(track_01) as mdf:
self.assertEqual(mdf.album(), "Test Album")
self.assertEqual(mdf.artist(), "Test Artist")
self.assertEqual(mdf.artistsort(), "Artist, Test")
self.assertEqual(mdf.decade(), 2010)
self.assertEqual(mdf.discnumber(), 1)
self.assertEqual(mdf.genres(), [ "Test" ])
self.assertEqual(mdf.length(), 10)
self.assertEqual(mdf.title(), "Test Track")
self.assertEqual(mdf.tracknumber(), 1)
self.assertEqual(mdf.year(), 2019)
def test_metadata_track_02(self):
with metadata.Metadata(track_02) as mdf:
self.assertEqual(mdf.artist(), "Test Album Artist")
self.assertEqual(mdf.artistsort(), "Album Artist, Test")
self.assertEqual(mdf.genres(), [ "Test", "Genre", "List" ])
self.assertEqual(mdf.year(), 2019)

View File

@ -1,30 +0,0 @@
# Copyright 2020 (c) Anna Schumaker.
from . import publisher
import unittest
class TestPublisher(unittest.TestCase):
def on_test(self, text):
self.test_arg = text
def test_publisher_init(self):
pub = publisher.Publisher()
self.assertIsInstance(pub.subscribers, set)
self.assertEqual(pub.subscribers, set())
def test_publisher_register(self):
pub = publisher.Publisher()
pub.register(self.on_test)
self.assertEqual(pub.subscribers, { self.on_test })
pub.unregister(self.on_test)
self.assertEqual(pub.subscribers, set())
pub.subscribers = set([ 1, 2, 3 ])
pub.reset()
self.assertEqual(pub.subscribers, set())
def test_publisher_publish(self):
pub = publisher.Publisher()
pub.register(self.on_test)
pub.publish("Test Arg")
self.assertEqual(self.test_arg, "Test Arg")

View File

@ -1,256 +0,0 @@
# Copyright 2021 (c) Anna Schumaker.
from . import publisher
from . import tag
from . import fake
import random
import threading
import unittest
class TestTag(unittest.TestCase):
def setUp(self):
self.changed = None
def callback_func(self, tag, track, pos):
self.changed = (tag, track, pos)
def test_tag_init(self):
t = tag.Tag("test")
self.assertEqual(t.name, "test")
self.assertEqual(t.sort, "test")
self.assertEqual(t.current, -1)
self.assertEqual(t.runtime, 0)
self.assertEqual(t.tracks, [ ])
self.assertFalse(t.loop)
self.assertFalse(t.random)
self.assertTrue( t.can_loop())
self.assertTrue( t.can_random())
self.assertIsNone(t.widgets)
self.assertIsInstance(t.lock, type(threading.Lock()))
self.assertIsInstance(t.TrackAdded, publisher.Publisher)
self.assertIsInstance(t.TrackRemoved, publisher.Publisher)
self.assertEqual(str(t), "test")
def test_tag_len(self):
t = tag.Tag("Test")
self.assertEqual(len(t), 0)
t.tracks = [ fake.Track(i) for i in range(5) ]
self.assertEqual(len(t), 5)
def test_tag_lt(self):
a = tag.Tag("A")
b = tag.Tag("B")
c = tag.Tag("C")
self.assertTrue( a < b)
self.assertFalse(b < a)
self.assertFalse(a < a)
a2 = tag.SuperTag(b, "A")
self.assertTrue(a < a2)
self.assertTrue(b < a2)
self.assertTrue(a2 < c)
def test_tag_header(self):
self.assertEqual(tag.Tag("Test").get_header(), "T")
self.assertEqual(tag.Tag("Test", "sort").get_header(), "S")
self.assertEqual(tag.Tag("").get_header(), "")
def test_tag_state(self):
t = tag.Tag("test")
tracks = [ fake.Track(i) for i in range(5) ]
t.tracks = tracks
state = t.__getstate__()
self.assertEqual(set(state.keys()),
set([ "name", "sort", "current", "loop", "random", "tracks" ]))
self.assertEqual(state["name"], "test")
self.assertEqual(state["sort"], "test")
self.assertEqual(state["current"], -1)
self.assertEqual(state["tracks"], [ 0, 1, 2, 3, 4 ])
self.assertFalse(state["loop"])
self.assertFalse(state["random"])
state["sort"] = "sort"
state["current"] = 42
t.__dict__.clear()
t.__setstate__(state)
self.assertEqual(t.name, "test")
self.assertEqual(t.sort, "sort")
self.assertEqual(t.current, 42)
self.assertEqual(t.runtime, 0)
self.assertEqual(t.tracks, [ 0, 1, 2, 3, 4 ])
self.assertEqual(t.widgets, None)
self.assertFalse(t.loop)
self.assertFalse(t.random)
self.assertIsInstance(t.lock, type(threading.Lock()))
self.assertIsInstance(t.TrackAdded, publisher.Publisher)
self.assertIsInstance(t.TrackRemoved, publisher.Publisher)
for track in tracks:
t.init_track(track)
self.assertEqual(t.tracks, tracks)
self.assertEqual(t.runtime, 10)
def test_tag_add_track(self):
t = tag.Tag("test")
t.TrackAdded.register(self.callback_func)
self.assertIsNone(t[0])
t.add_track(fake.Track(1))
self.assertEqual(t[0], fake.Track(1))
self.assertEqual(t.tracks, [ fake.Track(1) ])
self.assertEqual(self.changed, (t, fake.Track(1), 0))
self.assertEqual(t.runtime, 1)
t.add_track(fake.Track(2))
self.assertEqual(t[1], fake.Track(2))
self.assertEqual(t.tracks, [ fake.Track(1), fake.Track(2) ])
self.assertEqual(self.changed, (t, fake.Track(2), 1))
self.assertEqual(t.runtime, 3)
t.add_track(fake.Track(1))
self.assertEqual(t.tracks, [ fake.Track(1), fake.Track(2) ])
self.assertEqual(self.changed, (t, fake.Track(2), 1))
self.assertEqual(t.runtime, 3)
def test_tag_remove_track(self):
t = tag.Tag("test")
t.add_track(fake.Track(1))
t.add_track(fake.Track(2))
t.TrackRemoved.register(self.callback_func)
t.remove_track(fake.Track(1))
self.assertEqual(t.tracks, [ fake.Track(2) ])
self.assertEqual(self.changed, (t, fake.Track(1), 0))
self.assertEqual(t.runtime, 2)
t.remove_track(fake.Track(2))
self.assertEqual(t.tracks, [ ])
self.assertEqual(self.changed, (t, fake.Track(2), 0))
self.assertEqual(t.runtime, 0)
def test_tag_next(self):
t = tag.Tag("test")
t.tracks = [ 1, 2, 3 ]
self.assertEqual(t.next(), 1)
self.assertEqual(t.current, 0)
self.assertEqual(t.next(), 2)
self.assertEqual(t.current, 1)
self.assertEqual(t.next(), 3)
self.assertEqual(t.current, 2)
self.assertIsNone(t.next())
self.assertEqual(t.current, 3)
t.loop = True
self.assertEqual(t.next(), 1)
self.assertEqual(t.current, 0)
def test_tag_random_next(self):
t = tag.Tag("test")
t.tracks = [ 0, 1, 2, 3, 4, 5 ]
t.random = True
# Expected randint(): 5, 3, 2, 5, 5, 5
random.seed(20210318)
self.assertEqual(t.next(), 4) # -1 + 5
self.assertEqual(t.next(), 1) # (4 + 3) % 6 = 7 % 6
self.assertEqual(t.next(), 3) # 1 + 2
self.assertEqual(t.next(), 2) # (3 + 5) % 6 = 8 % 6
self.assertEqual(t.next(), 1) # (2 + 5) % 6 = 7 % 6
self.assertEqual(t.next(), 0) # (1 + 5) % 6 = 0
t.tracks = [ ]
self.assertIsNone(t.next())
t.tracks = [ 0 ]
self.assertEqual(t.next(), 0)
self.assertEqual(t.next(), 0)
t.tracks = [ 0, 1 ]
t.current = -1
self.assertEqual(t.next(), 0)
self.assertEqual(t.next(), 1)
self.assertEqual(t.next(), 0)
self.assertEqual(t.next(), 1)
def test_tag_track_selected(self):
t = tag.Tag("test")
p = tag.Tag("Previous")
t.tracks = [ fake.Track(0, p), fake.Track(1, p), fake.Track(2, p) ]
t.track_selected(fake.Track(2, p))
self.assertEqual(t.current, 2)
self.assertIn(fake.Track(2, p), p.tracks)
t.track_selected(fake.Track(1, p))
self.assertEqual(t.current, 1)
self.assertIn(fake.Track(1, p), p.tracks)
t.track_selected(fake.Track(0, p))
self.assertEqual(t.current, 0)
self.assertIn(fake.Track(0, p), p.tracks)
def test_tag_stacked(self):
t = tag.Tag("test")
t.tracks = [ 0, 1, 2 ]
t.current = 3
t.stacked()
self.assertEqual(t.current, -1)
t.current = 1
t.stacked()
self.assertEqual(t.current, 1)
class TestSuperTag(unittest.TestCase):
def test_super_tag(self):
parent = tag.Tag("parent")
st = tag.SuperTag(parent, "test", "sort")
st.tracks = [ fake.Track(i) for i in range(5) ]
self.assertIsInstance(st, tag.Tag)
self.assertEqual(st.parent, parent)
state = st.__getstate__()
self.assertEqual(state["name"], "test")
self.assertEqual(state["sort"], "sort")
self.assertEqual(state["current"], -1)
self.assertEqual(state["tracks"], [ 0, 1, 2, 3, 4 ])
self.assertEqual(state["parent"], parent)
st.__dict__.clear()
st.__setstate__(state)
self.assertEqual(st.name, "test")
self.assertEqual(st.sort, "sort")
self.assertEqual(st.tracks, [ 0, 1, 2, 3, 4 ])
self.assertEqual(st.widgets, None)
self.assertEqual(st.parent, parent)
self.assertIsInstance(st.lock, type(threading.Lock()))
def test_super_tag_lt(self):
A = tag.Tag("A")
B = tag.Tag("B")
C = tag.Tag("C")
aa = tag.SuperTag(A, "A")
ba = tag.SuperTag(B, "A")
bb = tag.SuperTag(B, "B")
ca = tag.SuperTag(C, "A")
lst = [ A, aa, B, ba, bb, C, ca ]
for i, t in enumerate(lst):
for u in lst[i+1:]:
self.assertTrue(t < u)
lst = [ ca, C, bb, ba, B, aa, A ]
for i, t in enumerate(lst):
for u in lst[i+1:]:
self.assertFalse(t < u)

View File

@ -1,167 +0,0 @@
# Copyright 2021 (c) Anna Schumaker.
from . import fake
from . import publisher
from . import tag
from . import tagstore
import threading
import unittest
class TestTagStore(unittest.TestCase):
def setUp(self):
self.added = None
self.removed = None
def on_store_added(self, tag):
self.added = tag
def on_store_removed(self, tag):
self.removed = tag
def test_tag_store(self):
store = tagstore.TagStore()
self.assertEqual(store.store, { })
self.assertIsInstance(store.lock, type(threading.Lock()))
self.assertIsInstance(store.Added, publisher.Publisher)
self.assertIsInstance(store.Removed, publisher.Publisher)
def test_tag_store_add_remove(self):
store = tagstore.TagStore()
store.Added.register(self.on_store_added)
store.Removed.register(self.on_store_removed)
tag = store.add("test", fake.Track(1))
self.assertEqual(self.added, tag)
self.assertEqual(tag.tracks, [ fake.Track(1) ])
self.added = None
self.assertEqual(store.add("test ", fake.Track(2)), tag)
self.assertEqual(tag.tracks, [ fake.Track(1), fake.Track(2) ])
self.assertIsNone(self.added)
store.remove(tag, fake.Track(1))
self.assertEqual(tag.tracks, [ fake.Track(2) ])
self.assertIn("test", store.store.keys())
self.assertIsNone(self.removed)
store.remove(tag, fake.Track(2))
self.assertEqual(tag.tracks, [ ])
self.assertNotIn("test", store.store.keys())
self.assertEqual(self.removed, tag)
def test_tag_store_add_remove_none(self):
store = tagstore.TagStore()
tag = store.add("test")
self.assertEqual(tag.tracks, [ ])
tag.add_track(fake.Track(1))
tag.add_track(fake.Track(2))
store.remove(tag)
self.assertNotIn("test", store.store.keys())
def test_tag_store_add_remove_sort(self):
store = tagstore.TagStore()
tag = store.add("test", sort="sort")
self.assertEqual(tag.sort, "sort")
def test_tag_store_items(self):
store = tagstore.TagStore()
self.assertEqual(len(store), 0)
tag1 = store.add("test1", fake.Track(1))
tag2 = store.add("test2", fake.Track(2))
tag3 = store.add("test3", fake.Track(3))
self.assertEqual(len(store), 3)
self.assertEqual( store["test1"], tag1)
self.assertEqual( store["test2"], tag2)
self.assertEqual( store["test3"], tag3)
self.assertIsNone(store["test4"])
result = [ tag for tag in store.tags() ]
self.assertEqual(len(result), 3)
self.assertIn(tag1, result)
self.assertIn(tag2, result)
self.assertIn(tag3, result)
def test_tag_store_reset(self):
store = tagstore.TagStore()
tag = store.add("test", fake.Track(1))
store.Added.register(self.on_store_added)
store.Removed.register(self.on_store_removed)
store.reset()
self.assertNotIn(tag.name, store.store.keys())
self.assertEqual(store.Added.subscribers, set())
self.assertEqual(store.Removed.subscribers, set())
def test_tag_store_state(self):
store = tagstore.TagStore()
track = fake.Track(1)
tag = store.add("test", track)
state = store.__getstate__()
self.assertEqual(set(state.keys()), set([ "store" ]))
store.__dict__.clear()
store.__setstate__(state)
self.assertEqual(store.store, { "test" : tag })
self.assertIsInstance(store.lock, type(threading.Lock()))
self.assertIsInstance(store.Added, publisher.Publisher)
self.assertIsInstance(store.Removed, publisher.Publisher)
self.assertEqual(store.init_track("test", track), tag)
self.assertEqual(tag.tracks, [ track ])
class TestTagSuperStore(unittest.TestCase):
def test_tag_superstore(self):
store = tagstore.TagStore()
superstore = tagstore.TagSuperStore()
self.assertIsInstance(superstore, tagstore.TagStore)
parent = store.add("parent", fake.Track(1))
supertag = superstore.add(parent, "test ", fake.Track(1))
self.assertIsInstance(supertag, tag.SuperTag)
self.assertEqual(supertag.name, "test")
self.assertEqual(supertag.tracks, [ fake.Track(1) ])
self.assertEqual(supertag.parent, parent)
superstore.remove(supertag, fake.Track(1))
self.assertEqual(supertag.tracks, [ ])
self.assertNotIn("test", superstore.store.keys())
def test_tag_superstore_items(self):
store = tagstore.TagStore()
superstore = tagstore.TagSuperStore()
parent1 = store.add("parent1", fake.Track(1))
parent2 = store.add("parent2", fake.Track(2))
tag1 = superstore.add(parent1, "test1", fake.Track(1))
tag2 = superstore.add(parent1, "test2", fake.Track(2))
tag3 = superstore.add(parent2, "test3", fake.Track(3))
self.assertEqual(len(superstore), 3)
self.assertEqual( superstore[parent1, "test1"], tag1)
self.assertEqual( superstore[parent1, "test2"], tag2)
self.assertEqual( superstore[parent2, "test3"], tag3)
self.assertIsNone(superstore[parent2, "test2"])
result = [ tag for tag in superstore.tags(parent1) ]
self.assertEqual(result, [ tag1, tag2 ])
result = [ tag for tag in superstore.tags(parent2) ]
self.assertEqual(result, [ tag3 ])
result = [ tag for tag in superstore.tags() ]
self.assertEqual(result, [ tag1, tag2, tag3 ])
def test_tag_superstore_init_track(self):
store = tagstore.TagStore()
superstore = tagstore.TagSuperStore()
track = fake.Track(1)
parent = store.add("test")
tag = superstore.add(parent, "test", track)
tag.tracks = [ 1 ]
self.assertEqual(superstore.init_track(parent, "test", track), tag)
self.assertEqual(tag.tracks, [ track ])

View File

@ -1,35 +0,0 @@
# Copyright 2021 (c) Anna Schumaker.
from . import publisher
from . import thread
import threading
import unittest
class TestThread(unittest.TestCase):
def setUp(self):
self.started = None
self.called = False
self.thread = thread.Thread(self.thread_func)
def thread_func(self):
self.assertIsNotNone(self.thread.thread)
self.called = True
def on_thread_start(self, thread):
self.started = thread
def test_thread(self):
self.assertIsInstance(thread.Start, publisher.Publisher)
thread.Start.register(self.on_thread_start)
self.assertIsInstance(self.thread.lock, type(threading.Lock()))
self.assertEqual(self.thread.func, self.thread_func)
self.assertIsNone(self.thread.thread)
self.assertFalse(self.thread.running())
self.assertEqual(self.thread(), self.thread)
self.thread.join()
self.assertTrue(self.called)
self.assertFalse(self.thread.running())
self.assertEqual(self.started, self.thread)
self.assertIsNone(self.thread.thread)

View File

@ -1,35 +0,0 @@
# Copyright 2021 (c) Anna Schumaker.
from . import publisher
import threading
Start = publisher.Publisher()
class Thread:
def __init__(self, func):
self.func = func
self.thread = None
self.lock = threading.Lock()
def __call__(self):
with self.lock:
if self.thread:
return None
self.thread = threading.Thread(target = self.__func__)
self.thread.start()
Start.publish(self)
return self
def __func__(self):
self.func()
with self.lock:
self.thread = None
def join(self):
if self.thread:
self.thread.join()
def running(self):
with self.lock:
if self.thread:
return self.thread.is_alive()
return False