Compare commits
19 Commits
04dc67a097
...
18743f05c4
Author | SHA1 | Date |
---|---|---|
Anna Schumaker | 18743f05c4 | |
Anna Schumaker | ab6eb556ad | |
Anna Schumaker | 1296857189 | |
Anna Schumaker | 73ba296d74 | |
Anna Schumaker | f9cec5e1b3 | |
Anna Schumaker | 0c7a4a4a4c | |
Anna Schumaker | 289420e504 | |
Anna Schumaker | 15059db59a | |
Anna Schumaker | c4adea15bb | |
Anna Schumaker | 328dce0be2 | |
Anna Schumaker | 295202443f | |
Anna Schumaker | b245b2073e | |
Anna Schumaker | 7b89f54e8b | |
Anna Schumaker | b768d74928 | |
Anna Schumaker | beca08b833 | |
Anna Schumaker | db2d122211 | |
Anna Schumaker | 10c5fd4cef | |
Anna Schumaker | 2daefa932c | |
Anna Schumaker | 915e3c8340 |
2
Makefile
2
Makefile
|
@ -37,4 +37,4 @@ pkgbuild:
|
|||
.PHONY: tests
|
||||
tests:
|
||||
python tools/generate_tracks.py
|
||||
EMMENTAL_TESTING=1 python -m unittest discover -v
|
||||
python -m unittest discover -v
|
||||
|
|
|
@ -1,5 +1,4 @@
|
|||
# Copyright 2021 (c) Anna Schumaker.
|
||||
import tagdb
|
||||
from gi.repository import Gtk
|
||||
from . import artwork
|
||||
from . import controls
|
||||
|
|
|
@ -72,11 +72,11 @@ class AutoPauseScale(ScalePlus):
|
|||
return self.get_value() == 0
|
||||
|
||||
def format_value(self, scale, value):
|
||||
value = int(value)
|
||||
if value == -1: return "Keep Playing"
|
||||
elif value == 0: return "This Track"
|
||||
elif value == 1: return "Next Track"
|
||||
return f"{value} Tracks"
|
||||
match int(value):
|
||||
case -1: return "Keep Playing"
|
||||
case 0: return "This Track"
|
||||
case 1: return "Next Track"
|
||||
case _: return f"{int(value)} Tracks"
|
||||
|
||||
def decrement(self):
|
||||
self.keep_playing = not self.about_to_pause()
|
||||
|
|
|
@ -1,13 +0,0 @@
|
|||
# Copyright 2021 (c) Anna Schumaker.
|
||||
import tagdb
|
||||
from gi.repository import GObject
|
||||
|
||||
class Selector(GObject.GObject):
|
||||
def __init__(self): GObject.GObject.__init__(self)
|
||||
def next(self): return None
|
||||
def previous(self): return None
|
||||
|
||||
|
||||
class TagdbSelector(Selector):
|
||||
def next(self): return tagdb.Stack.next()[0]
|
||||
def previous(self): return tagdb.Stack.previous()
|
|
@ -1,11 +0,0 @@
|
|||
# Copyright 2021 (c) Anna Schumaker.
|
||||
import unittest
|
||||
from gi.repository import GObject
|
||||
from . import selector
|
||||
|
||||
class TestAudioSelector(unittest.TestCase):
|
||||
def test_audio_selector_init(self):
|
||||
select = selector.Selector()
|
||||
self.assertIsInstance(select, GObject.GObject)
|
||||
self.assertIsNone(select.next())
|
||||
self.assertIsNone(select.previous())
|
|
@ -1,3 +1,3 @@
|
|||
#!/bin/bash
|
||||
|
||||
python {EMMENTAL_LIB}/emmental.py $*
|
||||
python -O {EMMENTAL_LIB}/emmental.py $*
|
||||
|
|
|
@ -60,7 +60,10 @@ class Playlist(GObject.GObject):
|
|||
return cur.fetchone()[1] - 1
|
||||
|
||||
def track_adjusts_current(self, track):
|
||||
return self.current > -1 and self.get_track_index(track) <= self.current
|
||||
if self.current > -1:
|
||||
if (index := self.get_track_index(track)) != None:
|
||||
return index <= self.current
|
||||
return False
|
||||
|
||||
def add_track(self, track):
|
||||
self.emit("track-added", track)
|
||||
|
|
24
db/user.py
24
db/user.py
|
@ -140,17 +140,19 @@ class UserTable(playlist.Model):
|
|||
sql.execute("DROP TABLE temp_playlist_map")
|
||||
|
||||
def do_factory(self, row):
|
||||
if row["name"] == "Collection":
|
||||
return Collection(row)
|
||||
elif row["name"] == "Favorites":
|
||||
return UserPlaylist(row, "emmental-favorites", "playlist_map")
|
||||
elif row["name"] == "New Tracks":
|
||||
return UserPlaylist(row, "starred", "temp_playlist_map")
|
||||
elif row["name"] == "Previous":
|
||||
return Previous(row)
|
||||
elif row["name"] == "Queued Tracks":
|
||||
return QueuedTracks(row)
|
||||
return UserPlaylist(row, "audio-x-generic", "playlist_map")
|
||||
match row["name"]:
|
||||
case "Collection":
|
||||
return Collection(row)
|
||||
case "Favorites":
|
||||
return UserPlaylist(row, "emmental-favorites", "playlist_map")
|
||||
case "New Tracks":
|
||||
return UserPlaylist(row, "starred", "temp_playlist_map")
|
||||
case "Previous":
|
||||
return Previous(row)
|
||||
case "Queued Tracks":
|
||||
return QueuedTracks(row)
|
||||
case _:
|
||||
return UserPlaylist(row, "audio-x-generic", "playlist_map")
|
||||
|
||||
def do_insert(self, plstate, name):
|
||||
return sql.execute("INSERT INTO playlists (plstateid, name, sort) "
|
||||
|
|
29
emmental.py
29
emmental.py
|
@ -1,9 +1,32 @@
|
|||
#!/usr/bin/python
|
||||
# Copyright 2021 (c) Anna Schumaker.
|
||||
import lib
|
||||
import tagdb
|
||||
lib.settings.load()
|
||||
tagdb.load()
|
||||
|
||||
import db
|
||||
import scanner
|
||||
import ui
|
||||
ui.Application.run()
|
||||
from gi.repository import Gtk
|
||||
|
||||
class Application(Gtk.Application):
|
||||
def __init__(self, *args, **kwargs):
|
||||
app_id = f"org.gtk.emmental{'-debug' if __debug__ else ''}"
|
||||
Gtk.Application.__init__(self, *args, application_id=app_id, **kwargs)
|
||||
|
||||
def do_startup(self):
|
||||
Gtk.Application.do_startup(self)
|
||||
self.add_window(ui.window.Window())
|
||||
for i in range(db.library.Table.get_n_items()):
|
||||
scanner.update_library(db.library.Table.get_item(i))
|
||||
|
||||
def do_activate(self):
|
||||
for window in self.get_windows():
|
||||
window.present()
|
||||
|
||||
def do_shutdown(self):
|
||||
Gtk.Application.do_shutdown(self)
|
||||
scanner.Queue.clear()
|
||||
db.sql.optimize()
|
||||
|
||||
if __name__ == "__main__":
|
||||
Application().run()
|
||||
|
|
|
@ -3,11 +3,7 @@ import gi
|
|||
gi.require_version("Gtk", "4.0")
|
||||
gi.require_version("Gst", "1.0")
|
||||
|
||||
from . import bus
|
||||
from . import data
|
||||
from . import filter
|
||||
from . import publisher
|
||||
from . import settings
|
||||
from . import tag
|
||||
from . import tagstore
|
||||
from . import thread
|
||||
from . import version
|
||||
|
|
59
lib/bus.py
59
lib/bus.py
|
@ -1,59 +0,0 @@
|
|||
# Copyright 2021 (c) Anna Schumaker.
|
||||
from . import publisher
|
||||
import threading
|
||||
from gi.repository import GLib
|
||||
|
||||
Start = publisher.Publisher()
|
||||
RETRY = GLib.SOURCE_CONTINUE
|
||||
|
||||
class Bus:
|
||||
def __init__(self, milliseconds):
|
||||
self.timeout = milliseconds
|
||||
self.timeout_id = None
|
||||
self.passengers = [ ]
|
||||
self.lock = threading.Lock()
|
||||
|
||||
def __do_board(self, func, *args):
|
||||
with self.lock:
|
||||
if (func, args) not in self.passengers:
|
||||
self.passengers.append( (func, args) )
|
||||
if self.timeout_id == None:
|
||||
self.timeout_id = GLib.timeout_add(self.timeout, self.run)
|
||||
return True
|
||||
return False
|
||||
|
||||
def board(self, func, *args):
|
||||
if self.__do_board(func, *args):
|
||||
Start.publish(self)
|
||||
|
||||
def clear(self):
|
||||
with self.lock:
|
||||
if self.timeout_id:
|
||||
GLib.source_remove(self.timeout_id)
|
||||
self.timeout_id = None
|
||||
self.passengers.clear()
|
||||
|
||||
def complete(self):
|
||||
with self.lock:
|
||||
for (func, args) in self.passengers:
|
||||
func(*args)
|
||||
|
||||
GLib.source_remove(self.timeout_id)
|
||||
self.timeout_id = None
|
||||
self.passengers.clear()
|
||||
|
||||
def running(self):
|
||||
with self.lock:
|
||||
return self.timeout_id != None
|
||||
|
||||
def run(self):
|
||||
with self.lock:
|
||||
(func, args) = self.passengers[0]
|
||||
if func(*args) == RETRY:
|
||||
return GLib.SOURCE_CONTINUE
|
||||
|
||||
self.passengers.pop(0)
|
||||
if len(self.passengers) == 0:
|
||||
self.timeout_id = None
|
||||
return GLib.SOURCE_REMOVE
|
||||
return GLib.SOURCE_CONTINUE
|
|
@ -1,22 +0,0 @@
|
|||
# Copyright 2021 (c) Anna Schumaker.
|
||||
from gi.repository import Gtk
|
||||
|
||||
class Counter(Gtk.Adjustment):
|
||||
def __init__(self, min, max):
|
||||
Gtk.Adjustment.__init__(self)
|
||||
self.configure(value=min, lower=min, upper=max + 1, step_increment=1,
|
||||
page_increment=1, page_size=1)
|
||||
|
||||
def __change_value__(self, n):
|
||||
value = self.get_value()
|
||||
self.set_value(value + n)
|
||||
|
||||
if self.get_value() == value:
|
||||
return None
|
||||
return self.get_value()
|
||||
|
||||
def increment(self):
|
||||
return self.__change_value__(1)
|
||||
|
||||
def decrement(self):
|
||||
return self.__change_value__(-1)
|
|
@ -7,7 +7,7 @@ import xdg.BaseDirectory
|
|||
__resource = "emmental"
|
||||
if version.TESTING == True:
|
||||
__resource = "emmental-testing"
|
||||
elif version.DEBUG == True:
|
||||
elif __debug__ == True:
|
||||
__resource = "emmental-debug"
|
||||
|
||||
READ = 'rb'
|
||||
|
|
24
lib/fake.py
24
lib/fake.py
|
@ -1,24 +0,0 @@
|
|||
# Copyright 2021 (c) Anna Schumaker.
|
||||
from gi.repository import GObject
|
||||
|
||||
tracks = { }
|
||||
|
||||
class FakeTrack(GObject.GObject):
|
||||
def __init__(self, n, tag=None):
|
||||
GObject.GObject.__init__(self)
|
||||
self.trackid = n
|
||||
self.length = n
|
||||
self.tag = tag
|
||||
|
||||
def __int__(self):
|
||||
return self.trackid
|
||||
|
||||
def add_to_playlist(self, name):
|
||||
self.tag.add_track(self)
|
||||
|
||||
def remove_from_playlist(self, name):
|
||||
self.tag.remove_track(self)
|
||||
|
||||
|
||||
def Track(n, tag=None):
|
||||
return tracks.setdefault((n,tag), FakeTrack(n, tag=tag))
|
|
@ -1,51 +0,0 @@
|
|||
# Copyright 2021 (c) Anna Schumaker.
|
||||
import mutagen
|
||||
import re
|
||||
|
||||
class Metadata:
|
||||
def __init__(self, filepath):
|
||||
self.path = filepath
|
||||
self.file = None
|
||||
|
||||
def __enter__(self):
|
||||
self.file = mutagen.File(self.path)
|
||||
return self
|
||||
|
||||
def __exit__(self, exp_type, exp_value, traceback):
|
||||
self.file = None
|
||||
return exp_type == None
|
||||
|
||||
def album(self):
|
||||
return self.file.get("album", [ "Unknown Album" ])[0]
|
||||
|
||||
def artist(self):
|
||||
artist = self.file.get("artist", [ "Unknown Artist" ])
|
||||
return self.file.get("albumartist", artist)[0]
|
||||
|
||||
def artistsort(self):
|
||||
sort = self.file.get("artistsort", [ None ])
|
||||
return self.file.get("albumartistsort", sort)[0]
|
||||
|
||||
def decade(self):
|
||||
return (self.year() // 10) * 10
|
||||
|
||||
def discnumber(self):
|
||||
return int(self.file.get("discnumber", [ 1 ])[0])
|
||||
|
||||
def genres(self):
|
||||
genre = self.file.get("genre", [ "" ])[0]
|
||||
return [ g.strip() for g in re.split(",|;|/|:", genre) ]
|
||||
|
||||
def length(self):
|
||||
return int(self.file.info.length)
|
||||
|
||||
def title(self):
|
||||
return self.file.get("title", [ "" ])[0]
|
||||
|
||||
def tracknumber(self):
|
||||
return int(self.file.get("tracknumber", [ 0 ])[0])
|
||||
|
||||
def year(self):
|
||||
year = self.file.get("date", [ "0" ])
|
||||
year = self.file.get("originalyear", year)[0]
|
||||
return int(re.match("\d+", year).group(0))
|
|
@ -1,19 +0,0 @@
|
|||
# Copyright 2020 (c) Anna Schumaker.
|
||||
|
||||
class Publisher:
|
||||
def __init__(self):
|
||||
self.subscribers = set()
|
||||
|
||||
def publish(self, *args):
|
||||
funcs = self.subscribers.copy()
|
||||
for func in funcs:
|
||||
func(*args)
|
||||
|
||||
def register(self, func):
|
||||
self.subscribers.add(func)
|
||||
|
||||
def reset(self):
|
||||
self.subscribers.clear()
|
||||
|
||||
def unregister(self, func):
|
||||
self.subscribers.discard(func)
|
146
lib/tag.py
146
lib/tag.py
|
@ -1,146 +0,0 @@
|
|||
# Copyright 2021 (c) Anna Schumaker.
|
||||
from . import publisher
|
||||
import random
|
||||
import threading
|
||||
|
||||
class Tag:
|
||||
def __init__(self, name, sort=None):
|
||||
self.name = name
|
||||
self.sort = sort.lower() if sort else str(name).lower()
|
||||
self.current = -1
|
||||
self.runtime = 0
|
||||
self.loop = False
|
||||
self.random = False
|
||||
self.tracks = [ ]
|
||||
self.widgets = None
|
||||
self.lock = threading.Lock()
|
||||
self.TrackAdded = publisher.Publisher()
|
||||
self.TrackRemoved = publisher.Publisher()
|
||||
|
||||
def __getitem__(self, n):
|
||||
with self.lock:
|
||||
if n < len(self.tracks):
|
||||
return self.tracks[n]
|
||||
return None
|
||||
|
||||
def __getstate__(self):
|
||||
with self.lock:
|
||||
return { "name" : self.name,
|
||||
"sort" : self.sort,
|
||||
"current" : self.current,
|
||||
"loop" : self.loop,
|
||||
"random" : self.random,
|
||||
"tracks" : [ t.trackid for t in self.tracks ] }
|
||||
|
||||
def __len__(self):
|
||||
with self.lock:
|
||||
return len(self.tracks)
|
||||
|
||||
def __lt__(self, rhs):
|
||||
if not isinstance(rhs, SuperTag):
|
||||
return self.sort < rhs.sort
|
||||
if self == rhs.parent:
|
||||
return True
|
||||
return self.sort < rhs.parent.sort
|
||||
|
||||
def __next_track__(self):
|
||||
if self.loop == True and self.current >= len(self.tracks):
|
||||
return 0
|
||||
return self.current + 1
|
||||
|
||||
def __random_track__(self):
|
||||
i = 1
|
||||
length = len(self.tracks)
|
||||
if len(self.tracks) >= 3:
|
||||
i = random.randint(1, length - 1)
|
||||
return (self.current + i) % (1 if length == 0 else length)
|
||||
|
||||
def __setstate__(self, state):
|
||||
self.name = state["name"]
|
||||
self.sort = state["sort"]
|
||||
self.current = state["current"]
|
||||
self.loop = state["loop"]
|
||||
self.random = state["random"]
|
||||
self.tracks = state["tracks"]
|
||||
self.runtime = 0
|
||||
self.widgets = None
|
||||
self.lock = threading.Lock()
|
||||
self.TrackAdded = publisher.Publisher()
|
||||
self.TrackRemoved = publisher.Publisher()
|
||||
|
||||
def __str__(self):
|
||||
return self.name
|
||||
|
||||
def add_track(self, track):
|
||||
with self.lock:
|
||||
if track in self.tracks:
|
||||
return
|
||||
pos = len(self.tracks)
|
||||
self.tracks.append(track)
|
||||
self.runtime += track.length
|
||||
self.TrackAdded.publish(self, track, pos)
|
||||
|
||||
def can_loop(self): return True
|
||||
def can_random(self): return True
|
||||
|
||||
def get_header(self):
|
||||
return self.sort[0].upper() if len(self.sort) > 0 else ""
|
||||
|
||||
def init_track(self, track):
|
||||
with self.lock:
|
||||
try:
|
||||
i = self.tracks.index(track.trackid)
|
||||
self.tracks[i] = track
|
||||
self.runtime += track.length
|
||||
except Exception as e:
|
||||
pass
|
||||
|
||||
def next(self):
|
||||
with self.lock:
|
||||
if self.random == True:
|
||||
self.current = self.__random_track__()
|
||||
else:
|
||||
self.current = self.__next_track__()
|
||||
|
||||
if self.current < len(self.tracks):
|
||||
return self.tracks[self.current]
|
||||
return None
|
||||
|
||||
def remove_track(self, track):
|
||||
with self.lock:
|
||||
pos = self.tracks.index(track)
|
||||
self.tracks.remove(track)
|
||||
self.runtime -= track.length
|
||||
self.TrackRemoved.publish(self, track, pos)
|
||||
|
||||
def stacked(self):
|
||||
with self.lock:
|
||||
if self.current >= len(self.tracks):
|
||||
self.current = -1
|
||||
|
||||
def track_selected(self, track):
|
||||
with self.lock:
|
||||
self.current = self.tracks.index(track)
|
||||
track.add_to_playlist("Previous")
|
||||
|
||||
|
||||
class SuperTag(Tag):
|
||||
def __init__(self, parent, name, sort=None):
|
||||
Tag.__init__(self, name, sort)
|
||||
self.parent = parent
|
||||
|
||||
def __getstate__(self):
|
||||
state = Tag.__getstate__(self)
|
||||
state["parent"] = self.parent
|
||||
return state
|
||||
|
||||
def __lt__(self, rhs):
|
||||
if not isinstance(rhs, SuperTag):
|
||||
return self.parent.sort < rhs.sort
|
||||
if self.parent != rhs.parent:
|
||||
return self.parent < rhs.parent
|
||||
return self.sort < rhs.sort
|
||||
|
||||
def __setstate__(self, state):
|
||||
Tag.__setstate__(self, state)
|
||||
self.parent = state["parent"]
|
|
@ -1,98 +0,0 @@
|
|||
# Copyright 2021 (c) Anna Schumaker.
|
||||
from . import publisher
|
||||
from . import tag
|
||||
import threading
|
||||
|
||||
class TagStore:
|
||||
def __init__(self):
|
||||
self.store = dict()
|
||||
self.lock = threading.Lock()
|
||||
self.Added = publisher.Publisher()
|
||||
self.Removed = publisher.Publisher()
|
||||
|
||||
def __add_tag__(self, name, sort, track):
|
||||
t = self.__get_tag__(name, sort)
|
||||
if track:
|
||||
t.add_track(track)
|
||||
return t
|
||||
|
||||
def __alloc_tag__(self, name, sort):
|
||||
return tag.Tag(name, sort)
|
||||
|
||||
def __getitem__(self, name):
|
||||
with self.lock:
|
||||
return self.store.get(name)
|
||||
|
||||
def __getstate__(self):
|
||||
with self.lock:
|
||||
return { "store" : self.store }
|
||||
|
||||
def __get_tag__(self, name, sort):
|
||||
with self.lock:
|
||||
if (t := self.store.get(name)) != None:
|
||||
return t
|
||||
t = self.__alloc_tag__(name, sort)
|
||||
self.store[name] = t
|
||||
self.Added.publish(t)
|
||||
return t
|
||||
|
||||
def __len__(self):
|
||||
with self.lock:
|
||||
return len(self.store)
|
||||
|
||||
def __pop_tag__(self, t):
|
||||
self.store.pop(t.name)
|
||||
|
||||
def __setstate__(self, state):
|
||||
self.store = state["store"]
|
||||
self.lock = threading.Lock()
|
||||
self.Added = publisher.Publisher()
|
||||
self.Removed = publisher.Publisher()
|
||||
|
||||
def add(self, name, track=None, sort=None):
|
||||
return self.__add_tag__(name.strip(), sort, track)
|
||||
|
||||
def init_track(self, name, track):
|
||||
with self.lock:
|
||||
if (t := self.store.get(name)) != None:
|
||||
t.init_track(track)
|
||||
return t
|
||||
|
||||
def remove(self, t, track=None):
|
||||
if track:
|
||||
t.remove_track(track)
|
||||
if (track == None or len(t) == 0) and t in self.store.values():
|
||||
with self.lock:
|
||||
self.__pop_tag__(t)
|
||||
self.Removed.publish(t)
|
||||
|
||||
def reset(self):
|
||||
with self.lock:
|
||||
self.store.clear()
|
||||
self.Added.reset()
|
||||
self.Removed.reset()
|
||||
|
||||
def tags(self):
|
||||
with self.lock:
|
||||
for (name, tag) in self.store.items():
|
||||
yield tag
|
||||
|
||||
|
||||
class TagSuperStore(TagStore):
|
||||
def __alloc_tag__(self, key, sort):
|
||||
return tag.SuperTag(key[0], key[1], sort)
|
||||
|
||||
def __pop_tag__(self, t):
|
||||
self.store.pop((t.parent, t.name))
|
||||
|
||||
def add(self, parent, name, track, sort=None):
|
||||
return super().__add_tag__((parent, name.strip()), sort, track)
|
||||
|
||||
def init_track(self, parent, name, track):
|
||||
return super().init_track((parent, name), track)
|
||||
|
||||
def tags(self, parent=None):
|
||||
with self.lock:
|
||||
for (name, tag) in self.store.items():
|
||||
if parent == None or tag.parent == parent:
|
||||
yield tag
|
|
@ -1,93 +0,0 @@
|
|||
# Copyright 2021 (c) Anna Schumaker.
|
||||
from . import bus
|
||||
from . import publisher
|
||||
from gi.repository import GLib
|
||||
import threading
|
||||
import time
|
||||
import unittest
|
||||
|
||||
main_context = GLib.main_context_default()
|
||||
|
||||
class TestBus(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.count = 0
|
||||
self.start = 0
|
||||
self.retry = False
|
||||
bus.Start.register(self.cb_start)
|
||||
|
||||
def tearDown(self):
|
||||
bus.Start.unregister(self.cb_start)
|
||||
|
||||
def cb_start(self, bus):
|
||||
self.start += 1
|
||||
|
||||
def cb_one(self):
|
||||
self.count += 1
|
||||
|
||||
def cb_two(self, arg):
|
||||
self.count = arg
|
||||
|
||||
def cb_retry(self, arg):
|
||||
self.count = arg
|
||||
return bus.RETRY if self.retry == True else None
|
||||
|
||||
def test_bus_init(self):
|
||||
self.assertEqual(bus.RETRY, GLib.SOURCE_CONTINUE)
|
||||
|
||||
b = bus.Bus(100)
|
||||
self.assertEqual(b.timeout, 100)
|
||||
self.assertEqual(b.passengers, [ ])
|
||||
self.assertIsNone(b.timeout_id)
|
||||
self.assertIsInstance(b.lock, type(threading.Lock()))
|
||||
|
||||
self.assertIsInstance(bus.Start, publisher.Publisher)
|
||||
|
||||
def test_bus_board(self):
|
||||
b = bus.Bus(100)
|
||||
b.board(self.cb_one)
|
||||
b.board(self.cb_one)
|
||||
|
||||
self.assertEqual(b.passengers, [ (self.cb_one,()) ])
|
||||
self.assertIsNotNone(b.timeout_id)
|
||||
self.assertTrue(b.running())
|
||||
self.assertEqual(self.start, 1)
|
||||
|
||||
time.sleep(0.1)
|
||||
while main_context.iteration(may_block=False): pass
|
||||
|
||||
self.assertEqual(self.count, 1)
|
||||
self.assertIsNone(b.timeout_id)
|
||||
self.assertFalse(b.running())
|
||||
|
||||
def test_bus_clear(self):
|
||||
b = bus.Bus(100)
|
||||
b.clear()
|
||||
|
||||
for i in range(100):
|
||||
b.board(self.cb_one)
|
||||
|
||||
b.clear()
|
||||
self.assertEqual(b.passengers, [ ])
|
||||
self.assertIsNone(b.timeout_id)
|
||||
|
||||
def test_bus_complete(self):
|
||||
b = bus.Bus(100)
|
||||
for i in range(100):
|
||||
b.board(self.cb_two, i)
|
||||
|
||||
b.complete()
|
||||
self.assertEqual(b.passengers, [ ])
|
||||
self.assertEqual(self.count, i)
|
||||
self.assertIsNone(b.timeout_id)
|
||||
|
||||
def test_bus_retry(self):
|
||||
b = bus.Bus(10)
|
||||
b.board(self.cb_retry, 1)
|
||||
b.board(self.cb_retry, 2)
|
||||
|
||||
self.retry = True
|
||||
b.run()
|
||||
self.assertEqual(b.passengers, [ (self.cb_retry, (1,)), (self.cb_retry, (2,)) ])
|
||||
|
||||
b.complete()
|
||||
self.assertEqual(b.passengers, [ ])
|
|
@ -1,20 +0,0 @@
|
|||
# Copyright 2021 (c) Anna Schumaker.
|
||||
from . import counter
|
||||
from gi.repository import Gtk
|
||||
import unittest
|
||||
|
||||
class TestCounter(unittest.TestCase):
|
||||
def test_counter(self):
|
||||
c = counter.Counter(1, 10)
|
||||
|
||||
self.assertIsInstance(c, Gtk.Adjustment)
|
||||
|
||||
self.assertEqual(c.get_lower(), 1)
|
||||
self.assertEqual(c.get_upper(), 11)
|
||||
self.assertEqual(c.get_value(), 1)
|
||||
|
||||
for i in [ 2, 3, 4, 5, 6, 7, 8, 9, 10, None ]:
|
||||
self.assertEqual(c.increment(), i)
|
||||
|
||||
for i in [ 9, 8, 7, 6, 5, 4, 3, 2, 1, None ]:
|
||||
self.assertEqual(c.decrement(), i)
|
|
@ -1,34 +0,0 @@
|
|||
# Copyright 2021 (c) Anna Schumaker.
|
||||
from . import metadata
|
||||
import pathlib
|
||||
import unittest
|
||||
|
||||
test_tracks = pathlib.Path("./data/Test Album")
|
||||
track_01 = test_tracks / "01 - Test Track.ogg"
|
||||
track_02 = test_tracks / "02 - Test {Disc 2}.ogg"
|
||||
|
||||
class TestMetadata(unittest.TestCase):
|
||||
def test_metadata_init(self):
|
||||
mdf = metadata.Metadata(track_01)
|
||||
self.assertEqual(mdf.path, track_01)
|
||||
self.assertIsNone(mdf.file)
|
||||
|
||||
def test_metadata_track_01(self):
|
||||
with metadata.Metadata(track_01) as mdf:
|
||||
self.assertEqual(mdf.album(), "Test Album")
|
||||
self.assertEqual(mdf.artist(), "Test Artist")
|
||||
self.assertEqual(mdf.artistsort(), "Artist, Test")
|
||||
self.assertEqual(mdf.decade(), 2010)
|
||||
self.assertEqual(mdf.discnumber(), 1)
|
||||
self.assertEqual(mdf.genres(), [ "Test" ])
|
||||
self.assertEqual(mdf.length(), 10)
|
||||
self.assertEqual(mdf.title(), "Test Track")
|
||||
self.assertEqual(mdf.tracknumber(), 1)
|
||||
self.assertEqual(mdf.year(), 2019)
|
||||
|
||||
def test_metadata_track_02(self):
|
||||
with metadata.Metadata(track_02) as mdf:
|
||||
self.assertEqual(mdf.artist(), "Test Album Artist")
|
||||
self.assertEqual(mdf.artistsort(), "Album Artist, Test")
|
||||
self.assertEqual(mdf.genres(), [ "Test", "Genre", "List" ])
|
||||
self.assertEqual(mdf.year(), 2019)
|
|
@ -1,30 +0,0 @@
|
|||
# Copyright 2020 (c) Anna Schumaker.
|
||||
from . import publisher
|
||||
import unittest
|
||||
|
||||
class TestPublisher(unittest.TestCase):
|
||||
def on_test(self, text):
|
||||
self.test_arg = text
|
||||
|
||||
def test_publisher_init(self):
|
||||
pub = publisher.Publisher()
|
||||
self.assertIsInstance(pub.subscribers, set)
|
||||
self.assertEqual(pub.subscribers, set())
|
||||
|
||||
def test_publisher_register(self):
|
||||
pub = publisher.Publisher()
|
||||
pub.register(self.on_test)
|
||||
self.assertEqual(pub.subscribers, { self.on_test })
|
||||
|
||||
pub.unregister(self.on_test)
|
||||
self.assertEqual(pub.subscribers, set())
|
||||
|
||||
pub.subscribers = set([ 1, 2, 3 ])
|
||||
pub.reset()
|
||||
self.assertEqual(pub.subscribers, set())
|
||||
|
||||
def test_publisher_publish(self):
|
||||
pub = publisher.Publisher()
|
||||
pub.register(self.on_test)
|
||||
pub.publish("Test Arg")
|
||||
self.assertEqual(self.test_arg, "Test Arg")
|
256
lib/test_tag.py
256
lib/test_tag.py
|
@ -1,256 +0,0 @@
|
|||
# Copyright 2021 (c) Anna Schumaker.
|
||||
from . import publisher
|
||||
from . import tag
|
||||
from . import fake
|
||||
import random
|
||||
import threading
|
||||
import unittest
|
||||
|
||||
|
||||
class TestTag(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.changed = None
|
||||
|
||||
def callback_func(self, tag, track, pos):
|
||||
self.changed = (tag, track, pos)
|
||||
|
||||
def test_tag_init(self):
|
||||
t = tag.Tag("test")
|
||||
self.assertEqual(t.name, "test")
|
||||
self.assertEqual(t.sort, "test")
|
||||
self.assertEqual(t.current, -1)
|
||||
self.assertEqual(t.runtime, 0)
|
||||
self.assertEqual(t.tracks, [ ])
|
||||
self.assertFalse(t.loop)
|
||||
self.assertFalse(t.random)
|
||||
self.assertTrue( t.can_loop())
|
||||
self.assertTrue( t.can_random())
|
||||
self.assertIsNone(t.widgets)
|
||||
self.assertIsInstance(t.lock, type(threading.Lock()))
|
||||
self.assertIsInstance(t.TrackAdded, publisher.Publisher)
|
||||
self.assertIsInstance(t.TrackRemoved, publisher.Publisher)
|
||||
|
||||
self.assertEqual(str(t), "test")
|
||||
|
||||
def test_tag_len(self):
|
||||
t = tag.Tag("Test")
|
||||
self.assertEqual(len(t), 0)
|
||||
t.tracks = [ fake.Track(i) for i in range(5) ]
|
||||
self.assertEqual(len(t), 5)
|
||||
|
||||
def test_tag_lt(self):
|
||||
a = tag.Tag("A")
|
||||
b = tag.Tag("B")
|
||||
c = tag.Tag("C")
|
||||
|
||||
self.assertTrue( a < b)
|
||||
self.assertFalse(b < a)
|
||||
self.assertFalse(a < a)
|
||||
|
||||
a2 = tag.SuperTag(b, "A")
|
||||
self.assertTrue(a < a2)
|
||||
self.assertTrue(b < a2)
|
||||
self.assertTrue(a2 < c)
|
||||
|
||||
def test_tag_header(self):
|
||||
self.assertEqual(tag.Tag("Test").get_header(), "T")
|
||||
self.assertEqual(tag.Tag("Test", "sort").get_header(), "S")
|
||||
self.assertEqual(tag.Tag("").get_header(), "")
|
||||
|
||||
def test_tag_state(self):
|
||||
t = tag.Tag("test")
|
||||
tracks = [ fake.Track(i) for i in range(5) ]
|
||||
t.tracks = tracks
|
||||
|
||||
state = t.__getstate__()
|
||||
self.assertEqual(set(state.keys()),
|
||||
set([ "name", "sort", "current", "loop", "random", "tracks" ]))
|
||||
self.assertEqual(state["name"], "test")
|
||||
self.assertEqual(state["sort"], "test")
|
||||
self.assertEqual(state["current"], -1)
|
||||
self.assertEqual(state["tracks"], [ 0, 1, 2, 3, 4 ])
|
||||
self.assertFalse(state["loop"])
|
||||
self.assertFalse(state["random"])
|
||||
|
||||
state["sort"] = "sort"
|
||||
state["current"] = 42
|
||||
|
||||
t.__dict__.clear()
|
||||
t.__setstate__(state)
|
||||
self.assertEqual(t.name, "test")
|
||||
self.assertEqual(t.sort, "sort")
|
||||
self.assertEqual(t.current, 42)
|
||||
self.assertEqual(t.runtime, 0)
|
||||
self.assertEqual(t.tracks, [ 0, 1, 2, 3, 4 ])
|
||||
self.assertEqual(t.widgets, None)
|
||||
self.assertFalse(t.loop)
|
||||
self.assertFalse(t.random)
|
||||
self.assertIsInstance(t.lock, type(threading.Lock()))
|
||||
self.assertIsInstance(t.TrackAdded, publisher.Publisher)
|
||||
self.assertIsInstance(t.TrackRemoved, publisher.Publisher)
|
||||
|
||||
for track in tracks:
|
||||
t.init_track(track)
|
||||
self.assertEqual(t.tracks, tracks)
|
||||
self.assertEqual(t.runtime, 10)
|
||||
|
||||
def test_tag_add_track(self):
|
||||
t = tag.Tag("test")
|
||||
t.TrackAdded.register(self.callback_func)
|
||||
|
||||
self.assertIsNone(t[0])
|
||||
t.add_track(fake.Track(1))
|
||||
self.assertEqual(t[0], fake.Track(1))
|
||||
self.assertEqual(t.tracks, [ fake.Track(1) ])
|
||||
self.assertEqual(self.changed, (t, fake.Track(1), 0))
|
||||
self.assertEqual(t.runtime, 1)
|
||||
|
||||
t.add_track(fake.Track(2))
|
||||
self.assertEqual(t[1], fake.Track(2))
|
||||
self.assertEqual(t.tracks, [ fake.Track(1), fake.Track(2) ])
|
||||
self.assertEqual(self.changed, (t, fake.Track(2), 1))
|
||||
self.assertEqual(t.runtime, 3)
|
||||
|
||||
t.add_track(fake.Track(1))
|
||||
self.assertEqual(t.tracks, [ fake.Track(1), fake.Track(2) ])
|
||||
self.assertEqual(self.changed, (t, fake.Track(2), 1))
|
||||
self.assertEqual(t.runtime, 3)
|
||||
|
||||
def test_tag_remove_track(self):
|
||||
t = tag.Tag("test")
|
||||
t.add_track(fake.Track(1))
|
||||
t.add_track(fake.Track(2))
|
||||
t.TrackRemoved.register(self.callback_func)
|
||||
|
||||
t.remove_track(fake.Track(1))
|
||||
self.assertEqual(t.tracks, [ fake.Track(2) ])
|
||||
self.assertEqual(self.changed, (t, fake.Track(1), 0))
|
||||
self.assertEqual(t.runtime, 2)
|
||||
|
||||
t.remove_track(fake.Track(2))
|
||||
self.assertEqual(t.tracks, [ ])
|
||||
self.assertEqual(self.changed, (t, fake.Track(2), 0))
|
||||
self.assertEqual(t.runtime, 0)
|
||||
|
||||
def test_tag_next(self):
|
||||
t = tag.Tag("test")
|
||||
t.tracks = [ 1, 2, 3 ]
|
||||
|
||||
self.assertEqual(t.next(), 1)
|
||||
self.assertEqual(t.current, 0)
|
||||
|
||||
self.assertEqual(t.next(), 2)
|
||||
self.assertEqual(t.current, 1)
|
||||
|
||||
self.assertEqual(t.next(), 3)
|
||||
self.assertEqual(t.current, 2)
|
||||
|
||||
self.assertIsNone(t.next())
|
||||
self.assertEqual(t.current, 3)
|
||||
|
||||
t.loop = True
|
||||
self.assertEqual(t.next(), 1)
|
||||
self.assertEqual(t.current, 0)
|
||||
|
||||
def test_tag_random_next(self):
|
||||
t = tag.Tag("test")
|
||||
t.tracks = [ 0, 1, 2, 3, 4, 5 ]
|
||||
t.random = True
|
||||
|
||||
# Expected randint(): 5, 3, 2, 5, 5, 5
|
||||
random.seed(20210318)
|
||||
self.assertEqual(t.next(), 4) # -1 + 5
|
||||
self.assertEqual(t.next(), 1) # (4 + 3) % 6 = 7 % 6
|
||||
self.assertEqual(t.next(), 3) # 1 + 2
|
||||
self.assertEqual(t.next(), 2) # (3 + 5) % 6 = 8 % 6
|
||||
self.assertEqual(t.next(), 1) # (2 + 5) % 6 = 7 % 6
|
||||
self.assertEqual(t.next(), 0) # (1 + 5) % 6 = 0
|
||||
|
||||
t.tracks = [ ]
|
||||
self.assertIsNone(t.next())
|
||||
|
||||
t.tracks = [ 0 ]
|
||||
self.assertEqual(t.next(), 0)
|
||||
self.assertEqual(t.next(), 0)
|
||||
|
||||
t.tracks = [ 0, 1 ]
|
||||
t.current = -1
|
||||
self.assertEqual(t.next(), 0)
|
||||
self.assertEqual(t.next(), 1)
|
||||
self.assertEqual(t.next(), 0)
|
||||
self.assertEqual(t.next(), 1)
|
||||
|
||||
def test_tag_track_selected(self):
|
||||
t = tag.Tag("test")
|
||||
p = tag.Tag("Previous")
|
||||
t.tracks = [ fake.Track(0, p), fake.Track(1, p), fake.Track(2, p) ]
|
||||
|
||||
t.track_selected(fake.Track(2, p))
|
||||
self.assertEqual(t.current, 2)
|
||||
self.assertIn(fake.Track(2, p), p.tracks)
|
||||
|
||||
t.track_selected(fake.Track(1, p))
|
||||
self.assertEqual(t.current, 1)
|
||||
self.assertIn(fake.Track(1, p), p.tracks)
|
||||
|
||||
t.track_selected(fake.Track(0, p))
|
||||
self.assertEqual(t.current, 0)
|
||||
self.assertIn(fake.Track(0, p), p.tracks)
|
||||
|
||||
def test_tag_stacked(self):
|
||||
t = tag.Tag("test")
|
||||
t.tracks = [ 0, 1, 2 ]
|
||||
|
||||
t.current = 3
|
||||
t.stacked()
|
||||
self.assertEqual(t.current, -1)
|
||||
|
||||
t.current = 1
|
||||
t.stacked()
|
||||
self.assertEqual(t.current, 1)
|
||||
|
||||
|
||||
class TestSuperTag(unittest.TestCase):
|
||||
def test_super_tag(self):
|
||||
parent = tag.Tag("parent")
|
||||
st = tag.SuperTag(parent, "test", "sort")
|
||||
st.tracks = [ fake.Track(i) for i in range(5) ]
|
||||
|
||||
self.assertIsInstance(st, tag.Tag)
|
||||
self.assertEqual(st.parent, parent)
|
||||
|
||||
state = st.__getstate__()
|
||||
self.assertEqual(state["name"], "test")
|
||||
self.assertEqual(state["sort"], "sort")
|
||||
self.assertEqual(state["current"], -1)
|
||||
self.assertEqual(state["tracks"], [ 0, 1, 2, 3, 4 ])
|
||||
self.assertEqual(state["parent"], parent)
|
||||
|
||||
st.__dict__.clear()
|
||||
st.__setstate__(state)
|
||||
self.assertEqual(st.name, "test")
|
||||
self.assertEqual(st.sort, "sort")
|
||||
self.assertEqual(st.tracks, [ 0, 1, 2, 3, 4 ])
|
||||
self.assertEqual(st.widgets, None)
|
||||
self.assertEqual(st.parent, parent)
|
||||
self.assertIsInstance(st.lock, type(threading.Lock()))
|
||||
|
||||
def test_super_tag_lt(self):
|
||||
A = tag.Tag("A")
|
||||
B = tag.Tag("B")
|
||||
C = tag.Tag("C")
|
||||
|
||||
aa = tag.SuperTag(A, "A")
|
||||
ba = tag.SuperTag(B, "A")
|
||||
bb = tag.SuperTag(B, "B")
|
||||
ca = tag.SuperTag(C, "A")
|
||||
|
||||
lst = [ A, aa, B, ba, bb, C, ca ]
|
||||
for i, t in enumerate(lst):
|
||||
for u in lst[i+1:]:
|
||||
self.assertTrue(t < u)
|
||||
|
||||
lst = [ ca, C, bb, ba, B, aa, A ]
|
||||
for i, t in enumerate(lst):
|
||||
for u in lst[i+1:]:
|
||||
self.assertFalse(t < u)
|
|
@ -1,167 +0,0 @@
|
|||
# Copyright 2021 (c) Anna Schumaker.
|
||||
from . import fake
|
||||
from . import publisher
|
||||
from . import tag
|
||||
from . import tagstore
|
||||
import threading
|
||||
import unittest
|
||||
|
||||
class TestTagStore(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.added = None
|
||||
self.removed = None
|
||||
|
||||
def on_store_added(self, tag):
|
||||
self.added = tag
|
||||
|
||||
def on_store_removed(self, tag):
|
||||
self.removed = tag
|
||||
|
||||
def test_tag_store(self):
|
||||
store = tagstore.TagStore()
|
||||
self.assertEqual(store.store, { })
|
||||
self.assertIsInstance(store.lock, type(threading.Lock()))
|
||||
self.assertIsInstance(store.Added, publisher.Publisher)
|
||||
self.assertIsInstance(store.Removed, publisher.Publisher)
|
||||
|
||||
def test_tag_store_add_remove(self):
|
||||
store = tagstore.TagStore()
|
||||
store.Added.register(self.on_store_added)
|
||||
store.Removed.register(self.on_store_removed)
|
||||
|
||||
tag = store.add("test", fake.Track(1))
|
||||
self.assertEqual(self.added, tag)
|
||||
self.assertEqual(tag.tracks, [ fake.Track(1) ])
|
||||
|
||||
self.added = None
|
||||
self.assertEqual(store.add("test ", fake.Track(2)), tag)
|
||||
self.assertEqual(tag.tracks, [ fake.Track(1), fake.Track(2) ])
|
||||
self.assertIsNone(self.added)
|
||||
|
||||
store.remove(tag, fake.Track(1))
|
||||
self.assertEqual(tag.tracks, [ fake.Track(2) ])
|
||||
self.assertIn("test", store.store.keys())
|
||||
self.assertIsNone(self.removed)
|
||||
|
||||
store.remove(tag, fake.Track(2))
|
||||
self.assertEqual(tag.tracks, [ ])
|
||||
self.assertNotIn("test", store.store.keys())
|
||||
self.assertEqual(self.removed, tag)
|
||||
|
||||
def test_tag_store_add_remove_none(self):
|
||||
store = tagstore.TagStore()
|
||||
tag = store.add("test")
|
||||
self.assertEqual(tag.tracks, [ ])
|
||||
|
||||
tag.add_track(fake.Track(1))
|
||||
tag.add_track(fake.Track(2))
|
||||
|
||||
store.remove(tag)
|
||||
self.assertNotIn("test", store.store.keys())
|
||||
|
||||
def test_tag_store_add_remove_sort(self):
|
||||
store = tagstore.TagStore()
|
||||
tag = store.add("test", sort="sort")
|
||||
self.assertEqual(tag.sort, "sort")
|
||||
|
||||
def test_tag_store_items(self):
|
||||
store = tagstore.TagStore()
|
||||
self.assertEqual(len(store), 0)
|
||||
|
||||
tag1 = store.add("test1", fake.Track(1))
|
||||
tag2 = store.add("test2", fake.Track(2))
|
||||
tag3 = store.add("test3", fake.Track(3))
|
||||
|
||||
self.assertEqual(len(store), 3)
|
||||
self.assertEqual( store["test1"], tag1)
|
||||
self.assertEqual( store["test2"], tag2)
|
||||
self.assertEqual( store["test3"], tag3)
|
||||
self.assertIsNone(store["test4"])
|
||||
|
||||
result = [ tag for tag in store.tags() ]
|
||||
self.assertEqual(len(result), 3)
|
||||
self.assertIn(tag1, result)
|
||||
self.assertIn(tag2, result)
|
||||
self.assertIn(tag3, result)
|
||||
|
||||
def test_tag_store_reset(self):
|
||||
store = tagstore.TagStore()
|
||||
tag = store.add("test", fake.Track(1))
|
||||
store.Added.register(self.on_store_added)
|
||||
store.Removed.register(self.on_store_removed)
|
||||
|
||||
store.reset()
|
||||
self.assertNotIn(tag.name, store.store.keys())
|
||||
self.assertEqual(store.Added.subscribers, set())
|
||||
self.assertEqual(store.Removed.subscribers, set())
|
||||
|
||||
def test_tag_store_state(self):
|
||||
store = tagstore.TagStore()
|
||||
track = fake.Track(1)
|
||||
tag = store.add("test", track)
|
||||
|
||||
state = store.__getstate__()
|
||||
self.assertEqual(set(state.keys()), set([ "store" ]))
|
||||
|
||||
store.__dict__.clear()
|
||||
store.__setstate__(state)
|
||||
self.assertEqual(store.store, { "test" : tag })
|
||||
self.assertIsInstance(store.lock, type(threading.Lock()))
|
||||
self.assertIsInstance(store.Added, publisher.Publisher)
|
||||
self.assertIsInstance(store.Removed, publisher.Publisher)
|
||||
|
||||
self.assertEqual(store.init_track("test", track), tag)
|
||||
self.assertEqual(tag.tracks, [ track ])
|
||||
|
||||
|
||||
class TestTagSuperStore(unittest.TestCase):
|
||||
def test_tag_superstore(self):
|
||||
store = tagstore.TagStore()
|
||||
superstore = tagstore.TagSuperStore()
|
||||
self.assertIsInstance(superstore, tagstore.TagStore)
|
||||
|
||||
parent = store.add("parent", fake.Track(1))
|
||||
supertag = superstore.add(parent, "test ", fake.Track(1))
|
||||
self.assertIsInstance(supertag, tag.SuperTag)
|
||||
self.assertEqual(supertag.name, "test")
|
||||
self.assertEqual(supertag.tracks, [ fake.Track(1) ])
|
||||
self.assertEqual(supertag.parent, parent)
|
||||
|
||||
superstore.remove(supertag, fake.Track(1))
|
||||
self.assertEqual(supertag.tracks, [ ])
|
||||
self.assertNotIn("test", superstore.store.keys())
|
||||
|
||||
def test_tag_superstore_items(self):
|
||||
store = tagstore.TagStore()
|
||||
superstore = tagstore.TagSuperStore()
|
||||
|
||||
parent1 = store.add("parent1", fake.Track(1))
|
||||
parent2 = store.add("parent2", fake.Track(2))
|
||||
tag1 = superstore.add(parent1, "test1", fake.Track(1))
|
||||
tag2 = superstore.add(parent1, "test2", fake.Track(2))
|
||||
tag3 = superstore.add(parent2, "test3", fake.Track(3))
|
||||
|
||||
self.assertEqual(len(superstore), 3)
|
||||
self.assertEqual( superstore[parent1, "test1"], tag1)
|
||||
self.assertEqual( superstore[parent1, "test2"], tag2)
|
||||
self.assertEqual( superstore[parent2, "test3"], tag3)
|
||||
self.assertIsNone(superstore[parent2, "test2"])
|
||||
|
||||
result = [ tag for tag in superstore.tags(parent1) ]
|
||||
self.assertEqual(result, [ tag1, tag2 ])
|
||||
result = [ tag for tag in superstore.tags(parent2) ]
|
||||
self.assertEqual(result, [ tag3 ])
|
||||
result = [ tag for tag in superstore.tags() ]
|
||||
self.assertEqual(result, [ tag1, tag2, tag3 ])
|
||||
|
||||
def test_tag_superstore_init_track(self):
|
||||
store = tagstore.TagStore()
|
||||
superstore = tagstore.TagSuperStore()
|
||||
|
||||
track = fake.Track(1)
|
||||
parent = store.add("test")
|
||||
tag = superstore.add(parent, "test", track)
|
||||
tag.tracks = [ 1 ]
|
||||
|
||||
self.assertEqual(superstore.init_track(parent, "test", track), tag)
|
||||
self.assertEqual(tag.tracks, [ track ])
|
|
@ -1,35 +0,0 @@
|
|||
# Copyright 2021 (c) Anna Schumaker.
|
||||
from . import publisher
|
||||
from . import thread
|
||||
import threading
|
||||
import unittest
|
||||
|
||||
class TestThread(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.started = None
|
||||
self.called = False
|
||||
self.thread = thread.Thread(self.thread_func)
|
||||
|
||||
def thread_func(self):
|
||||
self.assertIsNotNone(self.thread.thread)
|
||||
self.called = True
|
||||
|
||||
def on_thread_start(self, thread):
|
||||
self.started = thread
|
||||
|
||||
def test_thread(self):
|
||||
self.assertIsInstance(thread.Start, publisher.Publisher)
|
||||
thread.Start.register(self.on_thread_start)
|
||||
|
||||
self.assertIsInstance(self.thread.lock, type(threading.Lock()))
|
||||
self.assertEqual(self.thread.func, self.thread_func)
|
||||
self.assertIsNone(self.thread.thread)
|
||||
self.assertFalse(self.thread.running())
|
||||
|
||||
self.assertEqual(self.thread(), self.thread)
|
||||
self.thread.join()
|
||||
|
||||
self.assertTrue(self.called)
|
||||
self.assertFalse(self.thread.running())
|
||||
self.assertEqual(self.started, self.thread)
|
||||
self.assertIsNone(self.thread.thread)
|
|
@ -5,9 +5,9 @@ import unittest
|
|||
class TestVersion(unittest.TestCase):
|
||||
def test_version(self):
|
||||
self.assertEqual(version.MAJOR, 2)
|
||||
self.assertEqual(version.MINOR, 9)
|
||||
self.assertEqual(version.MINOR, 10)
|
||||
|
||||
self.assertTrue(version.DEBUG)
|
||||
self.assertTrue(__debug__)
|
||||
self.assertTrue(version.TESTING)
|
||||
|
||||
self.assertEqual(version.string(), "Emmental 2.9-debug")
|
||||
self.assertEqual(version.string(), "Emmental 2.10-debug")
|
||||
|
|
|
@ -1,35 +0,0 @@
|
|||
# Copyright 2021 (c) Anna Schumaker.
|
||||
from . import publisher
|
||||
import threading
|
||||
|
||||
Start = publisher.Publisher()
|
||||
|
||||
class Thread:
|
||||
def __init__(self, func):
|
||||
self.func = func
|
||||
self.thread = None
|
||||
self.lock = threading.Lock()
|
||||
|
||||
def __call__(self):
|
||||
with self.lock:
|
||||
if self.thread:
|
||||
return None
|
||||
self.thread = threading.Thread(target = self.__func__)
|
||||
self.thread.start()
|
||||
Start.publish(self)
|
||||
return self
|
||||
|
||||
def __func__(self):
|
||||
self.func()
|
||||
with self.lock:
|
||||
self.thread = None
|
||||
|
||||
def join(self):
|
||||
if self.thread:
|
||||
self.thread.join()
|
||||
|
||||
def running(self):
|
||||
with self.lock:
|
||||
if self.thread:
|
||||
return self.thread.is_alive()
|
||||
return False
|
|
@ -1,17 +1,10 @@
|
|||
# Copyright 2021 (c) Anna Schumaker.
|
||||
import os
|
||||
import sys
|
||||
|
||||
MAJOR = 2
|
||||
MINOR = 9
|
||||
|
||||
DEBUG = False
|
||||
if os.path.exists(".debug"):
|
||||
with open(".debug") as f:
|
||||
if f.read().strip() == "emmental":
|
||||
DEBUG = True
|
||||
|
||||
TESTING = os.environ.get("EMMENTAL_TESTING") != None
|
||||
|
||||
MINOR = 10
|
||||
TESTING = "unittest" in sys.modules
|
||||
|
||||
def string():
|
||||
return f"Emmental {MAJOR}.{MINOR}{'-debug' if DEBUG else ''}"
|
||||
return f"Emmental {MAJOR}.{MINOR}{'-debug' if __debug__ else ''}"
|
||||
|
|
|
@ -29,27 +29,23 @@ class Panel(Gtk.Box):
|
|||
|
||||
def get_playlist(self): return self.window.get_playlist()
|
||||
def set_playlist(self, plist):
|
||||
self.header.set_playlist(plist)
|
||||
self.window.set_playlist(plist)
|
||||
if plist:
|
||||
self.header.set_playlist(plist)
|
||||
self.window.set_playlist(plist)
|
||||
|
||||
def key_pressed(self, event, keyval, keycode, state):
|
||||
name = Gdk.keyval_name(keyval)
|
||||
if name == "Escape":
|
||||
self.window.clear_selection()
|
||||
return True
|
||||
elif name == "Delete":
|
||||
playlist = self.get_playlist()
|
||||
if playlist and playlist.can_add_remove_tracks():
|
||||
match Gdk.keyval_name(keyval):
|
||||
case "Escape": self.window.clear_selection()
|
||||
case "Delete":
|
||||
playlist = self.get_playlist()
|
||||
if not (playlist and playlist.can_add_remove_tracks()):
|
||||
return False
|
||||
for track in self.selected_tracks():
|
||||
playlist.remove_track(track)
|
||||
return True
|
||||
elif name == "f":
|
||||
self.add_selected_tracks(db.user.Table.find("Favorites"))
|
||||
return True
|
||||
elif name == "q":
|
||||
self.add_selected_tracks(db.user.Table.find("Queued Tracks"))
|
||||
return True
|
||||
return False
|
||||
case "f": self.add_selected_tracks(db.user.Table.find("Favorites"))
|
||||
case "q": self.add_selected_tracks(db.user.Table.find("Queued Tracks"))
|
||||
case _: return False
|
||||
return True
|
||||
|
||||
def jump_clicked(self, button):
|
||||
view = self.window.get_child()
|
||||
|
|
|
@ -33,13 +33,17 @@ class LabelFactory(Gtk.SignalListItemFactory):
|
|||
def get_track_text(self, track):
|
||||
raise NotImplementedError
|
||||
|
||||
def get_track_dim(self, track):
|
||||
return False
|
||||
|
||||
def on_setup(self, factory, listitem):
|
||||
listitem.set_child(TrackLabel(xalign=self.xalign))
|
||||
|
||||
def on_bind(self, factory, listitem):
|
||||
item = listitem.get_item()
|
||||
text = self.get_track_text(item)
|
||||
listitem.get_child().set_item(item, text)
|
||||
if child := listitem.get_child():
|
||||
child.set_item(item, self.get_track_text(item))
|
||||
child.set_sensitive(not self.get_track_dim(item))
|
||||
|
||||
def on_unbind(self, factory, listitem):
|
||||
listitem.get_child().unset_item(listitem.get_item())
|
||||
|
@ -77,7 +81,10 @@ class AlbumFactory(LabelFactory):
|
|||
|
||||
class SubtitleFactory(LabelFactory):
|
||||
def __init__(self): LabelFactory.__init__(self, xalign=0)
|
||||
def get_track_text(self, track): return track.disc.subtitle
|
||||
def get_track_dim(self, track): return len(track.disc.subtitle) == 0
|
||||
def get_track_text(self, track):
|
||||
subtitle = track.disc.subtitle
|
||||
return subtitle if len(subtitle) > 0 else track.disc.name
|
||||
|
||||
|
||||
class YearFactory(LabelFactory):
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
# Copyright 2021 (c) Anna Schumaker.
|
||||
import audio
|
||||
import db
|
||||
import lib
|
||||
from gi.repository import Gtk
|
||||
|
@ -62,6 +63,29 @@ class SortButton(Gtk.MenuButton):
|
|||
self.set_sensitive(plist != db.user.Table.find("Previous"))
|
||||
|
||||
|
||||
class FavoriteButton(Gtk.ToggleButton):
|
||||
def __init__(self):
|
||||
Gtk.ToggleButton.__init__(self)
|
||||
self.set_icon_name("emmental-favorites")
|
||||
self.track_changed(audio.Player, None, audio.Player.track)
|
||||
audio.Player.connect("track-changed", self.track_changed)
|
||||
|
||||
def set_playlist(self, plist):
|
||||
pass
|
||||
|
||||
def track_changed(self, player, old, new):
|
||||
self.set_sensitive(new != None)
|
||||
self.set_active(new in db.user.Table.find("Favorites").get_tracks())
|
||||
|
||||
def do_toggled(self):
|
||||
if audio.Player.track:
|
||||
fav = db.user.Table.find("Favorites")
|
||||
if self.get_active():
|
||||
fav.add_track(audio.Player.track)
|
||||
else:
|
||||
fav.remove_track(audio.Player.track)
|
||||
|
||||
|
||||
class JumpButton(Gtk.Button):
|
||||
def __init__(self):
|
||||
Gtk.Button.__init__(self)
|
||||
|
@ -74,20 +98,9 @@ class JumpButton(Gtk.Button):
|
|||
|
||||
class ControlBox(Gtk.Box):
|
||||
def __init__(self):
|
||||
Gtk.Box.__init__(self)
|
||||
Gtk.Box.__init__(self, margin_top=5, margin_bottom=5,
|
||||
margin_end=5, margin_start=5)
|
||||
self.add_css_class("linked")
|
||||
self.append(RandomToggle())
|
||||
self.append(LoopToggle())
|
||||
self.append(SortButton())
|
||||
self.append(JumpButton())
|
||||
|
||||
self.set_margin_top(5)
|
||||
self.set_margin_bottom(5)
|
||||
self.set_margin_start(5)
|
||||
self.set_margin_end(5)
|
||||
|
||||
def get_jump_button(self):
|
||||
return self.get_last_child()
|
||||
|
||||
def set_playlist(self, plist):
|
||||
child = self.get_first_child()
|
||||
|
@ -96,14 +109,34 @@ class ControlBox(Gtk.Box):
|
|||
child = child.get_next_sibling()
|
||||
|
||||
|
||||
class TrackBox(ControlBox):
|
||||
def __init__(self):
|
||||
ControlBox.__init__(self)
|
||||
self.append(FavoriteButton())
|
||||
self.append(JumpButton())
|
||||
|
||||
def get_jump_button(self):
|
||||
return self.get_last_child()
|
||||
|
||||
|
||||
class PlaylistBox(ControlBox):
|
||||
def __init__(self):
|
||||
ControlBox.__init__(self)
|
||||
self.append(RandomToggle())
|
||||
self.append(LoopToggle())
|
||||
self.append(SortButton())
|
||||
|
||||
|
||||
class Header(Gtk.Box):
|
||||
def __init__(self):
|
||||
Gtk.Box.__init__(self)
|
||||
self.append(TrackBox())
|
||||
self.append(FilterEntry())
|
||||
self.append(ControlBox())
|
||||
self.append(PlaylistBox())
|
||||
|
||||
def get_jump_button(self):
|
||||
return self.get_last_child().get_jump_button()
|
||||
return self.get_first_child().get_jump_button()
|
||||
|
||||
def set_playlist(self, plist):
|
||||
self.get_first_child().set_playlist(plist)
|
||||
self.get_last_child().set_playlist(plist)
|
||||
|
|
|
@ -100,9 +100,11 @@ class TestColumnFactories(unittest.TestCase):
|
|||
def test_subtitle(self):
|
||||
factory = column.SubtitleFactory()
|
||||
self.assertIsInstance(factory, column.LabelFactory)
|
||||
self.assertEqual(factory.get_track_text(self.track), "")
|
||||
self.assertEqual(factory.get_track_text(self.track), "Disc 1")
|
||||
self.assertTrue(factory.get_track_dim(self.track))
|
||||
self.track.disc._subtitle = "Test Subtitle"
|
||||
self.assertEqual(factory.get_track_text(self.track), "Test Subtitle")
|
||||
self.assertFalse(factory.get_track_dim(self.track))
|
||||
|
||||
def test_year(self):
|
||||
factory = column.YearFactory()
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
# Copyright 2021 (c) Anna Schumaker.
|
||||
import audio
|
||||
import db
|
||||
import lib
|
||||
import unittest
|
||||
|
@ -113,6 +114,52 @@ class TestSortButton(unittest.TestCase):
|
|||
self.assertTrue(sort.get_sensitive())
|
||||
|
||||
|
||||
class TestFavoriteButton(unittest.TestCase):
|
||||
def setUp(self):
|
||||
db.reset()
|
||||
audio.Player.track = None
|
||||
|
||||
def test_init(self):
|
||||
fav = header.FavoriteButton()
|
||||
self.assertIsInstance(fav, Gtk.ToggleButton)
|
||||
self.assertEqual(fav.get_icon_name(), "emmental-favorites")
|
||||
self.assertFalse(fav.get_sensitive())
|
||||
|
||||
def test_sensitive(self):
|
||||
track = db.make_fake_track(1, 1, "Test Track", "/a/b/c/1.ogg")
|
||||
fav = header.FavoriteButton()
|
||||
fav.track_changed(audio.Player, None, track)
|
||||
self.assertTrue(fav.get_sensitive())
|
||||
fav.track_changed(audio.Player, None, None)
|
||||
self.assertFalse(fav.get_sensitive())
|
||||
|
||||
def test_active(self):
|
||||
fav = header.FavoriteButton()
|
||||
t1 = db.make_fake_track(1, 1, "Test Track 1", "/a/b/c/1.ogg")
|
||||
t2 = db.make_fake_track(2, 2, "Test Track 2", "/a/b/c/2.ogg")
|
||||
db.user.Table.find("Favorites").add_track(t1)
|
||||
|
||||
fav.track_changed(audio.Player, None, t1)
|
||||
self.assertTrue(fav.get_active())
|
||||
fav.track_changed(audio.Player, t1, t2)
|
||||
self.assertFalse(fav.get_active())
|
||||
|
||||
def test_toggle(self):
|
||||
track = db.make_fake_track(1, 1, "Test Track", "/a/b/c/1.ogg")
|
||||
fav = header.FavoriteButton()
|
||||
|
||||
fav.set_active(True)
|
||||
self.assertEqual(db.user.Table.find("Favorites").get_n_tracks(), 0)
|
||||
fav.set_active(False)
|
||||
|
||||
audio.Player.track = track
|
||||
self.assertNotIn(track, db.user.Table.find("Favorites").get_tracks())
|
||||
fav.set_active(True)
|
||||
self.assertIn(track, db.user.Table.find("Favorites").get_tracks())
|
||||
fav.set_active(False)
|
||||
self.assertNotIn(track, db.user.Table.find("Favorites").get_tracks())
|
||||
|
||||
|
||||
class TestJumpButton(unittest.TestCase):
|
||||
def test_init(self):
|
||||
jump = header.JumpButton()
|
||||
|
@ -138,9 +185,31 @@ class TestControlBox(unittest.TestCase):
|
|||
self.assertEqual(box.get_margin_end(), 5)
|
||||
self.assertTrue(box.has_css_class("linked"))
|
||||
|
||||
|
||||
class TestTrackBox(unittest.TestCase):
|
||||
def test_init(self):
|
||||
box = header.TrackBox()
|
||||
self.assertIsInstance(box, header.ControlBox)
|
||||
|
||||
def test_children(self):
|
||||
box = header.TrackBox()
|
||||
|
||||
child = box.get_first_child()
|
||||
self.assertIsInstance(child, header.FavoriteButton)
|
||||
|
||||
child = child.get_next_sibling()
|
||||
self.assertIsInstance(child, header.JumpButton)
|
||||
self.assertEqual(box.get_jump_button(), child)
|
||||
|
||||
|
||||
class TestPlaylistBox(unittest.TestCase):
|
||||
def test_init(self):
|
||||
box = header.PlaylistBox()
|
||||
self.assertIsInstance(box, header.ControlBox)
|
||||
|
||||
def test_children(self):
|
||||
collection = db.user.Table.find("Collection")
|
||||
box = header.ControlBox()
|
||||
box = header.PlaylistBox()
|
||||
box.set_playlist(collection)
|
||||
|
||||
child = box.get_first_child()
|
||||
|
@ -154,10 +223,6 @@ class TestControlBox(unittest.TestCase):
|
|||
child = child.get_next_sibling()
|
||||
self.assertIsInstance(child, header.SortButton)
|
||||
|
||||
child = child.get_next_sibling()
|
||||
self.assertIsInstance(child, header.JumpButton)
|
||||
self.assertEqual(box.get_jump_button(), child)
|
||||
|
||||
|
||||
class TestHeader(unittest.TestCase):
|
||||
def test_init(self):
|
||||
|
@ -172,6 +237,9 @@ class TestHeader(unittest.TestCase):
|
|||
box.set_playlist(collection)
|
||||
|
||||
child = box.get_first_child()
|
||||
self.assertIsInstance(child, header.TrackBox)
|
||||
|
||||
child = child.get_next_sibling()
|
||||
self.assertIsInstance(child, header.FilterEntry)
|
||||
|
||||
child = child.get_next_sibling()
|
||||
|
|
|
@ -29,11 +29,9 @@ def EnableSwitch(library):
|
|||
def commit():
|
||||
Queue.push(task.CommitTask())
|
||||
|
||||
def import_track(lib, track, playcount, lastplayed, playlists):
|
||||
Queue.push(task.ImportTask(lib, track, playcount, lastplayed, playlists))
|
||||
|
||||
def update_library(lib):
|
||||
Queue.push(task.CheckSchedulerTask(lib))
|
||||
Queue.push(task.DirectoryTask(lib, lib.path))
|
||||
|
||||
def remove_library(lib):
|
||||
Queue.push(task.RemoveLibrarySchedulerTask(lib))
|
||||
|
|
|
@ -17,6 +17,9 @@ class TaskQueue(GObject.GObject):
|
|||
def push(self, task):
|
||||
self.emit("task-pushed", task)
|
||||
|
||||
def clear(self):
|
||||
self.emit("tasks-finished")
|
||||
|
||||
def run(self):
|
||||
self.emit("run-task", self.tasks.pop(0))
|
||||
if len(self.tasks) > 0:
|
||||
|
|
|
@ -38,32 +38,6 @@ class FileTask(Task):
|
|||
db.genre.Table.find(genre).add_track(track)
|
||||
|
||||
|
||||
class ImportTask(FileTask):
|
||||
def __init__(self, library, filepath, playcount, lastplayed, playlists):
|
||||
FileTask.__init__(self, library, filepath)
|
||||
self.playcount = playcount
|
||||
if isinstance(lastplayed, datetime.datetime):
|
||||
self.lastplayed = lastplayed
|
||||
elif isinstance(lastplayed, datetime.date):
|
||||
self.lastplayed = datetime.datetime.combine(lastplayed,
|
||||
datetime.time())
|
||||
|
||||
exclude = set([ "Collection", "New Tracks", "Previous" ])
|
||||
self.playlists = [ p for p in playlists if p not in exclude ]
|
||||
if "Up Next" in self.playlists:
|
||||
self.playlists[self.playlists.index("Up Next")] = "Queued Tracks"
|
||||
|
||||
def run_task(self):
|
||||
FileTask.run_task(self)
|
||||
if track := db.track.Table.lookup(self.filepath):
|
||||
db.user.Table.find("New Tracks").remove_track(track)
|
||||
for plist in self.playlists:
|
||||
db.user.Table.find(plist).add_track(track)
|
||||
if self.playcount > 0:
|
||||
track.playcount = self.playcount
|
||||
track.lastplayed = self.lastplayed
|
||||
|
||||
|
||||
class DirectoryTask(Task):
|
||||
def __init__(self, library, dirpath):
|
||||
Task.__init__(self)
|
||||
|
|
|
@ -10,12 +10,12 @@ track_02 = test_tracks / "02 - Test {Disc 2}.ogg"
|
|||
text_txt = test_tracks / "text.txt"
|
||||
|
||||
class TestMetadata(unittest.TestCase):
|
||||
def test_metadata_init(self):
|
||||
def test_init(self):
|
||||
mdf = scanner.metadata.Metadata(track_01)
|
||||
self.assertEqual(mdf.path, track_01)
|
||||
self.assertIsNone(mdf.file)
|
||||
|
||||
def test_metadata_track_01(self):
|
||||
def test_track_01(self):
|
||||
with scanner.metadata.Metadata(track_01) as mdf:
|
||||
self.assertEqual(mdf.album(), "Test Album")
|
||||
self.assertEqual(mdf.artist(), "Test Artist")
|
||||
|
@ -30,7 +30,7 @@ class TestMetadata(unittest.TestCase):
|
|||
self.assertEqual(mdf.tracknumber(), 1)
|
||||
self.assertEqual(mdf.year(), 2019)
|
||||
|
||||
def test_metadata_track_02(self):
|
||||
def test_track_02(self):
|
||||
with scanner.metadata.Metadata(track_02) as mdf:
|
||||
self.assertEqual(mdf.artist(), "Test Album Artist")
|
||||
self.assertEqual(mdf.artistsort(), "Album Artist, Test")
|
||||
|
@ -39,6 +39,6 @@ class TestMetadata(unittest.TestCase):
|
|||
self.assertEqual(mdf.release(), datetime.date(2019, 1, 1))
|
||||
self.assertEqual(mdf.year(), 2019)
|
||||
|
||||
def test_metadata_text_txt(self):
|
||||
def test_text_txt(self):
|
||||
with scanner.metadata.Metadata(text_txt) as mdf:
|
||||
mdf.artist()
|
||||
|
|
|
@ -10,7 +10,7 @@ class FakeTask(task.Task):
|
|||
self.res = res
|
||||
def run_task(self): return self.res
|
||||
|
||||
class TestScannerTaskQueue(unittest.TestCase):
|
||||
class TestTaskQueue(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.idle_start = None
|
||||
|
||||
|
@ -23,7 +23,7 @@ class TestScannerTaskQueue(unittest.TestCase):
|
|||
def on_tasks_finished(self, queue):
|
||||
self.tasks_finished = True
|
||||
|
||||
def test_scanner_queue_init(self):
|
||||
def test_init(self):
|
||||
q = queue.TaskQueue()
|
||||
|
||||
self.assertIsInstance(q, GObject.GObject)
|
||||
|
@ -32,7 +32,7 @@ class TestScannerTaskQueue(unittest.TestCase):
|
|||
self.assertEqual(q.idleid, None)
|
||||
self.assertEqual(q.progress, 0)
|
||||
|
||||
def test_scanner_queue_push(self):
|
||||
def test_push(self):
|
||||
q = queue.TaskQueue()
|
||||
fake = FakeTask()
|
||||
q.connect("task-pushed", self.on_task_pushed)
|
||||
|
@ -42,7 +42,17 @@ class TestScannerTaskQueue(unittest.TestCase):
|
|||
self.assertIsNotNone(q.idleid)
|
||||
self.assertEqual(self.pushed_task, fake)
|
||||
|
||||
def test_scanner_queue_run(self):
|
||||
def test_clear(self):
|
||||
q = queue.TaskQueue()
|
||||
q.connect("tasks-finished", self.on_tasks_finished)
|
||||
|
||||
q.push(FakeTask())
|
||||
q.clear()
|
||||
self.assertEqual(q.tasks, [ ])
|
||||
self.assertTrue(self.tasks_finished)
|
||||
self.assertIsNone(q.idleid)
|
||||
|
||||
def test_run(self):
|
||||
q = queue.TaskQueue()
|
||||
fake3 = FakeTask()
|
||||
fake2 = FakeTask()
|
||||
|
|
|
@ -12,31 +12,28 @@ class TestScanner(unittest.TestCase):
|
|||
def tearDown(self):
|
||||
scanner.Queue.tasks_finished()
|
||||
|
||||
def test_scanner_init(self):
|
||||
def test_init(self):
|
||||
self.assertIsInstance(scanner.Queue, scanner.queue.TaskQueue)
|
||||
|
||||
def test_scanner_import_track(self):
|
||||
lib = db.library.Table.find(test_album)
|
||||
scanner.import_track(lib, test_track, 2, datetime.date.today(), [ ])
|
||||
self.assertIsInstance(scanner.Queue.tasks[0], scanner.task.ImportTask)
|
||||
|
||||
def test_scanner_update_library(self):
|
||||
def test_update_library(self):
|
||||
lib = db.library.Table.find(test_album)
|
||||
scanner.update_library(lib)
|
||||
self.assertIsInstance(scanner.Queue.tasks[0],
|
||||
scanner.task.CheckSchedulerTask)
|
||||
self.assertIsInstance(scanner.Queue.tasks[1],
|
||||
scanner.task.DirectoryTask)
|
||||
|
||||
def test_scanner_remove_library(self):
|
||||
def test_remove_library(self):
|
||||
lib = db.library.Table.find(test_album)
|
||||
scanner.remove_library(lib)
|
||||
self.assertIsInstance(scanner.Queue.tasks[0],
|
||||
scanner.task.RemoveLibrarySchedulerTask)
|
||||
|
||||
def test_scanner_commit(self):
|
||||
def test_commit(self):
|
||||
scanner.commit()
|
||||
self.assertIsInstance(scanner.Queue.tasks[0], scanner.task.CommitTask)
|
||||
|
||||
def test_scanner_widgets(self):
|
||||
def test_widgets(self):
|
||||
lib = db.library.Table.find(test_album)
|
||||
|
||||
self.assertIsInstance(scanner.ProgressBar(),
|
||||
|
|
|
@ -10,7 +10,7 @@ test_tracks = pathlib.Path("./data/Test Album")
|
|||
test_track01 = test_tracks / "01 - Test Track.ogg"
|
||||
|
||||
class TestScannerTask(unittest.TestCase):
|
||||
def test_scanner_task_init(self):
|
||||
def test_task(self):
|
||||
t = task.Task()
|
||||
self.assertIsInstance(t, GObject.GObject)
|
||||
with self.assertRaises(NotImplementedError):
|
||||
|
@ -18,7 +18,7 @@ class TestScannerTask(unittest.TestCase):
|
|||
|
||||
|
||||
class TestScannerCommitTask(unittest.TestCase):
|
||||
def test_scanner_commit_task(self):
|
||||
def test_task(self):
|
||||
ct = task.CommitTask()
|
||||
self.assertIsInstance(ct, task.Task)
|
||||
self.assertEqual(ct.run_task(), None)
|
||||
|
@ -28,7 +28,7 @@ class TestScannerFileTask(unittest.TestCase):
|
|||
def setUp(self):
|
||||
db.reset()
|
||||
|
||||
def test_scanner_file_task(self):
|
||||
def test_task(self):
|
||||
lib = db.library.Table.find(test_tracks)
|
||||
ft = task.FileTask(lib, test_track01)
|
||||
|
||||
|
@ -65,42 +65,11 @@ class TestScannerFileTask(unittest.TestCase):
|
|||
self.assertEqual(new.get_track(0), track)
|
||||
|
||||
|
||||
class TestScannerImportTask(unittest.TestCase):
|
||||
def setUp(self):
|
||||
db.reset()
|
||||
|
||||
def test_scanner_import_task(self):
|
||||
playlists = [ "Collection", "Favorites", "New Tracks",
|
||||
"Previous", "Up Next", "Test Playlist" ]
|
||||
today = datetime.date.today()
|
||||
lib = db.library.Table.find(test_tracks)
|
||||
it = task.ImportTask(lib, test_track01, 4, today, playlists)
|
||||
|
||||
self.assertIsInstance(it, task.FileTask)
|
||||
self.assertEqual(it.library, lib)
|
||||
self.assertEqual(it.filepath, test_track01)
|
||||
self.assertEqual(it.playcount, 4)
|
||||
self.assertEqual(it.lastplayed,
|
||||
datetime.datetime.combine(today, datetime.time()))
|
||||
self.assertEqual(it.playlists, ["Favorites", "Queued Tracks", "Test Playlist" ])
|
||||
|
||||
self.assertIsNone(it.run_task())
|
||||
|
||||
track = db.track.Table.lookup(test_track01)
|
||||
self.assertEqual(track.playcount, 4)
|
||||
self.assertEqual(track.lastplayed,
|
||||
datetime.datetime.combine(today, datetime.time()))
|
||||
self.assertEqual(db.user.Table.lookup("Favorites").get_track(0), track)
|
||||
self.assertEqual(db.user.Table.lookup("Queued Tracks").get_track(0), track)
|
||||
self.assertEqual(db.user.Table.lookup("Test Playlist").get_track(0), track)
|
||||
self.assertEqual(db.user.Table.lookup("New Tracks").get_n_tracks(), 0)
|
||||
|
||||
|
||||
class TestScannerDirectoryTask(unittest.TestCase):
|
||||
def setUp(self):
|
||||
db.reset()
|
||||
|
||||
def test_scanner_directory_task(self):
|
||||
def test_task(self):
|
||||
lib = db.library.Table.find(test_tracks)
|
||||
dt = task.DirectoryTask(lib, test_tracks)
|
||||
|
||||
|
@ -125,7 +94,7 @@ class TestScannerCheckTask(unittest.TestCase):
|
|||
def setUp(self):
|
||||
db.reset()
|
||||
|
||||
def test_scanner_check_task(self):
|
||||
def test_task(self):
|
||||
lib = db.library.Table.find(test_tracks)
|
||||
for i in [ 1, 2, 3 ]:
|
||||
db.make_fake_track(i, i, f"Test Track {i}", f"{lib.path}/{i}.ogg", lib.path)
|
||||
|
@ -148,7 +117,7 @@ class TestScannerCheckSchedulerTask(unittest.TestCase):
|
|||
def setUp(self):
|
||||
db.reset()
|
||||
|
||||
def test_scanner_check_scheduler_task(self):
|
||||
def test_task(self):
|
||||
lib = db.library.Table.find(test_tracks)
|
||||
track = db.make_fake_track(1, 1, "Test Album Track", str(test_track01), str(test_tracks))
|
||||
|
||||
|
@ -178,7 +147,7 @@ class TestScannerRemoveTask(unittest.TestCase):
|
|||
def setUp(self):
|
||||
db.reset()
|
||||
|
||||
def test_scanner_remove_task(self):
|
||||
def test_task(self):
|
||||
lib = db.library.Table.find(test_tracks)
|
||||
for i in [ 1, 2, 3, 4, 5 ]:
|
||||
db.make_fake_track(i, i, f"Test Track {i}", f"{lib.path}/{i}.ogg", lib.path)
|
||||
|
@ -197,7 +166,7 @@ class TestScannerRemoveLibraryTask(unittest.TestCase):
|
|||
def setUp(self):
|
||||
db.reset()
|
||||
|
||||
def test_scanner_remove_library_task(self):
|
||||
def test_task(self):
|
||||
lib = db.library.Table.find(test_tracks)
|
||||
rlt = task.RemoveLibraryTask(lib)
|
||||
|
||||
|
@ -212,7 +181,7 @@ class TestScannerRemoveLibrarySchedulerTask(unittest.TestCase):
|
|||
def setUp(self):
|
||||
db.reset()
|
||||
|
||||
def test_scanner_check_scheduler_task(self):
|
||||
def test_task(self):
|
||||
lib = db.library.Table.find(test_tracks)
|
||||
for i in range(75):
|
||||
db.make_fake_track(i, i, f"Test Track {i}",f"/a/b/c/{i}.ogg", lib.path)
|
||||
|
|
|
@ -18,7 +18,7 @@ class FakeTask(task.Task):
|
|||
def run_task(self): return None
|
||||
|
||||
class TestScannerProgressBar(unittest.TestCase):
|
||||
def test_progress_bar(self):
|
||||
def test_init(self):
|
||||
q = queue.TaskQueue()
|
||||
pb = widgets.ProgressBar(q)
|
||||
|
||||
|
@ -39,7 +39,7 @@ class TestScannerProgressBar(unittest.TestCase):
|
|||
|
||||
|
||||
class DirectoryChooserWidget(unittest.TestCase):
|
||||
def test_directory_chooser_widget(self):
|
||||
def test_init(self):
|
||||
dcw = widgets.DirectoryChooserWidget()
|
||||
|
||||
self.assertIsInstance(dcw, Gtk.FileChooserWidget)
|
||||
|
@ -61,7 +61,7 @@ class DirectoryChooserWidget(unittest.TestCase):
|
|||
|
||||
|
||||
class DirectoryChooserPopover(unittest.TestCase):
|
||||
def test_directory_chooser_popover(self):
|
||||
def test_init(self):
|
||||
db.reset()
|
||||
q = queue.TaskQueue()
|
||||
dcp = widgets.DirectoryChooserPopover(q)
|
||||
|
@ -97,7 +97,7 @@ class DirectoryChooserPopover(unittest.TestCase):
|
|||
|
||||
|
||||
class TestScannerAddFolderButton(unittest.TestCase):
|
||||
def test_add_folder_button(self):
|
||||
def test_init(self):
|
||||
q = queue.TaskQueue()
|
||||
afb = widgets.AddFolderButton(q)
|
||||
|
||||
|
@ -110,7 +110,7 @@ class TestScannerAddFolderButton(unittest.TestCase):
|
|||
|
||||
|
||||
class TestScannerUpdateButton(unittest.TestCase):
|
||||
def test_update_button(self):
|
||||
def test_init(self):
|
||||
lib = db.library.Table.find("/a/b/c")
|
||||
q = queue.TaskQueue()
|
||||
ub = widgets.UpdateButton(lib, q)
|
||||
|
@ -130,7 +130,7 @@ class TestScannerUpdateButton(unittest.TestCase):
|
|||
|
||||
|
||||
class TestScannerUpdateAllButton(unittest.TestCase):
|
||||
def test_update_all_button(self):
|
||||
def test_init(self):
|
||||
db.reset()
|
||||
lib1 = db.library.Table.find("/a/b/c")
|
||||
lib2 = db.library.Table.find("/d/e/f")
|
||||
|
@ -153,7 +153,7 @@ class TestScannerUpdateAllButton(unittest.TestCase):
|
|||
|
||||
|
||||
class TestScannerRemoveButton(unittest.TestCase):
|
||||
def test_remove_button(self):
|
||||
def test_init(self):
|
||||
lib = db.library.Table.find("/a/b/c")
|
||||
q = queue.TaskQueue()
|
||||
rb = widgets.RemoveButton(lib, q)
|
||||
|
|
|
@ -1,111 +0,0 @@
|
|||
# Copyright 2020 (c) Anna Schumaker.
|
||||
from . import allocator
|
||||
from . import stack
|
||||
from . import tags
|
||||
from . import track
|
||||
import db
|
||||
import lib
|
||||
import pathlib
|
||||
import scanner
|
||||
import threading
|
||||
|
||||
File = "tagdb.pickle"
|
||||
Bus = lib.bus.Bus(500)
|
||||
Tracks = allocator.TrackAllocator()
|
||||
Stack = stack.TagStack()
|
||||
|
||||
class LibraryTag(lib.tag.Tag):
|
||||
def __init__(self, path):
|
||||
super().__init__(path)
|
||||
self.clear = lib.thread.Thread(self.__do_clear__)
|
||||
self.scan = lib.thread.Thread(self.__do_scan__)
|
||||
|
||||
def __setstate__(self, state):
|
||||
super().__setstate__(state)
|
||||
self.clear = lib.thread.Thread(self.__do_clear__)
|
||||
self.scan = lib.thread.Thread(self.__do_scan__)
|
||||
|
||||
def __do_scan__(self):
|
||||
for trak in Tracks.autoremove(self):
|
||||
self.remove_track(trak)
|
||||
|
||||
track_set = set([ t.filepath() for t in Tracks.list_tracks(self) ])
|
||||
for f in self.name.rglob("*"):
|
||||
if f not in track_set and f.is_file():
|
||||
if (track := Tracks.allocate(self, f)) != None:
|
||||
self.add_track(track)
|
||||
|
||||
def __do_clear__(self):
|
||||
for trak in self.tracks:
|
||||
Tracks.remove(trak)
|
||||
self.tracks.clear()
|
||||
|
||||
def fix_tracks(self):
|
||||
for (index, trak) in enumerate(self.tracks):
|
||||
t = Tracks[trak]
|
||||
if t is not None:
|
||||
self.tracks[index] = t
|
||||
|
||||
|
||||
|
||||
class LibraryStore(lib.tagstore.TagStore):
|
||||
def __alloc_tag__(self, name, sort):
|
||||
return LibraryTag(name)
|
||||
|
||||
def add(self, name):
|
||||
return super().__add_tag__(name, None, None)
|
||||
|
||||
def remove(self, lib):
|
||||
lib.clear()
|
||||
super().remove(lib)
|
||||
|
||||
def fix_tracks(self):
|
||||
for (id, tag) in self.store.items():
|
||||
tag.fix_tracks()
|
||||
|
||||
Library = LibraryStore()
|
||||
|
||||
|
||||
def _do_save():
|
||||
try:
|
||||
with lib.data.DataFile(File, lib.data.WRITE) as f:
|
||||
f.pickle([ tags.get_state(), Tracks, Library, Stack ])
|
||||
except Exception as e:
|
||||
return lib.bus.RETRY
|
||||
|
||||
def save(*args):
|
||||
Bus.board(_do_save)
|
||||
|
||||
def load():
|
||||
global Library
|
||||
global Tracks
|
||||
global Stack
|
||||
if not db.new_db():
|
||||
return
|
||||
with lib.data.DataFile(File, lib.data.READ) as f:
|
||||
if f.exists():
|
||||
(tagstate, Tracks, Library, Stack) = f.unpickle()
|
||||
tags.set_state(*tagstate)
|
||||
Tracks.load_tags()
|
||||
Library.fix_tracks()
|
||||
__register_callbacks()
|
||||
scanner.Queue.push(scanner.task.CommitTask())
|
||||
|
||||
def __register_callbacks():
|
||||
for store in [ Library, tags.User, Tracks ]:
|
||||
store.Added.register(save)
|
||||
store.Removed.register(save)
|
||||
Tracks.Updated.register(save)
|
||||
Stack.PushPop.register(save)
|
||||
Stack.NextTrack.register(save)
|
||||
__register_callbacks()
|
||||
|
||||
def reset():
|
||||
Tracks.reset()
|
||||
Library.reset()
|
||||
Stack.reset()
|
||||
tags.reset()
|
||||
Bus.clear()
|
||||
|
||||
lib.data.DataFile(File, lib.data.READ).remove()
|
||||
__register_callbacks()
|
|
@ -1,85 +0,0 @@
|
|||
# Copyright 2021 (c) Anna Schumaker.
|
||||
from lib import publisher
|
||||
from . import track
|
||||
import threading
|
||||
|
||||
class TrackAllocator:
|
||||
def __init__(self):
|
||||
self.tracks = dict()
|
||||
self.nextid = 0
|
||||
self.lock = threading.Lock()
|
||||
self.Added = publisher.Publisher()
|
||||
self.Removed = publisher.Publisher()
|
||||
self.Updated = publisher.Publisher()
|
||||
|
||||
def __alloc_track__(self, lib, filepath):
|
||||
with self.lock:
|
||||
trak = track.Track(self.nextid, filepath, lib)
|
||||
self.tracks[self.nextid] = trak
|
||||
self.nextid += 1
|
||||
self.Added.publish(trak)
|
||||
return trak
|
||||
|
||||
def __getitem__(self, id):
|
||||
with self.lock:
|
||||
return self.tracks.get(id, None)
|
||||
|
||||
def __getstate__(self):
|
||||
with self.lock:
|
||||
return { "tracks" : self.tracks,
|
||||
"nextid" : self.nextid }
|
||||
|
||||
def __len__(self):
|
||||
with self.lock:
|
||||
return len(self.tracks)
|
||||
|
||||
def __setstate__(self, state):
|
||||
self.__dict__.update(state)
|
||||
self.lock = threading.Lock()
|
||||
self.Added = publisher.Publisher()
|
||||
self.Removed = publisher.Publisher()
|
||||
self.Updated = publisher.Publisher()
|
||||
|
||||
def allocate(self, lib, filepath):
|
||||
try:
|
||||
return self.__alloc_track__(lib, filepath)
|
||||
except Exception as e:
|
||||
pass
|
||||
|
||||
def autoremove(self, lib):
|
||||
with self.lock:
|
||||
to_rm = [ trak for trak in self.tracks.values() \
|
||||
if trak.library == lib and not trak.filepath().exists() ]
|
||||
for trak in to_rm:
|
||||
trak.about_to_remove()
|
||||
del self.tracks[trak.trackid]
|
||||
self.Removed.publish(trak)
|
||||
return to_rm
|
||||
|
||||
def list_tracks(self, lib):
|
||||
with self.lock:
|
||||
for (id, track) in self.tracks.items():
|
||||
if track.library ==lib:
|
||||
yield track
|
||||
|
||||
def load_tags(self):
|
||||
for (id, track) in self.tracks.items():
|
||||
track.__set_tags__()
|
||||
|
||||
def played(self, track):
|
||||
with self.lock:
|
||||
track.played()
|
||||
self.Updated.publish(track)
|
||||
|
||||
def remove(self, track):
|
||||
with self.lock:
|
||||
track.about_to_remove()
|
||||
del self.tracks[track.trackid]
|
||||
self.Removed.publish(track)
|
||||
|
||||
def reset(self):
|
||||
with self.lock:
|
||||
self.nextid = 0
|
||||
self.tracks.clear()
|
||||
self.Added.reset()
|
||||
self.Removed.reset()
|
|
@ -1,75 +0,0 @@
|
|||
# Copyright 2021 (c) Anna Schumaker.
|
||||
from lib import counter
|
||||
from lib import publisher
|
||||
from . import tags
|
||||
|
||||
class TagStack:
|
||||
def __init__(self):
|
||||
self.tags = [ ]
|
||||
self.Counter = counter.Counter(-1, 99)
|
||||
self.PushPop = publisher.Publisher()
|
||||
self.NextTrack = publisher.Publisher()
|
||||
|
||||
def __do_next__(self, tag):
|
||||
if track := tag.next():
|
||||
track.add_to_playlist("Previous")
|
||||
return track
|
||||
|
||||
def __getstate__(self):
|
||||
return { "tags" : self.tags }
|
||||
|
||||
def __setstate__(self, state):
|
||||
self.__dict__.update(state)
|
||||
self.Counter = counter.Counter(-1, 99)
|
||||
self.PushPop = publisher.Publisher()
|
||||
self.NextTrack = publisher.Publisher()
|
||||
|
||||
def current(self):
|
||||
if len(self.tags) == 0:
|
||||
return tags.User.store["Collection"]
|
||||
return self.tags[0]
|
||||
|
||||
def __next_track__(self):
|
||||
if len(self.tags) == 0:
|
||||
return self.__do_next__(tags.User["Collection"])
|
||||
if track := self.__do_next__(self.tags[0]):
|
||||
return track
|
||||
self.pop()
|
||||
return self.__next_track__()
|
||||
|
||||
def next(self):
|
||||
ret = self.__next_track__()
|
||||
count = self.Counter.decrement()
|
||||
self.NextTrack.publish(ret)
|
||||
return (ret, count != -1)
|
||||
|
||||
def pop(self):
|
||||
prev = self.tags.pop(0)
|
||||
self.PushPop.publish(prev, self.current())
|
||||
|
||||
def previous(self):
|
||||
return tags.User["Previous"].next()
|
||||
|
||||
def push(self, tag):
|
||||
prev = self.current()
|
||||
if tag == tags.User["Previous"]:
|
||||
return
|
||||
if tag == tags.User["Collection"]:
|
||||
self.tags.clear()
|
||||
self.PushPop.publish(prev, tags.User["Collection"])
|
||||
return
|
||||
if tag in self.tags:
|
||||
self.tags.remove(tag)
|
||||
|
||||
self.tags.insert(0, tag)
|
||||
tag.stacked()
|
||||
self.PushPop.publish(prev, tag)
|
||||
|
||||
def queue(self, track):
|
||||
track.add_to_playlist("Up Next")
|
||||
self.push(tags.User["Up Next"])
|
||||
|
||||
def reset(self):
|
||||
self.tags.clear()
|
||||
self.count = None
|
||||
self.PushPop.reset()
|
|
@ -1,36 +0,0 @@
|
|||
# Copyright 2021 (c) Anna Schumaker.
|
||||
from . import user
|
||||
from lib import tagstore
|
||||
|
||||
Artist = tagstore.TagStore()
|
||||
Album = tagstore.TagSuperStore()
|
||||
Genre = tagstore.TagStore()
|
||||
Decade = tagstore.TagStore()
|
||||
Year = tagstore.TagSuperStore()
|
||||
User = user.UserTagStore()
|
||||
|
||||
def get_state():
|
||||
return (Artist, Album, Genre, Decade, Year, User)
|
||||
|
||||
def set_state(artist, album, genre, decade, year, user):
|
||||
global Artist
|
||||
global Album
|
||||
global Genre
|
||||
global Decade
|
||||
global Year
|
||||
global User
|
||||
|
||||
Artist = artist
|
||||
Album = album
|
||||
Genre = genre
|
||||
Decade = decade
|
||||
Year = year
|
||||
User = user
|
||||
|
||||
def reset():
|
||||
Artist.reset()
|
||||
Album.reset()
|
||||
Genre.reset()
|
||||
Decade.reset()
|
||||
Year.reset()
|
||||
User.reset()
|
|
@ -1,110 +0,0 @@
|
|||
# Copyright 2021 (c) Anna Schumaker.
|
||||
from lib import publisher
|
||||
from . import allocator
|
||||
import pathlib
|
||||
import threading
|
||||
import unittest
|
||||
|
||||
test_tracks = pathlib.Path("./data/Test Album")
|
||||
|
||||
class FakeLibrary:
|
||||
def __init__(self):
|
||||
self.name = test_tracks
|
||||
|
||||
class TestTrackAllocator(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.lib = FakeLibrary()
|
||||
self.added = None
|
||||
self.removed = None
|
||||
self.updated = None
|
||||
|
||||
def on_track_added(self, track):
|
||||
self.added = track
|
||||
|
||||
def on_track_removed(self, track):
|
||||
self.removed = track
|
||||
|
||||
def on_track_updated(self, track):
|
||||
self.updated = track
|
||||
|
||||
def test_allocator_init(self):
|
||||
alloc = allocator.TrackAllocator()
|
||||
self.assertEqual(alloc.tracks, { })
|
||||
self.assertEqual(alloc.nextid, 0)
|
||||
self.assertIsInstance(alloc.lock, type(threading.Lock()))
|
||||
self.assertIsInstance(alloc.Added, publisher.Publisher)
|
||||
self.assertIsInstance(alloc.Removed, publisher.Publisher)
|
||||
self.assertIsInstance(alloc.Updated, publisher.Publisher)
|
||||
|
||||
def test_allocator(self):
|
||||
alloc = allocator.TrackAllocator()
|
||||
alloc.Added.register(self.on_track_added)
|
||||
alloc.Removed.register(self.on_track_removed)
|
||||
|
||||
track = alloc.allocate(self.lib, test_tracks / "01 - Test Track.ogg")
|
||||
self.assertEqual(alloc.tracks[0], track)
|
||||
self.assertEqual(alloc[0], track)
|
||||
self.assertEqual(len(alloc), 1)
|
||||
self.assertEqual(alloc.nextid, 1)
|
||||
self.assertEqual(track.trackid, 0)
|
||||
self.assertEqual(self.added, track)
|
||||
|
||||
track2 = alloc.allocate(self.lib, test_tracks / "02 - Test {Disc 2}.ogg")
|
||||
self.assertEqual(alloc.tracks[1], track2)
|
||||
self.assertEqual(alloc[1], track2)
|
||||
self.assertEqual(len(alloc), 2)
|
||||
|
||||
self.assertIsNone(alloc.allocate(self.lib, test_tracks / "No Such File"))
|
||||
self.assertEqual(self.added, track2)
|
||||
self.assertEqual([ t for t in alloc.list_tracks(self.lib) ], [ track, track2 ])
|
||||
track.library = None
|
||||
self.assertEqual([ t for t in alloc.list_tracks(self.lib) ], [ track2 ])
|
||||
|
||||
alloc.remove(track)
|
||||
self.assertEqual(alloc[0], None)
|
||||
self.assertEqual(alloc.tracks, { 1 : track2 })
|
||||
self.assertEqual(alloc.nextid, 2)
|
||||
self.assertEqual(self.removed, track)
|
||||
|
||||
alloc.reset()
|
||||
self.assertEqual(alloc.nextid, 0)
|
||||
self.assertEqual(alloc.tracks, { })
|
||||
self.assertEqual(alloc.Added.subscribers, set())
|
||||
self.assertEqual(alloc.Removed.subscribers, set())
|
||||
|
||||
def test_allocator_autoremove(self):
|
||||
alloc = allocator.TrackAllocator()
|
||||
alloc.Removed.register(self.on_track_removed)
|
||||
|
||||
track = alloc.allocate(self.lib, test_tracks / "01 - Test Track.ogg")
|
||||
track2 = alloc.allocate(self.lib, test_tracks / "02 - Test {Disc 2}.ogg")
|
||||
track2.path = pathlib.Path("No Such File")
|
||||
|
||||
self.assertEqual(alloc.autoremove(self.lib), [ track2 ])
|
||||
self.assertEqual([ t for t in alloc.list_tracks(self.lib) ], [ track ])
|
||||
|
||||
def test_allocator_played(self):
|
||||
alloc = allocator.TrackAllocator()
|
||||
alloc.Updated.register(self.on_track_updated)
|
||||
track = alloc.allocate(self.lib, test_tracks / "01 - Test Track.ogg")
|
||||
|
||||
alloc.played(track)
|
||||
self.assertEqual(track.playcount, 1)
|
||||
self.assertEqual(self.updated, track)
|
||||
|
||||
def test_allocator_state(self):
|
||||
alloc = allocator.TrackAllocator()
|
||||
track = alloc.allocate(self.lib, test_tracks / "01 - Test Track.ogg")
|
||||
|
||||
state = alloc.__getstate__()
|
||||
self.assertEqual(state, { "tracks" : { 0 : track },
|
||||
"nextid" : 1 })
|
||||
|
||||
alloc.__dict__.clear()
|
||||
alloc.__setstate__(state)
|
||||
self.assertEqual(alloc.tracks, { 0 : track })
|
||||
self.assertEqual(alloc.nextid, 1)
|
||||
self.assertIsInstance(alloc.lock, type(threading.Lock()))
|
||||
self.assertIsInstance(alloc.Added, publisher.Publisher)
|
||||
self.assertIsInstance(alloc.Removed, publisher.Publisher)
|
||||
self.assertIsInstance(alloc.Updated, publisher.Publisher)
|
|
@ -1,186 +0,0 @@
|
|||
# Copyright 2021 (c) Anna Schumaker.
|
||||
from lib import counter
|
||||
from lib import publisher
|
||||
from lib import tag
|
||||
from . import stack
|
||||
from . import tags
|
||||
import unittest
|
||||
|
||||
class FakeTrack:
|
||||
def __init__(self, n):
|
||||
self.n = n
|
||||
self.length = n
|
||||
|
||||
def add_to_playlist(self, name):
|
||||
tags.User.add(name, self)
|
||||
|
||||
def remove_from_playlist(self, name):
|
||||
tags.User[name].remove_track(self)
|
||||
|
||||
class TestTagStack(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.pushpop = None
|
||||
self.next_track = None
|
||||
|
||||
def tearDown(self):
|
||||
tags.reset()
|
||||
|
||||
def on_next_track(self, track):
|
||||
self.next_track = track
|
||||
|
||||
def on_push_pop(self, prev, new):
|
||||
self.pushpop = (prev, new)
|
||||
|
||||
def test_tag_stack_init(self):
|
||||
s = stack.TagStack()
|
||||
self.assertIsInstance(s.Counter, counter.Counter)
|
||||
self.assertIsInstance(s.PushPop, publisher.Publisher)
|
||||
self.assertIsInstance(s.NextTrack, publisher.Publisher)
|
||||
self.assertEqual(s.tags, [ ])
|
||||
|
||||
def test_tag_stack_next(self):
|
||||
s = stack.TagStack()
|
||||
t = tag.Tag("Test")
|
||||
s.NextTrack.register(self.on_next_track)
|
||||
s.push(t)
|
||||
|
||||
t.tracks = [ FakeTrack(1), FakeTrack(2), FakeTrack(3) ]
|
||||
tags.User["Collection"].tracks = [ FakeTrack(4), FakeTrack(5) ]
|
||||
|
||||
(res, cont) = s.next()
|
||||
self.assertEqual( (res.n, cont), (1, True) )
|
||||
self.assertEqual(self.next_track.n, 1)
|
||||
self.assertEqual(s.tags, [ t ])
|
||||
|
||||
(res, cont) = s.next()
|
||||
self.assertEqual( (res.n, cont), (2, True) )
|
||||
self.assertEqual(self.next_track.n, 2)
|
||||
self.assertEqual(s.tags, [ t ])
|
||||
|
||||
(res, cont) = s.next()
|
||||
self.assertEqual( (res.n, cont), (3, True) )
|
||||
self.assertEqual(self.next_track.n, 3)
|
||||
self.assertEqual(s.tags, [ t ])
|
||||
|
||||
(res, cont) = s.next()
|
||||
self.assertEqual( (res.n, cont), (4, True) )
|
||||
self.assertEqual(self.next_track.n, 4)
|
||||
self.assertEqual(s.tags, [ ])
|
||||
|
||||
(res, cont) = s.next()
|
||||
self.assertEqual( (res.n, cont), (5, True) )
|
||||
self.assertEqual(self.next_track.n, 5)
|
||||
|
||||
self.assertEqual([ t.n for t in tags.User["Previous"].tracks ],
|
||||
[ 5, 4, 3, 2, 1 ])
|
||||
|
||||
def test_tag_stack_autopause(self):
|
||||
s = stack.TagStack()
|
||||
t = tag.Tag("Test")
|
||||
s.push(t)
|
||||
|
||||
t.tracks = [ FakeTrack(1), FakeTrack(2), FakeTrack(3), FakeTrack(4) ]
|
||||
s.Counter.set_value(2)
|
||||
|
||||
(res, cont) = s.next()
|
||||
self.assertEqual( (res.n, cont), (1, True) )
|
||||
self.assertEqual(s.tags, [ t ])
|
||||
self.assertEqual(s.Counter.get_value(), 1)
|
||||
|
||||
(res, cont) = s.next()
|
||||
self.assertEqual( (res.n, cont), (2, True) )
|
||||
self.assertEqual(s.tags, [ t ])
|
||||
self.assertEqual(s.Counter.get_value(), 0)
|
||||
|
||||
(res, cont) = s.next()
|
||||
self.assertEqual( (res.n, cont), (3, False) )
|
||||
self.assertEqual(s.tags, [ t ])
|
||||
self.assertEqual(s.Counter.get_value(), -1)
|
||||
|
||||
(res, cont) = s.next()
|
||||
self.assertEqual( (res.n, cont), (4, True) )
|
||||
self.assertEqual(s.tags, [ t ])
|
||||
self.assertEqual(s.Counter.get_value(), -1)
|
||||
|
||||
def test_tag_stack_pop(self):
|
||||
s = stack.TagStack()
|
||||
t1 = tag.Tag("Test")
|
||||
t2 = tag.Tag("Test Two")
|
||||
s.tags = [ t1, t2 ]
|
||||
s.PushPop.register(self.on_push_pop)
|
||||
|
||||
s.pop()
|
||||
self.assertEqual(s.tags, [ t2 ])
|
||||
self.assertEqual(self.pushpop, (t1, t2))
|
||||
|
||||
s.pop()
|
||||
self.assertEqual(s.tags, [ ])
|
||||
self.assertEqual(self.pushpop, (t2, tags.User["Collection"]))
|
||||
|
||||
def test_tag_stack_previous(self):
|
||||
s = stack.TagStack()
|
||||
for i in [ 1, 2, 3 ]:
|
||||
tags.User["Previous"].add_track(i)
|
||||
|
||||
self.assertEqual(s.previous(), 2)
|
||||
self.assertEqual(s.previous(), 1)
|
||||
self.assertIsNone(s.previous())
|
||||
|
||||
def test_tag_stack_push(self):
|
||||
s = stack.TagStack()
|
||||
t1 = tag.Tag("Test")
|
||||
t2 = tag.Tag("Test Two")
|
||||
t1.current = 3
|
||||
s.PushPop.register(self.on_push_pop)
|
||||
|
||||
s.push(t1)
|
||||
self.assertEqual(s.tags, [ t1 ])
|
||||
self.assertEqual(t1.current, -1)
|
||||
self.assertEqual(self.pushpop, (tags.User["Collection"], t1))
|
||||
|
||||
s.push(t2)
|
||||
self.assertEqual(s.tags, [ t2, t1 ])
|
||||
self.assertEqual(self.pushpop, (t1, t2))
|
||||
|
||||
s.push(t1)
|
||||
self.assertEqual(s.tags, [ t1, t2 ])
|
||||
self.assertEqual(self.pushpop, (t2, t1))
|
||||
|
||||
s.push(tags.User["Previous"])
|
||||
self.assertEqual(s.tags, [ t1, t2 ])
|
||||
s.push(tags.User["Collection"])
|
||||
self.assertEqual(s.tags, [ ])
|
||||
self.assertEqual(self.pushpop, (t1, tags.User["Collection"]))
|
||||
|
||||
def test_tag_stack_queue(self):
|
||||
s = stack.TagStack()
|
||||
s.queue(FakeTrack(1))
|
||||
self.assertEqual(s.tags, [ tags.User["Up Next"] ])
|
||||
|
||||
(res, cont) = s.next()
|
||||
self.assertEqual( (res.n, cont), (1, True) )
|
||||
self.assertEqual(tags.User["Up Next"].tracks, [ ])
|
||||
|
||||
def test_tag_stack_state(self):
|
||||
s = stack.TagStack()
|
||||
t = tag.Tag("Test")
|
||||
s.push(t)
|
||||
|
||||
state = s.__getstate__()
|
||||
self.assertEqual(state, { "tags" : [ t ] })
|
||||
|
||||
s.__dict__.clear()
|
||||
s.__setstate__(state)
|
||||
self.assertEqual(s.tags, [ t ])
|
||||
self.assertIsInstance(s.Counter, counter.Counter)
|
||||
self.assertIsInstance(s.PushPop, publisher.Publisher)
|
||||
self.assertIsInstance(s.NextTrack, publisher.Publisher)
|
||||
|
||||
s.PushPop.register(self.on_push_pop)
|
||||
s.PushPop.register(self.on_next_track)
|
||||
s.count = 3
|
||||
s.reset()
|
||||
self.assertEqual(s.tags, [ ])
|
||||
self.assertIsInstance(s.Counter, counter.Counter)
|
||||
self.assertEqual(len(s.PushPop.subscribers), 0)
|
||||
self.assertEqual(len(s.NextTrack.subscribers), 0)
|
|
@ -1,174 +0,0 @@
|
|||
# Copyright 2020 (c) Anna Schumaker.
|
||||
import db
|
||||
import lib
|
||||
import pathlib
|
||||
import tagdb
|
||||
import unittest
|
||||
|
||||
test_tracks = pathlib.Path("./data/Test Album")
|
||||
|
||||
class TestLibraryTag(unittest.TestCase):
|
||||
def tearDown(self):
|
||||
db.NewDatabase = True
|
||||
tagdb.reset()
|
||||
|
||||
def test_library_tag_init(self):
|
||||
library = tagdb.LibraryTag(test_tracks)
|
||||
self.assertIsInstance(library, lib.tag.Tag)
|
||||
self.assertIsInstance(library.clear, lib.thread.Thread)
|
||||
self.assertIsInstance(library.scan, lib.thread.Thread)
|
||||
|
||||
def test_library_tag_state(self):
|
||||
lib = tagdb.LibraryTag(test_tracks)
|
||||
lib.scan().join()
|
||||
|
||||
state = lib.__getstate__()
|
||||
self.assertEqual(set(state.keys()),
|
||||
set([ "name", "sort", "current", "loop", "random", "tracks" ]))
|
||||
self.assertEqual(state["name"], test_tracks)
|
||||
self.assertEqual(state["tracks"], [ i for i in range(12) ])
|
||||
|
||||
lib.__dict__.clear()
|
||||
lib.__setstate__(state)
|
||||
lib.fix_tracks()
|
||||
self.assertEqual(lib.name, test_tracks)
|
||||
self.assertEqual(lib.tracks, [ tagdb.Tracks[i] for i in range(12) ])
|
||||
self.assertEqual(lib.clear.func, lib.__do_clear__)
|
||||
self.assertEqual(lib.scan.func, lib.__do_scan__)
|
||||
|
||||
def test_library_tag_scan(self):
|
||||
lib = tagdb.LibraryTag(test_tracks)
|
||||
lib.scan().join()
|
||||
for i in range(12):
|
||||
self.assertEqual(lib.tracks[i], tagdb.Tracks[i])
|
||||
|
||||
def test_library_tag_scan_new(self):
|
||||
lib = tagdb.LibraryTag(test_tracks)
|
||||
lib.scan().join()
|
||||
for trak in [ tagdb.Tracks[0], tagdb.Tracks[11] ]:
|
||||
lib.remove_track(trak)
|
||||
tagdb.Tracks.remove(trak)
|
||||
lib.scan().join()
|
||||
self.assertEqual(len(lib.tracks), 12)
|
||||
|
||||
def test_library_tag_scan_remove(self):
|
||||
lib = tagdb.LibraryTag(test_tracks)
|
||||
lib.scan().join()
|
||||
|
||||
trak = tagdb.track.Track(tagdb.Tracks.nextid,
|
||||
test_tracks / "01 - Test Track.ogg", lib)
|
||||
trak.path = pathlib.Path("No Such File")
|
||||
lib.tracks.append(trak)
|
||||
tagdb.Tracks.tracks[trak.trackid] = trak
|
||||
lib.scan().join()
|
||||
self.assertNotIn(trak, lib.tracks)
|
||||
|
||||
def test_library_tag_clear(self):
|
||||
lib = tagdb.LibraryTag(test_tracks)
|
||||
lib.scan().join()
|
||||
self.assertEqual(len(lib), 12)
|
||||
|
||||
lib.clear().join()
|
||||
self.assertEqual(len(lib), 0)
|
||||
self.assertEqual(len(tagdb.Tracks), 0)
|
||||
|
||||
|
||||
class TestLibraryStore(unittest.TestCase):
|
||||
def test_library_store(self):
|
||||
store = tagdb.LibraryStore()
|
||||
lib = store.add(test_tracks)
|
||||
self.assertIsInstance(lib, tagdb.LibraryTag)
|
||||
|
||||
lib.scan().join()
|
||||
self.assertEqual(len(lib), 12)
|
||||
|
||||
store.remove(lib)
|
||||
lib.clear.join()
|
||||
self.assertEqual(len(lib), 0)
|
||||
|
||||
|
||||
class TestTrackDB(unittest.TestCase):
|
||||
def tearDown(self):
|
||||
tagdb.reset()
|
||||
|
||||
def test_tagdb_init(self):
|
||||
self.assertIsInstance(tagdb.Bus, lib.bus.Bus)
|
||||
self.assertIsInstance(tagdb.Tracks, tagdb.allocator.TrackAllocator)
|
||||
self.assertIsInstance(tagdb.Stack, tagdb.stack.TagStack)
|
||||
self.assertIsInstance(tagdb.Library, tagdb.LibraryStore)
|
||||
|
||||
self.assertEqual(tagdb.File, "tagdb.pickle")
|
||||
self.assertEqual(tagdb.Bus.timeout, 500)
|
||||
|
||||
self.assertIn(tagdb.save, tagdb.Library.Added.subscribers)
|
||||
self.assertIn(tagdb.save, tagdb.Library.Removed.subscribers)
|
||||
self.assertIn(tagdb.save, tagdb.Tracks.Added.subscribers)
|
||||
self.assertIn(tagdb.save, tagdb.Tracks.Removed.subscribers)
|
||||
self.assertIn(tagdb.save, tagdb.Tracks.Updated.subscribers)
|
||||
self.assertIn(tagdb.save, tagdb.tags.User.Added.subscribers)
|
||||
self.assertIn(tagdb.save, tagdb.tags.User.Removed.subscribers)
|
||||
self.assertIn(tagdb.save, tagdb.Stack.PushPop.subscribers)
|
||||
self.assertIn(tagdb.save, tagdb.Stack.NextTrack.subscribers)
|
||||
|
||||
def test_tagdb_save_load(self):
|
||||
db_file = lib.data.DataFile(tagdb.File, lib.data.READ)
|
||||
library = tagdb.Library.add(test_tracks)
|
||||
library.scan()
|
||||
tagdb.Stack.push(library)
|
||||
|
||||
tagdb.save()
|
||||
self.assertEqual(tagdb.Bus.passengers, [ (tagdb._do_save, ()) ])
|
||||
self.assertFalse(db_file.exists())
|
||||
|
||||
library.scan.join()
|
||||
tagdb.Bus.complete()
|
||||
self.assertTrue(db_file.exists())
|
||||
|
||||
db.reset()
|
||||
tagdb.tags.reset()
|
||||
tagdb.Library.reset()
|
||||
tagdb.Tracks.reset()
|
||||
tagdb.Stack.reset()
|
||||
tagdb.load()
|
||||
|
||||
self.assertEqual(len(tagdb.Tracks), 12)
|
||||
self.assertEqual(len(tagdb.Library), 1)
|
||||
self.assertEqual(len(tagdb.Library[test_tracks]), 12)
|
||||
self.assertEqual(len(tagdb.tags.Artist), 3)
|
||||
self.assertEqual(len(tagdb.tags.Album), 12)
|
||||
self.assertEqual(len(tagdb.tags.Genre), 4)
|
||||
self.assertEqual(len(tagdb.tags.Decade), 2)
|
||||
self.assertEqual(len(tagdb.tags.Year), 2)
|
||||
self.assertEqual(tagdb.Stack.tags, [ tagdb.Library[test_tracks] ])
|
||||
|
||||
self.assertIn(tagdb.save, tagdb.Library.Added.subscribers)
|
||||
self.assertIn(tagdb.save, tagdb.Tracks.Added.subscribers)
|
||||
self.assertIn(tagdb.save, tagdb.tags.User.Added.subscribers)
|
||||
|
||||
def test_tagdb_stress(self):
|
||||
lib = tagdb.Library.add(pathlib.Path("./trier/Test Library/"))
|
||||
lib.scan().join()
|
||||
tagdb.Bus.complete()
|
||||
|
||||
tagdb.Library.remove(lib)
|
||||
lib.clear.join()
|
||||
tagdb.Bus.complete()
|
||||
|
||||
def test_tagdb_reset(self):
|
||||
tagdb.Tracks.Added.register(1)
|
||||
tagdb.Tracks.Removed.register(1)
|
||||
tagdb.Tracks.Updated.register(1)
|
||||
tagdb.Library.Added.register(1)
|
||||
tagdb.Library.Removed.register(1)
|
||||
tagdb.Library.store = { "a" : 1, "b" : 2, "c" : 3 }
|
||||
with lib.data.DataFile(tagdb.File, lib.data.WRITE) as f:
|
||||
f.pickle([ 0, [] ])
|
||||
|
||||
tagdb.reset()
|
||||
self.assertEqual(len(tagdb.Library), 0)
|
||||
self.assertIn(tagdb.save, tagdb.Tracks.Added.subscribers)
|
||||
self.assertIn(tagdb.save, tagdb.Tracks.Removed.subscribers)
|
||||
self.assertIn(tagdb.save, tagdb.Tracks.Updated.subscribers)
|
||||
self.assertIn(tagdb.save, tagdb.Library.Added.subscribers)
|
||||
self.assertIn(tagdb.save, tagdb.Library.Removed.subscribers)
|
||||
self.assertFalse(lib.data.DataFile(tagdb.File, lib.data.READ).exists())
|
|
@ -1,50 +0,0 @@
|
|||
# Copyright 2021 (c) Anna Schumaker.
|
||||
from lib import tagstore
|
||||
from . import tags
|
||||
from . import user
|
||||
import unittest
|
||||
|
||||
class TestTags(unittest.TestCase):
|
||||
def tearDown(self):
|
||||
tags.reset()
|
||||
|
||||
def test_tags_init(self):
|
||||
self.assertIsInstance(tags.Artist, tagstore.TagStore)
|
||||
self.assertIsInstance(tags.Album, tagstore.TagSuperStore)
|
||||
self.assertIsInstance(tags.Genre, tagstore.TagStore)
|
||||
self.assertIsInstance(tags.Decade, tagstore.TagStore)
|
||||
self.assertIsInstance(tags.Year, tagstore.TagSuperStore)
|
||||
self.assertIsInstance(tags.User, user.UserTagStore)
|
||||
|
||||
def test_tags_reset(self):
|
||||
tags.Artist.store = {"a" : 1 }
|
||||
tags.Album.store = {("a", "b") : 2 }
|
||||
tags.Genre.store = {"c" : 3 }
|
||||
tags.Decade.store = {"d" : 4 }
|
||||
tags.User.store = {"e" : 5 }
|
||||
|
||||
tags.reset()
|
||||
self.assertEqual(tags.Artist.store, { })
|
||||
self.assertEqual(tags.Album.store, { })
|
||||
self.assertEqual(tags.Genre.store, { })
|
||||
self.assertEqual(tags.Decade.store, { })
|
||||
self.assertIsNotNone(tags.User["Collection"])
|
||||
self.assertIsNotNone(tags.User["Up Next"])
|
||||
self.assertIsNotNone(tags.User["Previous"])
|
||||
self.assertIsNotNone(tags.User["Favorites"])
|
||||
self.assertIsNotNone(tags.User["Up Next"])
|
||||
|
||||
def test_tags_state(self):
|
||||
state = tags.get_state()
|
||||
self.assertEqual(state, ( tags.Artist, tags.Album, tags.Genre,
|
||||
tags.Decade, tags.Year, tags.User ))
|
||||
|
||||
tags.set_state(*(1, 2, 3, 4, 5, 6))
|
||||
self.assertEqual(tags.Artist, 1)
|
||||
self.assertEqual(tags.Album, 2)
|
||||
self.assertEqual(tags.Genre, 3)
|
||||
self.assertEqual(tags.Decade, 4)
|
||||
self.assertEqual(tags.Year, 5)
|
||||
self.assertEqual(tags.User, 6)
|
||||
|
||||
tags.set_state(*state)
|
|
@ -1,173 +0,0 @@
|
|||
# Copyright 2020 (c) Anna Schumaker.
|
||||
from lib import publisher
|
||||
from gi.repository import GObject
|
||||
from . import tags
|
||||
from . import track
|
||||
import datetime
|
||||
import pathlib
|
||||
import unittest
|
||||
|
||||
test_tracks = pathlib.Path("./data/Test Album")
|
||||
|
||||
class FakeLibrary:
|
||||
def __init__(self):
|
||||
self.name = test_tracks
|
||||
|
||||
class TestTrack(unittest.TestCase):
|
||||
def setUp(self):
|
||||
tags.reset()
|
||||
self.lib = FakeLibrary()
|
||||
|
||||
def tearDown(self):
|
||||
tags.reset()
|
||||
|
||||
def test_track_init(self):
|
||||
trak = track.Track(1, test_tracks / "01 - Test Track.ogg", self.lib)
|
||||
self.assertEqual(trak.trackid, 1)
|
||||
self.assertEqual(trak.filepath(), test_tracks / "01 - Test Track.ogg")
|
||||
self.assertIsInstance(trak, GObject.Object)
|
||||
|
||||
def test_track_album(self):
|
||||
trak = track.Track(1, test_tracks / "01 - Test Track.ogg", self.lib)
|
||||
artist = trak.artist
|
||||
self.assertEqual(trak.album, tags.Album[artist, "Test Album"])
|
||||
self.assertEqual(trak["album"], "Test Album")
|
||||
|
||||
def test_track_artist(self):
|
||||
trak = track.Track(1, test_tracks / "01 - Test Track.ogg", self.lib)
|
||||
self.assertEqual(trak.artist, tags.Artist["Test Artist"])
|
||||
self.assertEqual(trak["artist"], "Test Artist")
|
||||
self.assertEqual(trak.artist.sort, "artist, test")
|
||||
|
||||
trak2 = track.Track(2, test_tracks / "02 - Test {Disc 2}.ogg", self.lib)
|
||||
self.assertEqual(trak2.artist, tags.Artist["Test Album Artist"])
|
||||
self.assertEqual(trak2.artist.sort, "album artist, test")
|
||||
|
||||
def test_track_decade(self):
|
||||
trak = track.Track(1, test_tracks / "01 - Test Track.ogg", self.lib)
|
||||
self.assertEqual(trak.decade, tags.Decade["2010s"])
|
||||
self.assertEqual(trak["decade"], "2010s")
|
||||
|
||||
def test_track_discnumber(self):
|
||||
trak = track.Track(1, test_tracks / "01 - Test Track.ogg", self.lib)
|
||||
self.assertEqual(trak.discnumber, 1)
|
||||
self.assertEqual(trak["discnumber"], "01")
|
||||
|
||||
def test_track_filepath(self):
|
||||
trak = track.Track(1, test_tracks / "01 - Test Track.ogg", self.lib)
|
||||
self.assertEqual(trak.path, pathlib.Path("01 - Test Track.ogg"))
|
||||
self.assertEqual(trak.filepath(), test_tracks / "01 - Test Track.ogg")
|
||||
|
||||
def test_track_genres(self):
|
||||
trak = track.Track(1, test_tracks / "02 - Test {Disc 2}.ogg", self.lib)
|
||||
genlist = [ tags.Genre["Test"], tags.Genre["Genre"], tags.Genre["List"] ]
|
||||
self.assertEqual(trak.genres, genlist)
|
||||
self.assertEqual(trak["genres"], "Test, Genre, List")
|
||||
|
||||
def test_track_length(self):
|
||||
trak = track.Track(1, test_tracks / "01 - Test Track.ogg", self.lib)
|
||||
self.assertEqual(trak.length, 10)
|
||||
self.assertEqual(trak["length"], "0:10")
|
||||
trak.length = 61
|
||||
self.assertEqual(trak["length"], "1:01")
|
||||
trak.length = 3
|
||||
self.assertEqual(trak["length"], "0:03")
|
||||
|
||||
def test_track_played(self):
|
||||
trak = track.Track(1, test_tracks / "01 - Test Track.ogg", self.lib)
|
||||
trak.playcount = 0
|
||||
trak.lastplayed = None
|
||||
self.assertEqual(trak["playcount"], "0")
|
||||
self.assertEqual(trak["lastplayed"], "Never")
|
||||
trak.played()
|
||||
self.assertEqual(trak.playcount, 1)
|
||||
self.assertEqual(trak.lastplayed.date(), datetime.date.today())
|
||||
self.assertEqual(trak["playcount"], "1")
|
||||
#self.assertEqual(trak["lastplayed"], str(datetime.date.today()))
|
||||
trak.played()
|
||||
self.assertEqual(trak.playcount, 2)
|
||||
self.assertEqual(trak["playcount"], "2")
|
||||
|
||||
def test_track_title(self):
|
||||
trak = track.Track(1, test_tracks / "01 - Test Track.ogg", self.lib)
|
||||
self.assertEqual(trak.title, "Test Track")
|
||||
self.assertEqual(trak["title"], "Test Track")
|
||||
|
||||
def test_track_tracknumber(self):
|
||||
trak = track.Track(1, test_tracks / "01 - Test Track.ogg", self.lib)
|
||||
self.assertEqual(trak.tracknumber, 1)
|
||||
self.assertEqual(trak["tracknumber"], "1-01")
|
||||
trak.tracknumber = 10
|
||||
self.assertEqual(trak["tracknumber"], "1-10")
|
||||
|
||||
def test_track_year(self):
|
||||
trak = track.Track(1, test_tracks / "01 - Test Track.ogg", self.lib)
|
||||
self.assertEqual(trak.year, tags.Year[trak.decade, "2019"])
|
||||
self.assertEqual(trak["year"], "2019")
|
||||
trak2 = track.Track(2, test_tracks / "02 - Test {Disc 2}.ogg", self.lib)
|
||||
self.assertEqual(trak2.year, tags.Year[trak2.decade, "2019"])
|
||||
self.assertEqual(trak2["year"], "2019")
|
||||
|
||||
def test_track_playlists(self):
|
||||
trak = track.Track(1, test_tracks / "01 - Test Track.ogg", self.lib)
|
||||
default = [ tags.User["Collection"], tags.User["New Tracks"] ]
|
||||
self.assertEqual(trak.playlists, default)
|
||||
|
||||
trak.add_to_playlist("Test")
|
||||
self.assertEqual(trak.playlists, default + [ tags.User["Test"] ])
|
||||
|
||||
trak.remove_from_playlist("Test")
|
||||
self.assertEqual(trak.playlists, default)
|
||||
self.assertEqual(len(tags.User["Test"]), 0)
|
||||
|
||||
def test_track_about_to_remove(self):
|
||||
trak = track.Track(1, test_tracks / "01 - Test Track.ogg", self.lib)
|
||||
trak.about_to_remove()
|
||||
|
||||
self.assertEqual(len(tags.Artist), 0)
|
||||
self.assertEqual(len(tags.Album), 0)
|
||||
self.assertEqual(len(tags.Genre), 0)
|
||||
self.assertEqual(len(tags.Decade), 0)
|
||||
self.assertEqual(len(tags.Year), 0)
|
||||
self.assertEqual(len(tags.User["Collection"]), 0)
|
||||
self.assertEqual(len(tags.User["New Tracks"]), 0)
|
||||
|
||||
def test_track_state(self):
|
||||
trak = track.Track(1, test_tracks / "01 - Test Track.ogg", self.lib)
|
||||
trak.add_to_playlist("Starred")
|
||||
trak.add_to_playlist("Previous")
|
||||
|
||||
state = trak.__getstate__()
|
||||
self.assertEqual(state["artist"], "Test Artist")
|
||||
self.assertEqual(state["album"], "Test Album")
|
||||
self.assertEqual(state["genres"], [ "Test" ])
|
||||
self.assertEqual(state["decade"], "2010s")
|
||||
self.assertEqual(state["year"], "2019")
|
||||
self.assertEqual(state["playlists"], [ "Collection", "Starred" ])
|
||||
|
||||
tags.Artist["Test Artist"].tracks = [ 1 ]
|
||||
tags.Album[trak.artist, "Test Album"].tracks = [ 1 ]
|
||||
tags.Genre["Test"].tracks = [ 1 ]
|
||||
tags.Decade["2010s"].tracks = [ 1 ]
|
||||
tags.Year[trak.decade, "2019"].tracks = [ 1 ]
|
||||
tags.User["Collection"].tracks = [ 1 ]
|
||||
tags.User["Starred"].tracks = [ 1 ]
|
||||
|
||||
trak.__dict__.clear()
|
||||
trak.__setstate__(state)
|
||||
trak.__set_tags__()
|
||||
self.assertEqual(trak.artist, tags.Artist["Test Artist"])
|
||||
self.assertEqual(trak.album, tags.Album[trak.artist, "Test Album"])
|
||||
self.assertEqual(trak.genres, [ tags.Genre["Test"] ])
|
||||
self.assertEqual(trak.decade, tags.Decade["2010s"])
|
||||
self.assertEqual(trak.year, tags.Year[trak.decade, "2019"])
|
||||
self.assertEqual(trak.playlists, [ tags.User["Collection"],
|
||||
tags.User["Starred"] ])
|
||||
|
||||
self.assertEqual(tags.Artist["Test Artist"].tracks, [ trak ])
|
||||
self.assertEqual(tags.Album[trak.artist, "Test Album"].tracks, [ trak ])
|
||||
self.assertEqual(tags.Genre["Test"].tracks, [ trak ])
|
||||
self.assertEqual(tags.Decade["2010s"].tracks, [ trak ])
|
||||
self.assertEqual(tags.Year[trak.decade, "2019"].tracks, [ trak ])
|
||||
self.assertEqual(tags.User["Collection"].tracks, [ trak ])
|
||||
self.assertEqual(tags.User["Starred"].tracks, [ trak ])
|
|
@ -1,99 +0,0 @@
|
|||
# Copyright 2021 (c) Anna Schumaker.
|
||||
from lib import fake
|
||||
from lib import tag
|
||||
from lib import tagstore
|
||||
from . import user
|
||||
import unittest
|
||||
|
||||
class TestUserTags(unittest.TestCase):
|
||||
def test_collection_tag(self):
|
||||
c = user.CollectionTag()
|
||||
self.assertIsInstance(c, tag.Tag)
|
||||
self.assertTrue(c.loop)
|
||||
self.assertFalse(c.can_loop())
|
||||
|
||||
def test_new_tracks_tag(self):
|
||||
n = user.NewTracksTag()
|
||||
self.assertIsInstance(n, tag.Tag)
|
||||
self.assertTrue(n.can_loop())
|
||||
self.assertTrue(n.can_random())
|
||||
|
||||
n.tracks = [ fake.Track(1), fake.Track(2), fake.Track(3) ]
|
||||
state = n.__getstate__()
|
||||
self.assertEqual(state["name"], "New Tracks")
|
||||
self.assertEqual(state["sort"], "new tracks")
|
||||
self.assertEqual(state["current"], -1)
|
||||
self.assertEqual(state["tracks"], [ ])
|
||||
self.assertFalse(state["loop"])
|
||||
self.assertFalse(state["random"])
|
||||
|
||||
def test_previous_tag(self):
|
||||
p = user.PreviousTag()
|
||||
self.assertIsInstance(p, tag.Tag)
|
||||
self.assertFalse(p.can_loop())
|
||||
self.assertFalse(p.can_random())
|
||||
|
||||
p.add_track(fake.Track(1))
|
||||
self.assertEqual(p.tracks, [ fake.Track(1) ])
|
||||
self.assertIsNone(p.next())
|
||||
|
||||
p.add_track(fake.Track(2))
|
||||
self.assertEqual(p.tracks, [ fake.Track(2), fake.Track(1) ])
|
||||
self.assertEqual(p.next(), fake.Track(1))
|
||||
|
||||
p.add_track(fake.Track(3))
|
||||
self.assertEqual(p.tracks, [ fake.Track(3), fake.Track(2), fake.Track(1) ])
|
||||
self.assertEqual(p.next(), fake.Track(2))
|
||||
self.assertEqual(p.next(), fake.Track(1))
|
||||
|
||||
state = p.__getstate__()
|
||||
self.assertEqual(state["name"], "Previous")
|
||||
self.assertEqual(state["sort"], "previous")
|
||||
self.assertEqual(state["current"], 2)
|
||||
self.assertEqual(state["tracks"], [ ])
|
||||
self.assertFalse(state["loop"])
|
||||
self.assertFalse(state["random"])
|
||||
|
||||
def test_up_next_tag(self):
|
||||
u = user.UpNextTag()
|
||||
self.assertIsInstance(u, tag.Tag)
|
||||
self.assertFalse(u.loop)
|
||||
self.assertFalse(u.can_loop())
|
||||
self.assertIsNone(u.next())
|
||||
|
||||
u.tracks = [ fake.Track(1, u), fake.Track(2, u), fake.Track(3, u) ]
|
||||
self.assertEqual(u.next(), fake.Track(1, u))
|
||||
self.assertEqual(u.tracks, [ fake.Track(2, u), fake.Track(3, u) ])
|
||||
|
||||
self.assertEqual(u.next(), fake.Track(2, u))
|
||||
self.assertEqual(u.tracks, [ fake.Track(3, u) ])
|
||||
|
||||
u.random = True
|
||||
self.assertEqual(u.next(), fake.Track(3, u))
|
||||
self.assertEqual(u.tracks, [ ])
|
||||
self.assertFalse(u.random)
|
||||
|
||||
|
||||
class TestUserTagStore(unittest.TestCase):
|
||||
def test_user_init(self):
|
||||
store = user.UserTagStore()
|
||||
self.assertIsInstance(store, tagstore.TagStore)
|
||||
|
||||
self.assertIsInstance(store["Collection"], user.CollectionTag)
|
||||
self.assertIsInstance(store["New Tracks"], user.NewTracksTag)
|
||||
self.assertIsInstance(store["Previous"], user.PreviousTag)
|
||||
self.assertIsInstance(store["Favorites"], tag.Tag)
|
||||
self.assertIsInstance(store["Up Next"], user.UpNextTag)
|
||||
|
||||
def test_user_reset(self):
|
||||
store = user.UserTagStore()
|
||||
store.add("Playlist")
|
||||
self.assertEqual(len(store), 6)
|
||||
|
||||
store.reset()
|
||||
self.assertEqual(len(store), 5)
|
||||
self.assertIsInstance(store["Collection"], user.CollectionTag)
|
||||
self.assertIsInstance(store["New Tracks"], user.NewTracksTag)
|
||||
self.assertIsInstance(store["Previous"], user.PreviousTag)
|
||||
self.assertIsInstance(store["Favorites"], tag.Tag)
|
||||
self.assertIsInstance(store["Up Next"], user.UpNextTag)
|
|
@ -1,98 +0,0 @@
|
|||
# Copyright 2020 (c) Anna Schumaker.
|
||||
from lib import metadata
|
||||
from lib import publisher
|
||||
from . import tags
|
||||
from gi.repository import GObject
|
||||
import datetime
|
||||
import db
|
||||
import scanner
|
||||
|
||||
class Track(GObject.Object):
|
||||
def __init__(self, trackid, filepath, library):
|
||||
GObject.Object.__init__(self)
|
||||
self.trackid = trackid
|
||||
self.path = filepath.relative_to(library.name)
|
||||
self.library = library
|
||||
self.lastplayed = None
|
||||
self.playcount = 0
|
||||
|
||||
with metadata.Metadata(filepath) as meta:
|
||||
self.title = meta.title()
|
||||
self.length = meta.length()
|
||||
self.discnumber = meta.discnumber()
|
||||
self.tracknumber = meta.tracknumber()
|
||||
|
||||
self.artist = tags.Artist.add(meta.artist(), self, sort=meta.artistsort())
|
||||
self.album = tags.Album.add(self.artist, meta.album(), self)
|
||||
self.genres = [ tags.Genre.add(g, self) for g in meta.genres() ]
|
||||
self.decade = tags.Decade.add(f"{meta.decade()}s", self)
|
||||
self.year = tags.Year.add(self.decade, str(meta.year()), self)
|
||||
|
||||
self.playlists = [ tags.User.add("Collection", self),
|
||||
tags.User.add("New Tracks", self) ]
|
||||
|
||||
def __getitem__(self, item):
|
||||
tag = self.__dict__.get(item, None)
|
||||
if item == "length":
|
||||
(m, s) = divmod(tag, 60)
|
||||
return f"{m}:{s:02}"
|
||||
elif item == "discnumber":
|
||||
return f"{tag:02}"
|
||||
elif item == "tracknumber":
|
||||
return f"{self.discnumber}-{tag:02}"
|
||||
elif item == "lastplayed":
|
||||
return "Never" if tag == None else str(tag)
|
||||
elif item == "genres":
|
||||
return ", ".join([ str(g) for g in self.genres ])
|
||||
return None if tag == None else str(tag)
|
||||
|
||||
def __getstate__(self):
|
||||
state = self.__dict__.copy()
|
||||
state["artist"] = str(self.artist)
|
||||
state["album"] = str(self.album)
|
||||
state["genres"] = [ str(g) for g in self.genres ]
|
||||
state["decade"] = str(self.decade)
|
||||
state["year"] = str(self.year)
|
||||
state["playlists" ] = [ str(p) for p in self.playlists \
|
||||
if str(p) not in ("New Tracks", "Previous") ]
|
||||
return state
|
||||
|
||||
def __setstate__(self, state):
|
||||
GObject.Object.__init__(self)
|
||||
self.__dict__.update(state)
|
||||
|
||||
def __set_tags__(self):
|
||||
self.artist = tags.Artist.init_track(self.artist, self)
|
||||
self.album = tags.Album.init_track(self.artist, self.album, self)
|
||||
self.genres = [ tags.Genre.init_track(g, self) for g in self.genres ]
|
||||
self.decade = tags.Decade.init_track(self.decade, self)
|
||||
self.year = tags.Year.init_track(self.decade, self.year, self)
|
||||
self.playlists = [ tags.User.init_track(p, self) for p in self.playlists ]
|
||||
scanner.import_track(db.library.Table.find(self.library.name),
|
||||
self.filepath(), self.playcount, self.lastplayed,
|
||||
[ p.name for p in self.playlists ])
|
||||
|
||||
def about_to_remove(self):
|
||||
tags.Artist.remove(self.artist, self)
|
||||
tags.Album.remove(self.album, self)
|
||||
for genre in self.genres:
|
||||
tags.Genre.remove(genre, self)
|
||||
tags.Decade.remove(self.decade, self)
|
||||
tags.Year.remove(self.year, self)
|
||||
for tag in self.playlists:
|
||||
tag.remove_track(self)
|
||||
|
||||
def add_to_playlist(self, name):
|
||||
self.playlists.append(tags.User.add(name, self))
|
||||
|
||||
def filepath(self):
|
||||
return self.library.name / self.path
|
||||
|
||||
def played(self):
|
||||
self.playcount += 1
|
||||
self.lastplayed = datetime.datetime.now()
|
||||
|
||||
def remove_from_playlist(self, name):
|
||||
tag = tags.User[name]
|
||||
tag.remove_track(self)
|
||||
self.playlists.remove(tag)
|
|
@ -1,70 +0,0 @@
|
|||
# Copyright 2021 (c) Anna Schumaker.
|
||||
from lib import tag
|
||||
from lib import tagstore
|
||||
|
||||
class CollectionTag(tag.Tag):
|
||||
def __init__(self):
|
||||
tag.Tag.__init__(self, "Collection")
|
||||
self.loop = True
|
||||
|
||||
def can_loop(self): return False
|
||||
|
||||
|
||||
class NewTracksTag(tag.Tag):
|
||||
def __init__(self):
|
||||
tag.Tag.__init__(self, "New Tracks")
|
||||
|
||||
def __getstate__(self):
|
||||
state = super().__getstate__()
|
||||
state["tracks"].clear()
|
||||
return state
|
||||
|
||||
|
||||
class PreviousTag(tag.Tag):
|
||||
def __init__(self):
|
||||
tag.Tag.__init__(self, "Previous")
|
||||
|
||||
def __getstate__(self):
|
||||
state = super().__getstate__()
|
||||
state["tracks"].clear()
|
||||
return state
|
||||
|
||||
def can_random(self): return False
|
||||
def can_loop(self): return False
|
||||
|
||||
def add_track(self, track):
|
||||
with self.lock:
|
||||
self.tracks.insert(0, track)
|
||||
self.current = 0
|
||||
self.TrackAdded.publish(self, track, 0)
|
||||
|
||||
|
||||
class UpNextTag(tag.Tag):
|
||||
def __init__(self):
|
||||
tag.Tag.__init__(self, "Up Next")
|
||||
|
||||
def can_loop(self): return False
|
||||
|
||||
def next(self):
|
||||
track = super().next()
|
||||
if track is not None:
|
||||
track.remove_from_playlist("Up Next")
|
||||
with self.lock:
|
||||
self.current -= 1
|
||||
if len(self.tracks) == 0:
|
||||
self.random = False
|
||||
return track
|
||||
|
||||
|
||||
class UserTagStore(tagstore.TagStore):
|
||||
def __init__(self):
|
||||
tagstore.TagStore.__init__(self)
|
||||
self.reset()
|
||||
|
||||
def reset(self):
|
||||
super().reset()
|
||||
self.store["Collection"] = CollectionTag()
|
||||
self.store["Favorites"] = tag.Tag("Favorites")
|
||||
self.store["New Tracks"] = NewTracksTag()
|
||||
self.store["Previous"] = PreviousTag()
|
||||
self.store["Up Next"] = UpNextTag()
|
|
@ -1,24 +1,3 @@
|
|||
# Copyright 2021 (c) Anna Schumaker.
|
||||
from . import icons
|
||||
from . import window
|
||||
from gi.repository import Gtk
|
||||
import audio
|
||||
import db
|
||||
|
||||
class EmmentalApplication(Gtk.Application):
|
||||
def __init__(self, *args, **kwargs):
|
||||
Gtk.Application.__init__(self, *args, application_id="org.gtk.emmental", **kwargs)
|
||||
|
||||
def do_activate(self):
|
||||
self.window.present()
|
||||
|
||||
def do_startup(self):
|
||||
self.window = window.Window()
|
||||
Gtk.Application.do_startup(self)
|
||||
self.add_window(self.window)
|
||||
|
||||
def do_shutdown(self):
|
||||
db.sql.optimize()
|
||||
Gtk.Application.do_shutdown(self)
|
||||
|
||||
Application = EmmentalApplication()
|
||||
|
|
|
@ -1,12 +1,11 @@
|
|||
# Copyright 2021 (c) Anna Schumaker.
|
||||
from . import window
|
||||
from lib import version
|
||||
from gi.repository import Gtk
|
||||
import pathlib
|
||||
|
||||
IconPath = pathlib.Path("data/").absolute()
|
||||
|
||||
if version.DEBUG == True:
|
||||
if __debug__ == True:
|
||||
Display = Gtk.Label().get_display()
|
||||
Theme = Gtk.IconTheme.get_for_display(Display)
|
||||
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
# Copyright 2021 (c) Anna Schumaker.
|
||||
from gi.repository import Gtk, Gdk
|
||||
import audio
|
||||
import tagdb
|
||||
|
||||
Event = Gtk.EventControllerKey()
|
||||
Event.set_propagation_phase(Gtk.PropagationPhase.CAPTURE)
|
||||
|
|
|
@ -1,9 +0,0 @@
|
|||
# Copyright 2021 (c) Anna Schumaker.
|
||||
import unittest
|
||||
import ui
|
||||
from gi.repository import Gtk
|
||||
|
||||
class TestEmmentalApplication(unittest.TestCase):
|
||||
def test_application(self):
|
||||
app = ui.EmmentalApplication()
|
||||
self.assertIsInstance(app, Gtk.Application)
|
Loading…
Reference in New Issue