Compare commits

...

19 Commits

Author SHA1 Message Date
Anna Schumaker 18743f05c4 Emmental 2.10
Signed-off-by: Anna Schumaker <Anna@NoWheyCreamery.com>
2022-02-25 13:32:32 -05:00
Anna Schumaker ab6eb556ad db: Fix track_adjusts_current() when the track has been removed
In this case, the call to get_track_index() returns None which can't be
used for the comparisons we're doing. Make sure we handle the None
result explicitely.

Signed-off-by: Anna Schumaker <Anna@NoWheyCreamery.com>
2022-02-22 10:08:34 -05:00
Anna Schumaker 1296857189 playlist: Don't leave the Subtitle column blank
Instead, fill in something generated from the disc number but make it
dimmed.

Signed-off-by: Anna Schumaker <Anna@NoWheyCreamery.com>
2022-01-10 11:36:42 -05:00
Anna Schumaker 73ba296d74 playlist: Use the new match / case statement in __init__.py:key_pressed()
Signed-off-by: Anna Schumaker <Anna@NoWheyCreamery.com>
2021-12-28 17:33:26 -05:00
Anna Schumaker f9cec5e1b3 audio: Use the new match / case statement in scale.py:format_value()
Signed-off-by: Anna Schumaker <Anna@NoWheyCreamery.com>
2021-12-28 17:24:09 -05:00
Anna Schumaker 0c7a4a4a4c db: Use the new match / case statement in user.py:do_factory()
This is cleaner than using a bunch of elif-s to pick the right playlist
type

Signed-off-by: Anna Schumaker <Anna@NoWheyCreamery.com>
2021-12-28 17:23:57 -05:00
Anna Schumaker 289420e504 playlist: Add a Favorite toggle button
For adding / removing the currently playing track from the Favorites
playlist

Signed-off-by: Anna Schumaker <Anna@NoWheyCreamery.com>
2021-12-28 15:50:11 -05:00
Anna Schumaker 15059db59a playlist: Create a TrackBox containing the JumpButton
And put it before the filter entry in the ui

Signed-off-by: Anna Schumaker <Anna@NoWheyCreamery.com>
2021-12-28 15:50:11 -05:00
Anna Schumaker c4adea15bb playlist: Split out the ControlBox as a base class
And create a PlaylistBox inheriting from it containing the buttons.

Signed-off-by: Anna Schumaker <Anna@NoWheyCreamery.com>
2021-12-28 15:50:11 -05:00
Anna Schumaker 328dce0be2 scanner: Fix scanner.update_library() function
We were scheduling the CheckTask, but not following up with a
DirectoryTask to scan for new files. I use this function during the
Gtk.Application startup to automatically update the libraries.

Implements #31 (Automatically update the database during startup)
Signed-off-by: Anna Schumaker <Anna@NoWheyCreamery.com>
2021-12-28 15:50:11 -05:00
Anna Schumaker 295202443f scanner: Add a function for clearing the TaskQueue
And call this function when closing the player

Signed-off-by: Anna Schumaker <Anna@NoWheyCreamery.com>
2021-12-28 15:50:08 -05:00
Anna Schumaker b245b2073e scanner: Rename tests
To remove a bunch of redundant names from the test functions

Signed-off-by: Anna Schumaker <Anna@NoWheyCreamery.com>
2021-12-28 15:49:52 -05:00
Anna Schumaker 7b89f54e8b scanner: Remove the ImportTask
It is no longer needed now that we have updated from the tagdb

Signed-off-by: Anna Schumaker <Anna@NoWheyCreamery.com>
2021-12-28 15:49:52 -05:00
Anna Schumaker b768d74928 sidebar: Don't change displayed playlist to None
Signed-off-by: Anna Schumaker <Anna@NoWheyCreamery.com>
2021-12-28 15:49:52 -05:00
Anna Schumaker beca08b833 Implement the Gtk.Application instance in emmental.py
We don't use this during testing, so put it here instead of in a
submodule.

Implements #26 (Move the EmmentalApplication into emmental.py)
Signed-off-by: Anna Schumaker <Anna@NoWheyCreamery.com>
2021-12-28 15:49:50 -05:00
Anna Schumaker db2d122211 lib: Replace version.DEBUG with the __debug__ constant
Implements #27 (Check __debug__ constant instead of a .debug file)
Signed-off-by: Anna Schumaker <Anna@NoWheyCreamery.com>
2021-12-26 13:18:24 -05:00
Anna Schumaker 10c5fd4cef lib: set version.TESTING based on if the unittest module is loaded
This is cleaner than needing to set an environment variable before
running unit tests.

Implements #28 (Check if unittest is loaded to determine if we are testing)
Signed-off-by: Anna Schumaker <Anna@NoWheyCreamery.com>
2021-12-26 13:05:08 -05:00
Anna Schumaker 2daefa932c lib: Remove unused files
These are no longer needed now that tagdb has been removed

Implements #24 (Clean up lib/)
Signed-off-by: Anna Schumaker <Anna@NoWheyCreamery.com>
2021-12-26 10:18:09 -05:00
Anna Schumaker 915e3c8340 Remove unused tagdb module
Implements #23 (Remove tagdb/ code)
Signed-off-by: Anna Schumaker <Anna@NoWheyCreamery.com>
2021-12-26 10:08:33 -05:00
57 changed files with 252 additions and 2591 deletions

View File

@ -37,4 +37,4 @@ pkgbuild:
.PHONY: tests
tests:
python tools/generate_tracks.py
EMMENTAL_TESTING=1 python -m unittest discover -v
python -m unittest discover -v

View File

@ -1,5 +1,4 @@
# Copyright 2021 (c) Anna Schumaker.
import tagdb
from gi.repository import Gtk
from . import artwork
from . import controls

View File

@ -72,11 +72,11 @@ class AutoPauseScale(ScalePlus):
return self.get_value() == 0
def format_value(self, scale, value):
value = int(value)
if value == -1: return "Keep Playing"
elif value == 0: return "This Track"
elif value == 1: return "Next Track"
return f"{value} Tracks"
match int(value):
case -1: return "Keep Playing"
case 0: return "This Track"
case 1: return "Next Track"
case _: return f"{int(value)} Tracks"
def decrement(self):
self.keep_playing = not self.about_to_pause()

View File

@ -1,13 +0,0 @@
# Copyright 2021 (c) Anna Schumaker.
import tagdb
from gi.repository import GObject
class Selector(GObject.GObject):
def __init__(self): GObject.GObject.__init__(self)
def next(self): return None
def previous(self): return None
class TagdbSelector(Selector):
def next(self): return tagdb.Stack.next()[0]
def previous(self): return tagdb.Stack.previous()

View File

@ -1,11 +0,0 @@
# Copyright 2021 (c) Anna Schumaker.
import unittest
from gi.repository import GObject
from . import selector
class TestAudioSelector(unittest.TestCase):
def test_audio_selector_init(self):
select = selector.Selector()
self.assertIsInstance(select, GObject.GObject)
self.assertIsNone(select.next())
self.assertIsNone(select.previous())

View File

@ -1,3 +1,3 @@
#!/bin/bash
python {EMMENTAL_LIB}/emmental.py $*
python -O {EMMENTAL_LIB}/emmental.py $*

View File

@ -60,7 +60,10 @@ class Playlist(GObject.GObject):
return cur.fetchone()[1] - 1
def track_adjusts_current(self, track):
return self.current > -1 and self.get_track_index(track) <= self.current
if self.current > -1:
if (index := self.get_track_index(track)) != None:
return index <= self.current
return False
def add_track(self, track):
self.emit("track-added", track)

View File

@ -140,17 +140,19 @@ class UserTable(playlist.Model):
sql.execute("DROP TABLE temp_playlist_map")
def do_factory(self, row):
if row["name"] == "Collection":
return Collection(row)
elif row["name"] == "Favorites":
return UserPlaylist(row, "emmental-favorites", "playlist_map")
elif row["name"] == "New Tracks":
return UserPlaylist(row, "starred", "temp_playlist_map")
elif row["name"] == "Previous":
return Previous(row)
elif row["name"] == "Queued Tracks":
return QueuedTracks(row)
return UserPlaylist(row, "audio-x-generic", "playlist_map")
match row["name"]:
case "Collection":
return Collection(row)
case "Favorites":
return UserPlaylist(row, "emmental-favorites", "playlist_map")
case "New Tracks":
return UserPlaylist(row, "starred", "temp_playlist_map")
case "Previous":
return Previous(row)
case "Queued Tracks":
return QueuedTracks(row)
case _:
return UserPlaylist(row, "audio-x-generic", "playlist_map")
def do_insert(self, plstate, name):
return sql.execute("INSERT INTO playlists (plstateid, name, sort) "

View File

@ -1,9 +1,32 @@
#!/usr/bin/python
# Copyright 2021 (c) Anna Schumaker.
import lib
import tagdb
lib.settings.load()
tagdb.load()
import db
import scanner
import ui
ui.Application.run()
from gi.repository import Gtk
class Application(Gtk.Application):
def __init__(self, *args, **kwargs):
app_id = f"org.gtk.emmental{'-debug' if __debug__ else ''}"
Gtk.Application.__init__(self, *args, application_id=app_id, **kwargs)
def do_startup(self):
Gtk.Application.do_startup(self)
self.add_window(ui.window.Window())
for i in range(db.library.Table.get_n_items()):
scanner.update_library(db.library.Table.get_item(i))
def do_activate(self):
for window in self.get_windows():
window.present()
def do_shutdown(self):
Gtk.Application.do_shutdown(self)
scanner.Queue.clear()
db.sql.optimize()
if __name__ == "__main__":
Application().run()

View File

@ -3,11 +3,7 @@ import gi
gi.require_version("Gtk", "4.0")
gi.require_version("Gst", "1.0")
from . import bus
from . import data
from . import filter
from . import publisher
from . import settings
from . import tag
from . import tagstore
from . import thread
from . import version

View File

@ -1,59 +0,0 @@
# Copyright 2021 (c) Anna Schumaker.
from . import publisher
import threading
from gi.repository import GLib
Start = publisher.Publisher()
RETRY = GLib.SOURCE_CONTINUE
class Bus:
def __init__(self, milliseconds):
self.timeout = milliseconds
self.timeout_id = None
self.passengers = [ ]
self.lock = threading.Lock()
def __do_board(self, func, *args):
with self.lock:
if (func, args) not in self.passengers:
self.passengers.append( (func, args) )
if self.timeout_id == None:
self.timeout_id = GLib.timeout_add(self.timeout, self.run)
return True
return False
def board(self, func, *args):
if self.__do_board(func, *args):
Start.publish(self)
def clear(self):
with self.lock:
if self.timeout_id:
GLib.source_remove(self.timeout_id)
self.timeout_id = None
self.passengers.clear()
def complete(self):
with self.lock:
for (func, args) in self.passengers:
func(*args)
GLib.source_remove(self.timeout_id)
self.timeout_id = None
self.passengers.clear()
def running(self):
with self.lock:
return self.timeout_id != None
def run(self):
with self.lock:
(func, args) = self.passengers[0]
if func(*args) == RETRY:
return GLib.SOURCE_CONTINUE
self.passengers.pop(0)
if len(self.passengers) == 0:
self.timeout_id = None
return GLib.SOURCE_REMOVE
return GLib.SOURCE_CONTINUE

View File

@ -1,22 +0,0 @@
# Copyright 2021 (c) Anna Schumaker.
from gi.repository import Gtk
class Counter(Gtk.Adjustment):
def __init__(self, min, max):
Gtk.Adjustment.__init__(self)
self.configure(value=min, lower=min, upper=max + 1, step_increment=1,
page_increment=1, page_size=1)
def __change_value__(self, n):
value = self.get_value()
self.set_value(value + n)
if self.get_value() == value:
return None
return self.get_value()
def increment(self):
return self.__change_value__(1)
def decrement(self):
return self.__change_value__(-1)

View File

@ -7,7 +7,7 @@ import xdg.BaseDirectory
__resource = "emmental"
if version.TESTING == True:
__resource = "emmental-testing"
elif version.DEBUG == True:
elif __debug__ == True:
__resource = "emmental-debug"
READ = 'rb'

View File

@ -1,24 +0,0 @@
# Copyright 2021 (c) Anna Schumaker.
from gi.repository import GObject
tracks = { }
class FakeTrack(GObject.GObject):
def __init__(self, n, tag=None):
GObject.GObject.__init__(self)
self.trackid = n
self.length = n
self.tag = tag
def __int__(self):
return self.trackid
def add_to_playlist(self, name):
self.tag.add_track(self)
def remove_from_playlist(self, name):
self.tag.remove_track(self)
def Track(n, tag=None):
return tracks.setdefault((n,tag), FakeTrack(n, tag=tag))

View File

@ -1,51 +0,0 @@
# Copyright 2021 (c) Anna Schumaker.
import mutagen
import re
class Metadata:
def __init__(self, filepath):
self.path = filepath
self.file = None
def __enter__(self):
self.file = mutagen.File(self.path)
return self
def __exit__(self, exp_type, exp_value, traceback):
self.file = None
return exp_type == None
def album(self):
return self.file.get("album", [ "Unknown Album" ])[0]
def artist(self):
artist = self.file.get("artist", [ "Unknown Artist" ])
return self.file.get("albumartist", artist)[0]
def artistsort(self):
sort = self.file.get("artistsort", [ None ])
return self.file.get("albumartistsort", sort)[0]
def decade(self):
return (self.year() // 10) * 10
def discnumber(self):
return int(self.file.get("discnumber", [ 1 ])[0])
def genres(self):
genre = self.file.get("genre", [ "" ])[0]
return [ g.strip() for g in re.split(",|;|/|:", genre) ]
def length(self):
return int(self.file.info.length)
def title(self):
return self.file.get("title", [ "" ])[0]
def tracknumber(self):
return int(self.file.get("tracknumber", [ 0 ])[0])
def year(self):
year = self.file.get("date", [ "0" ])
year = self.file.get("originalyear", year)[0]
return int(re.match("\d+", year).group(0))

View File

@ -1,19 +0,0 @@
# Copyright 2020 (c) Anna Schumaker.
class Publisher:
def __init__(self):
self.subscribers = set()
def publish(self, *args):
funcs = self.subscribers.copy()
for func in funcs:
func(*args)
def register(self, func):
self.subscribers.add(func)
def reset(self):
self.subscribers.clear()
def unregister(self, func):
self.subscribers.discard(func)

View File

@ -1,146 +0,0 @@
# Copyright 2021 (c) Anna Schumaker.
from . import publisher
import random
import threading
class Tag:
def __init__(self, name, sort=None):
self.name = name
self.sort = sort.lower() if sort else str(name).lower()
self.current = -1
self.runtime = 0
self.loop = False
self.random = False
self.tracks = [ ]
self.widgets = None
self.lock = threading.Lock()
self.TrackAdded = publisher.Publisher()
self.TrackRemoved = publisher.Publisher()
def __getitem__(self, n):
with self.lock:
if n < len(self.tracks):
return self.tracks[n]
return None
def __getstate__(self):
with self.lock:
return { "name" : self.name,
"sort" : self.sort,
"current" : self.current,
"loop" : self.loop,
"random" : self.random,
"tracks" : [ t.trackid for t in self.tracks ] }
def __len__(self):
with self.lock:
return len(self.tracks)
def __lt__(self, rhs):
if not isinstance(rhs, SuperTag):
return self.sort < rhs.sort
if self == rhs.parent:
return True
return self.sort < rhs.parent.sort
def __next_track__(self):
if self.loop == True and self.current >= len(self.tracks):
return 0
return self.current + 1
def __random_track__(self):
i = 1
length = len(self.tracks)
if len(self.tracks) >= 3:
i = random.randint(1, length - 1)
return (self.current + i) % (1 if length == 0 else length)
def __setstate__(self, state):
self.name = state["name"]
self.sort = state["sort"]
self.current = state["current"]
self.loop = state["loop"]
self.random = state["random"]
self.tracks = state["tracks"]
self.runtime = 0
self.widgets = None
self.lock = threading.Lock()
self.TrackAdded = publisher.Publisher()
self.TrackRemoved = publisher.Publisher()
def __str__(self):
return self.name
def add_track(self, track):
with self.lock:
if track in self.tracks:
return
pos = len(self.tracks)
self.tracks.append(track)
self.runtime += track.length
self.TrackAdded.publish(self, track, pos)
def can_loop(self): return True
def can_random(self): return True
def get_header(self):
return self.sort[0].upper() if len(self.sort) > 0 else ""
def init_track(self, track):
with self.lock:
try:
i = self.tracks.index(track.trackid)
self.tracks[i] = track
self.runtime += track.length
except Exception as e:
pass
def next(self):
with self.lock:
if self.random == True:
self.current = self.__random_track__()
else:
self.current = self.__next_track__()
if self.current < len(self.tracks):
return self.tracks[self.current]
return None
def remove_track(self, track):
with self.lock:
pos = self.tracks.index(track)
self.tracks.remove(track)
self.runtime -= track.length
self.TrackRemoved.publish(self, track, pos)
def stacked(self):
with self.lock:
if self.current >= len(self.tracks):
self.current = -1
def track_selected(self, track):
with self.lock:
self.current = self.tracks.index(track)
track.add_to_playlist("Previous")
class SuperTag(Tag):
def __init__(self, parent, name, sort=None):
Tag.__init__(self, name, sort)
self.parent = parent
def __getstate__(self):
state = Tag.__getstate__(self)
state["parent"] = self.parent
return state
def __lt__(self, rhs):
if not isinstance(rhs, SuperTag):
return self.parent.sort < rhs.sort
if self.parent != rhs.parent:
return self.parent < rhs.parent
return self.sort < rhs.sort
def __setstate__(self, state):
Tag.__setstate__(self, state)
self.parent = state["parent"]

View File

@ -1,98 +0,0 @@
# Copyright 2021 (c) Anna Schumaker.
from . import publisher
from . import tag
import threading
class TagStore:
def __init__(self):
self.store = dict()
self.lock = threading.Lock()
self.Added = publisher.Publisher()
self.Removed = publisher.Publisher()
def __add_tag__(self, name, sort, track):
t = self.__get_tag__(name, sort)
if track:
t.add_track(track)
return t
def __alloc_tag__(self, name, sort):
return tag.Tag(name, sort)
def __getitem__(self, name):
with self.lock:
return self.store.get(name)
def __getstate__(self):
with self.lock:
return { "store" : self.store }
def __get_tag__(self, name, sort):
with self.lock:
if (t := self.store.get(name)) != None:
return t
t = self.__alloc_tag__(name, sort)
self.store[name] = t
self.Added.publish(t)
return t
def __len__(self):
with self.lock:
return len(self.store)
def __pop_tag__(self, t):
self.store.pop(t.name)
def __setstate__(self, state):
self.store = state["store"]
self.lock = threading.Lock()
self.Added = publisher.Publisher()
self.Removed = publisher.Publisher()
def add(self, name, track=None, sort=None):
return self.__add_tag__(name.strip(), sort, track)
def init_track(self, name, track):
with self.lock:
if (t := self.store.get(name)) != None:
t.init_track(track)
return t
def remove(self, t, track=None):
if track:
t.remove_track(track)
if (track == None or len(t) == 0) and t in self.store.values():
with self.lock:
self.__pop_tag__(t)
self.Removed.publish(t)
def reset(self):
with self.lock:
self.store.clear()
self.Added.reset()
self.Removed.reset()
def tags(self):
with self.lock:
for (name, tag) in self.store.items():
yield tag
class TagSuperStore(TagStore):
def __alloc_tag__(self, key, sort):
return tag.SuperTag(key[0], key[1], sort)
def __pop_tag__(self, t):
self.store.pop((t.parent, t.name))
def add(self, parent, name, track, sort=None):
return super().__add_tag__((parent, name.strip()), sort, track)
def init_track(self, parent, name, track):
return super().init_track((parent, name), track)
def tags(self, parent=None):
with self.lock:
for (name, tag) in self.store.items():
if parent == None or tag.parent == parent:
yield tag

View File

@ -1,93 +0,0 @@
# Copyright 2021 (c) Anna Schumaker.
from . import bus
from . import publisher
from gi.repository import GLib
import threading
import time
import unittest
main_context = GLib.main_context_default()
class TestBus(unittest.TestCase):
def setUp(self):
self.count = 0
self.start = 0
self.retry = False
bus.Start.register(self.cb_start)
def tearDown(self):
bus.Start.unregister(self.cb_start)
def cb_start(self, bus):
self.start += 1
def cb_one(self):
self.count += 1
def cb_two(self, arg):
self.count = arg
def cb_retry(self, arg):
self.count = arg
return bus.RETRY if self.retry == True else None
def test_bus_init(self):
self.assertEqual(bus.RETRY, GLib.SOURCE_CONTINUE)
b = bus.Bus(100)
self.assertEqual(b.timeout, 100)
self.assertEqual(b.passengers, [ ])
self.assertIsNone(b.timeout_id)
self.assertIsInstance(b.lock, type(threading.Lock()))
self.assertIsInstance(bus.Start, publisher.Publisher)
def test_bus_board(self):
b = bus.Bus(100)
b.board(self.cb_one)
b.board(self.cb_one)
self.assertEqual(b.passengers, [ (self.cb_one,()) ])
self.assertIsNotNone(b.timeout_id)
self.assertTrue(b.running())
self.assertEqual(self.start, 1)
time.sleep(0.1)
while main_context.iteration(may_block=False): pass
self.assertEqual(self.count, 1)
self.assertIsNone(b.timeout_id)
self.assertFalse(b.running())
def test_bus_clear(self):
b = bus.Bus(100)
b.clear()
for i in range(100):
b.board(self.cb_one)
b.clear()
self.assertEqual(b.passengers, [ ])
self.assertIsNone(b.timeout_id)
def test_bus_complete(self):
b = bus.Bus(100)
for i in range(100):
b.board(self.cb_two, i)
b.complete()
self.assertEqual(b.passengers, [ ])
self.assertEqual(self.count, i)
self.assertIsNone(b.timeout_id)
def test_bus_retry(self):
b = bus.Bus(10)
b.board(self.cb_retry, 1)
b.board(self.cb_retry, 2)
self.retry = True
b.run()
self.assertEqual(b.passengers, [ (self.cb_retry, (1,)), (self.cb_retry, (2,)) ])
b.complete()
self.assertEqual(b.passengers, [ ])

View File

@ -1,20 +0,0 @@
# Copyright 2021 (c) Anna Schumaker.
from . import counter
from gi.repository import Gtk
import unittest
class TestCounter(unittest.TestCase):
def test_counter(self):
c = counter.Counter(1, 10)
self.assertIsInstance(c, Gtk.Adjustment)
self.assertEqual(c.get_lower(), 1)
self.assertEqual(c.get_upper(), 11)
self.assertEqual(c.get_value(), 1)
for i in [ 2, 3, 4, 5, 6, 7, 8, 9, 10, None ]:
self.assertEqual(c.increment(), i)
for i in [ 9, 8, 7, 6, 5, 4, 3, 2, 1, None ]:
self.assertEqual(c.decrement(), i)

View File

@ -1,34 +0,0 @@
# Copyright 2021 (c) Anna Schumaker.
from . import metadata
import pathlib
import unittest
test_tracks = pathlib.Path("./data/Test Album")
track_01 = test_tracks / "01 - Test Track.ogg"
track_02 = test_tracks / "02 - Test {Disc 2}.ogg"
class TestMetadata(unittest.TestCase):
def test_metadata_init(self):
mdf = metadata.Metadata(track_01)
self.assertEqual(mdf.path, track_01)
self.assertIsNone(mdf.file)
def test_metadata_track_01(self):
with metadata.Metadata(track_01) as mdf:
self.assertEqual(mdf.album(), "Test Album")
self.assertEqual(mdf.artist(), "Test Artist")
self.assertEqual(mdf.artistsort(), "Artist, Test")
self.assertEqual(mdf.decade(), 2010)
self.assertEqual(mdf.discnumber(), 1)
self.assertEqual(mdf.genres(), [ "Test" ])
self.assertEqual(mdf.length(), 10)
self.assertEqual(mdf.title(), "Test Track")
self.assertEqual(mdf.tracknumber(), 1)
self.assertEqual(mdf.year(), 2019)
def test_metadata_track_02(self):
with metadata.Metadata(track_02) as mdf:
self.assertEqual(mdf.artist(), "Test Album Artist")
self.assertEqual(mdf.artistsort(), "Album Artist, Test")
self.assertEqual(mdf.genres(), [ "Test", "Genre", "List" ])
self.assertEqual(mdf.year(), 2019)

View File

@ -1,30 +0,0 @@
# Copyright 2020 (c) Anna Schumaker.
from . import publisher
import unittest
class TestPublisher(unittest.TestCase):
def on_test(self, text):
self.test_arg = text
def test_publisher_init(self):
pub = publisher.Publisher()
self.assertIsInstance(pub.subscribers, set)
self.assertEqual(pub.subscribers, set())
def test_publisher_register(self):
pub = publisher.Publisher()
pub.register(self.on_test)
self.assertEqual(pub.subscribers, { self.on_test })
pub.unregister(self.on_test)
self.assertEqual(pub.subscribers, set())
pub.subscribers = set([ 1, 2, 3 ])
pub.reset()
self.assertEqual(pub.subscribers, set())
def test_publisher_publish(self):
pub = publisher.Publisher()
pub.register(self.on_test)
pub.publish("Test Arg")
self.assertEqual(self.test_arg, "Test Arg")

View File

@ -1,256 +0,0 @@
# Copyright 2021 (c) Anna Schumaker.
from . import publisher
from . import tag
from . import fake
import random
import threading
import unittest
class TestTag(unittest.TestCase):
def setUp(self):
self.changed = None
def callback_func(self, tag, track, pos):
self.changed = (tag, track, pos)
def test_tag_init(self):
t = tag.Tag("test")
self.assertEqual(t.name, "test")
self.assertEqual(t.sort, "test")
self.assertEqual(t.current, -1)
self.assertEqual(t.runtime, 0)
self.assertEqual(t.tracks, [ ])
self.assertFalse(t.loop)
self.assertFalse(t.random)
self.assertTrue( t.can_loop())
self.assertTrue( t.can_random())
self.assertIsNone(t.widgets)
self.assertIsInstance(t.lock, type(threading.Lock()))
self.assertIsInstance(t.TrackAdded, publisher.Publisher)
self.assertIsInstance(t.TrackRemoved, publisher.Publisher)
self.assertEqual(str(t), "test")
def test_tag_len(self):
t = tag.Tag("Test")
self.assertEqual(len(t), 0)
t.tracks = [ fake.Track(i) for i in range(5) ]
self.assertEqual(len(t), 5)
def test_tag_lt(self):
a = tag.Tag("A")
b = tag.Tag("B")
c = tag.Tag("C")
self.assertTrue( a < b)
self.assertFalse(b < a)
self.assertFalse(a < a)
a2 = tag.SuperTag(b, "A")
self.assertTrue(a < a2)
self.assertTrue(b < a2)
self.assertTrue(a2 < c)
def test_tag_header(self):
self.assertEqual(tag.Tag("Test").get_header(), "T")
self.assertEqual(tag.Tag("Test", "sort").get_header(), "S")
self.assertEqual(tag.Tag("").get_header(), "")
def test_tag_state(self):
t = tag.Tag("test")
tracks = [ fake.Track(i) for i in range(5) ]
t.tracks = tracks
state = t.__getstate__()
self.assertEqual(set(state.keys()),
set([ "name", "sort", "current", "loop", "random", "tracks" ]))
self.assertEqual(state["name"], "test")
self.assertEqual(state["sort"], "test")
self.assertEqual(state["current"], -1)
self.assertEqual(state["tracks"], [ 0, 1, 2, 3, 4 ])
self.assertFalse(state["loop"])
self.assertFalse(state["random"])
state["sort"] = "sort"
state["current"] = 42
t.__dict__.clear()
t.__setstate__(state)
self.assertEqual(t.name, "test")
self.assertEqual(t.sort, "sort")
self.assertEqual(t.current, 42)
self.assertEqual(t.runtime, 0)
self.assertEqual(t.tracks, [ 0, 1, 2, 3, 4 ])
self.assertEqual(t.widgets, None)
self.assertFalse(t.loop)
self.assertFalse(t.random)
self.assertIsInstance(t.lock, type(threading.Lock()))
self.assertIsInstance(t.TrackAdded, publisher.Publisher)
self.assertIsInstance(t.TrackRemoved, publisher.Publisher)
for track in tracks:
t.init_track(track)
self.assertEqual(t.tracks, tracks)
self.assertEqual(t.runtime, 10)
def test_tag_add_track(self):
t = tag.Tag("test")
t.TrackAdded.register(self.callback_func)
self.assertIsNone(t[0])
t.add_track(fake.Track(1))
self.assertEqual(t[0], fake.Track(1))
self.assertEqual(t.tracks, [ fake.Track(1) ])
self.assertEqual(self.changed, (t, fake.Track(1), 0))
self.assertEqual(t.runtime, 1)
t.add_track(fake.Track(2))
self.assertEqual(t[1], fake.Track(2))
self.assertEqual(t.tracks, [ fake.Track(1), fake.Track(2) ])
self.assertEqual(self.changed, (t, fake.Track(2), 1))
self.assertEqual(t.runtime, 3)
t.add_track(fake.Track(1))
self.assertEqual(t.tracks, [ fake.Track(1), fake.Track(2) ])
self.assertEqual(self.changed, (t, fake.Track(2), 1))
self.assertEqual(t.runtime, 3)
def test_tag_remove_track(self):
t = tag.Tag("test")
t.add_track(fake.Track(1))
t.add_track(fake.Track(2))
t.TrackRemoved.register(self.callback_func)
t.remove_track(fake.Track(1))
self.assertEqual(t.tracks, [ fake.Track(2) ])
self.assertEqual(self.changed, (t, fake.Track(1), 0))
self.assertEqual(t.runtime, 2)
t.remove_track(fake.Track(2))
self.assertEqual(t.tracks, [ ])
self.assertEqual(self.changed, (t, fake.Track(2), 0))
self.assertEqual(t.runtime, 0)
def test_tag_next(self):
t = tag.Tag("test")
t.tracks = [ 1, 2, 3 ]
self.assertEqual(t.next(), 1)
self.assertEqual(t.current, 0)
self.assertEqual(t.next(), 2)
self.assertEqual(t.current, 1)
self.assertEqual(t.next(), 3)
self.assertEqual(t.current, 2)
self.assertIsNone(t.next())
self.assertEqual(t.current, 3)
t.loop = True
self.assertEqual(t.next(), 1)
self.assertEqual(t.current, 0)
def test_tag_random_next(self):
t = tag.Tag("test")
t.tracks = [ 0, 1, 2, 3, 4, 5 ]
t.random = True
# Expected randint(): 5, 3, 2, 5, 5, 5
random.seed(20210318)
self.assertEqual(t.next(), 4) # -1 + 5
self.assertEqual(t.next(), 1) # (4 + 3) % 6 = 7 % 6
self.assertEqual(t.next(), 3) # 1 + 2
self.assertEqual(t.next(), 2) # (3 + 5) % 6 = 8 % 6
self.assertEqual(t.next(), 1) # (2 + 5) % 6 = 7 % 6
self.assertEqual(t.next(), 0) # (1 + 5) % 6 = 0
t.tracks = [ ]
self.assertIsNone(t.next())
t.tracks = [ 0 ]
self.assertEqual(t.next(), 0)
self.assertEqual(t.next(), 0)
t.tracks = [ 0, 1 ]
t.current = -1
self.assertEqual(t.next(), 0)
self.assertEqual(t.next(), 1)
self.assertEqual(t.next(), 0)
self.assertEqual(t.next(), 1)
def test_tag_track_selected(self):
t = tag.Tag("test")
p = tag.Tag("Previous")
t.tracks = [ fake.Track(0, p), fake.Track(1, p), fake.Track(2, p) ]
t.track_selected(fake.Track(2, p))
self.assertEqual(t.current, 2)
self.assertIn(fake.Track(2, p), p.tracks)
t.track_selected(fake.Track(1, p))
self.assertEqual(t.current, 1)
self.assertIn(fake.Track(1, p), p.tracks)
t.track_selected(fake.Track(0, p))
self.assertEqual(t.current, 0)
self.assertIn(fake.Track(0, p), p.tracks)
def test_tag_stacked(self):
t = tag.Tag("test")
t.tracks = [ 0, 1, 2 ]
t.current = 3
t.stacked()
self.assertEqual(t.current, -1)
t.current = 1
t.stacked()
self.assertEqual(t.current, 1)
class TestSuperTag(unittest.TestCase):
def test_super_tag(self):
parent = tag.Tag("parent")
st = tag.SuperTag(parent, "test", "sort")
st.tracks = [ fake.Track(i) for i in range(5) ]
self.assertIsInstance(st, tag.Tag)
self.assertEqual(st.parent, parent)
state = st.__getstate__()
self.assertEqual(state["name"], "test")
self.assertEqual(state["sort"], "sort")
self.assertEqual(state["current"], -1)
self.assertEqual(state["tracks"], [ 0, 1, 2, 3, 4 ])
self.assertEqual(state["parent"], parent)
st.__dict__.clear()
st.__setstate__(state)
self.assertEqual(st.name, "test")
self.assertEqual(st.sort, "sort")
self.assertEqual(st.tracks, [ 0, 1, 2, 3, 4 ])
self.assertEqual(st.widgets, None)
self.assertEqual(st.parent, parent)
self.assertIsInstance(st.lock, type(threading.Lock()))
def test_super_tag_lt(self):
A = tag.Tag("A")
B = tag.Tag("B")
C = tag.Tag("C")
aa = tag.SuperTag(A, "A")
ba = tag.SuperTag(B, "A")
bb = tag.SuperTag(B, "B")
ca = tag.SuperTag(C, "A")
lst = [ A, aa, B, ba, bb, C, ca ]
for i, t in enumerate(lst):
for u in lst[i+1:]:
self.assertTrue(t < u)
lst = [ ca, C, bb, ba, B, aa, A ]
for i, t in enumerate(lst):
for u in lst[i+1:]:
self.assertFalse(t < u)

View File

@ -1,167 +0,0 @@
# Copyright 2021 (c) Anna Schumaker.
from . import fake
from . import publisher
from . import tag
from . import tagstore
import threading
import unittest
class TestTagStore(unittest.TestCase):
def setUp(self):
self.added = None
self.removed = None
def on_store_added(self, tag):
self.added = tag
def on_store_removed(self, tag):
self.removed = tag
def test_tag_store(self):
store = tagstore.TagStore()
self.assertEqual(store.store, { })
self.assertIsInstance(store.lock, type(threading.Lock()))
self.assertIsInstance(store.Added, publisher.Publisher)
self.assertIsInstance(store.Removed, publisher.Publisher)
def test_tag_store_add_remove(self):
store = tagstore.TagStore()
store.Added.register(self.on_store_added)
store.Removed.register(self.on_store_removed)
tag = store.add("test", fake.Track(1))
self.assertEqual(self.added, tag)
self.assertEqual(tag.tracks, [ fake.Track(1) ])
self.added = None
self.assertEqual(store.add("test ", fake.Track(2)), tag)
self.assertEqual(tag.tracks, [ fake.Track(1), fake.Track(2) ])
self.assertIsNone(self.added)
store.remove(tag, fake.Track(1))
self.assertEqual(tag.tracks, [ fake.Track(2) ])
self.assertIn("test", store.store.keys())
self.assertIsNone(self.removed)
store.remove(tag, fake.Track(2))
self.assertEqual(tag.tracks, [ ])
self.assertNotIn("test", store.store.keys())
self.assertEqual(self.removed, tag)
def test_tag_store_add_remove_none(self):
store = tagstore.TagStore()
tag = store.add("test")
self.assertEqual(tag.tracks, [ ])
tag.add_track(fake.Track(1))
tag.add_track(fake.Track(2))
store.remove(tag)
self.assertNotIn("test", store.store.keys())
def test_tag_store_add_remove_sort(self):
store = tagstore.TagStore()
tag = store.add("test", sort="sort")
self.assertEqual(tag.sort, "sort")
def test_tag_store_items(self):
store = tagstore.TagStore()
self.assertEqual(len(store), 0)
tag1 = store.add("test1", fake.Track(1))
tag2 = store.add("test2", fake.Track(2))
tag3 = store.add("test3", fake.Track(3))
self.assertEqual(len(store), 3)
self.assertEqual( store["test1"], tag1)
self.assertEqual( store["test2"], tag2)
self.assertEqual( store["test3"], tag3)
self.assertIsNone(store["test4"])
result = [ tag for tag in store.tags() ]
self.assertEqual(len(result), 3)
self.assertIn(tag1, result)
self.assertIn(tag2, result)
self.assertIn(tag3, result)
def test_tag_store_reset(self):
store = tagstore.TagStore()
tag = store.add("test", fake.Track(1))
store.Added.register(self.on_store_added)
store.Removed.register(self.on_store_removed)
store.reset()
self.assertNotIn(tag.name, store.store.keys())
self.assertEqual(store.Added.subscribers, set())
self.assertEqual(store.Removed.subscribers, set())
def test_tag_store_state(self):
store = tagstore.TagStore()
track = fake.Track(1)
tag = store.add("test", track)
state = store.__getstate__()
self.assertEqual(set(state.keys()), set([ "store" ]))
store.__dict__.clear()
store.__setstate__(state)
self.assertEqual(store.store, { "test" : tag })
self.assertIsInstance(store.lock, type(threading.Lock()))
self.assertIsInstance(store.Added, publisher.Publisher)
self.assertIsInstance(store.Removed, publisher.Publisher)
self.assertEqual(store.init_track("test", track), tag)
self.assertEqual(tag.tracks, [ track ])
class TestTagSuperStore(unittest.TestCase):
def test_tag_superstore(self):
store = tagstore.TagStore()
superstore = tagstore.TagSuperStore()
self.assertIsInstance(superstore, tagstore.TagStore)
parent = store.add("parent", fake.Track(1))
supertag = superstore.add(parent, "test ", fake.Track(1))
self.assertIsInstance(supertag, tag.SuperTag)
self.assertEqual(supertag.name, "test")
self.assertEqual(supertag.tracks, [ fake.Track(1) ])
self.assertEqual(supertag.parent, parent)
superstore.remove(supertag, fake.Track(1))
self.assertEqual(supertag.tracks, [ ])
self.assertNotIn("test", superstore.store.keys())
def test_tag_superstore_items(self):
store = tagstore.TagStore()
superstore = tagstore.TagSuperStore()
parent1 = store.add("parent1", fake.Track(1))
parent2 = store.add("parent2", fake.Track(2))
tag1 = superstore.add(parent1, "test1", fake.Track(1))
tag2 = superstore.add(parent1, "test2", fake.Track(2))
tag3 = superstore.add(parent2, "test3", fake.Track(3))
self.assertEqual(len(superstore), 3)
self.assertEqual( superstore[parent1, "test1"], tag1)
self.assertEqual( superstore[parent1, "test2"], tag2)
self.assertEqual( superstore[parent2, "test3"], tag3)
self.assertIsNone(superstore[parent2, "test2"])
result = [ tag for tag in superstore.tags(parent1) ]
self.assertEqual(result, [ tag1, tag2 ])
result = [ tag for tag in superstore.tags(parent2) ]
self.assertEqual(result, [ tag3 ])
result = [ tag for tag in superstore.tags() ]
self.assertEqual(result, [ tag1, tag2, tag3 ])
def test_tag_superstore_init_track(self):
store = tagstore.TagStore()
superstore = tagstore.TagSuperStore()
track = fake.Track(1)
parent = store.add("test")
tag = superstore.add(parent, "test", track)
tag.tracks = [ 1 ]
self.assertEqual(superstore.init_track(parent, "test", track), tag)
self.assertEqual(tag.tracks, [ track ])

View File

@ -1,35 +0,0 @@
# Copyright 2021 (c) Anna Schumaker.
from . import publisher
from . import thread
import threading
import unittest
class TestThread(unittest.TestCase):
def setUp(self):
self.started = None
self.called = False
self.thread = thread.Thread(self.thread_func)
def thread_func(self):
self.assertIsNotNone(self.thread.thread)
self.called = True
def on_thread_start(self, thread):
self.started = thread
def test_thread(self):
self.assertIsInstance(thread.Start, publisher.Publisher)
thread.Start.register(self.on_thread_start)
self.assertIsInstance(self.thread.lock, type(threading.Lock()))
self.assertEqual(self.thread.func, self.thread_func)
self.assertIsNone(self.thread.thread)
self.assertFalse(self.thread.running())
self.assertEqual(self.thread(), self.thread)
self.thread.join()
self.assertTrue(self.called)
self.assertFalse(self.thread.running())
self.assertEqual(self.started, self.thread)
self.assertIsNone(self.thread.thread)

View File

@ -5,9 +5,9 @@ import unittest
class TestVersion(unittest.TestCase):
def test_version(self):
self.assertEqual(version.MAJOR, 2)
self.assertEqual(version.MINOR, 9)
self.assertEqual(version.MINOR, 10)
self.assertTrue(version.DEBUG)
self.assertTrue(__debug__)
self.assertTrue(version.TESTING)
self.assertEqual(version.string(), "Emmental 2.9-debug")
self.assertEqual(version.string(), "Emmental 2.10-debug")

View File

@ -1,35 +0,0 @@
# Copyright 2021 (c) Anna Schumaker.
from . import publisher
import threading
Start = publisher.Publisher()
class Thread:
def __init__(self, func):
self.func = func
self.thread = None
self.lock = threading.Lock()
def __call__(self):
with self.lock:
if self.thread:
return None
self.thread = threading.Thread(target = self.__func__)
self.thread.start()
Start.publish(self)
return self
def __func__(self):
self.func()
with self.lock:
self.thread = None
def join(self):
if self.thread:
self.thread.join()
def running(self):
with self.lock:
if self.thread:
return self.thread.is_alive()
return False

View File

@ -1,17 +1,10 @@
# Copyright 2021 (c) Anna Schumaker.
import os
import sys
MAJOR = 2
MINOR = 9
DEBUG = False
if os.path.exists(".debug"):
with open(".debug") as f:
if f.read().strip() == "emmental":
DEBUG = True
TESTING = os.environ.get("EMMENTAL_TESTING") != None
MINOR = 10
TESTING = "unittest" in sys.modules
def string():
return f"Emmental {MAJOR}.{MINOR}{'-debug' if DEBUG else ''}"
return f"Emmental {MAJOR}.{MINOR}{'-debug' if __debug__ else ''}"

View File

@ -29,27 +29,23 @@ class Panel(Gtk.Box):
def get_playlist(self): return self.window.get_playlist()
def set_playlist(self, plist):
self.header.set_playlist(plist)
self.window.set_playlist(plist)
if plist:
self.header.set_playlist(plist)
self.window.set_playlist(plist)
def key_pressed(self, event, keyval, keycode, state):
name = Gdk.keyval_name(keyval)
if name == "Escape":
self.window.clear_selection()
return True
elif name == "Delete":
playlist = self.get_playlist()
if playlist and playlist.can_add_remove_tracks():
match Gdk.keyval_name(keyval):
case "Escape": self.window.clear_selection()
case "Delete":
playlist = self.get_playlist()
if not (playlist and playlist.can_add_remove_tracks()):
return False
for track in self.selected_tracks():
playlist.remove_track(track)
return True
elif name == "f":
self.add_selected_tracks(db.user.Table.find("Favorites"))
return True
elif name == "q":
self.add_selected_tracks(db.user.Table.find("Queued Tracks"))
return True
return False
case "f": self.add_selected_tracks(db.user.Table.find("Favorites"))
case "q": self.add_selected_tracks(db.user.Table.find("Queued Tracks"))
case _: return False
return True
def jump_clicked(self, button):
view = self.window.get_child()

View File

@ -33,13 +33,17 @@ class LabelFactory(Gtk.SignalListItemFactory):
def get_track_text(self, track):
raise NotImplementedError
def get_track_dim(self, track):
return False
def on_setup(self, factory, listitem):
listitem.set_child(TrackLabel(xalign=self.xalign))
def on_bind(self, factory, listitem):
item = listitem.get_item()
text = self.get_track_text(item)
listitem.get_child().set_item(item, text)
if child := listitem.get_child():
child.set_item(item, self.get_track_text(item))
child.set_sensitive(not self.get_track_dim(item))
def on_unbind(self, factory, listitem):
listitem.get_child().unset_item(listitem.get_item())
@ -77,7 +81,10 @@ class AlbumFactory(LabelFactory):
class SubtitleFactory(LabelFactory):
def __init__(self): LabelFactory.__init__(self, xalign=0)
def get_track_text(self, track): return track.disc.subtitle
def get_track_dim(self, track): return len(track.disc.subtitle) == 0
def get_track_text(self, track):
subtitle = track.disc.subtitle
return subtitle if len(subtitle) > 0 else track.disc.name
class YearFactory(LabelFactory):

View File

@ -1,4 +1,5 @@
# Copyright 2021 (c) Anna Schumaker.
import audio
import db
import lib
from gi.repository import Gtk
@ -62,6 +63,29 @@ class SortButton(Gtk.MenuButton):
self.set_sensitive(plist != db.user.Table.find("Previous"))
class FavoriteButton(Gtk.ToggleButton):
def __init__(self):
Gtk.ToggleButton.__init__(self)
self.set_icon_name("emmental-favorites")
self.track_changed(audio.Player, None, audio.Player.track)
audio.Player.connect("track-changed", self.track_changed)
def set_playlist(self, plist):
pass
def track_changed(self, player, old, new):
self.set_sensitive(new != None)
self.set_active(new in db.user.Table.find("Favorites").get_tracks())
def do_toggled(self):
if audio.Player.track:
fav = db.user.Table.find("Favorites")
if self.get_active():
fav.add_track(audio.Player.track)
else:
fav.remove_track(audio.Player.track)
class JumpButton(Gtk.Button):
def __init__(self):
Gtk.Button.__init__(self)
@ -74,20 +98,9 @@ class JumpButton(Gtk.Button):
class ControlBox(Gtk.Box):
def __init__(self):
Gtk.Box.__init__(self)
Gtk.Box.__init__(self, margin_top=5, margin_bottom=5,
margin_end=5, margin_start=5)
self.add_css_class("linked")
self.append(RandomToggle())
self.append(LoopToggle())
self.append(SortButton())
self.append(JumpButton())
self.set_margin_top(5)
self.set_margin_bottom(5)
self.set_margin_start(5)
self.set_margin_end(5)
def get_jump_button(self):
return self.get_last_child()
def set_playlist(self, plist):
child = self.get_first_child()
@ -96,14 +109,34 @@ class ControlBox(Gtk.Box):
child = child.get_next_sibling()
class TrackBox(ControlBox):
def __init__(self):
ControlBox.__init__(self)
self.append(FavoriteButton())
self.append(JumpButton())
def get_jump_button(self):
return self.get_last_child()
class PlaylistBox(ControlBox):
def __init__(self):
ControlBox.__init__(self)
self.append(RandomToggle())
self.append(LoopToggle())
self.append(SortButton())
class Header(Gtk.Box):
def __init__(self):
Gtk.Box.__init__(self)
self.append(TrackBox())
self.append(FilterEntry())
self.append(ControlBox())
self.append(PlaylistBox())
def get_jump_button(self):
return self.get_last_child().get_jump_button()
return self.get_first_child().get_jump_button()
def set_playlist(self, plist):
self.get_first_child().set_playlist(plist)
self.get_last_child().set_playlist(plist)

View File

@ -100,9 +100,11 @@ class TestColumnFactories(unittest.TestCase):
def test_subtitle(self):
factory = column.SubtitleFactory()
self.assertIsInstance(factory, column.LabelFactory)
self.assertEqual(factory.get_track_text(self.track), "")
self.assertEqual(factory.get_track_text(self.track), "Disc 1")
self.assertTrue(factory.get_track_dim(self.track))
self.track.disc._subtitle = "Test Subtitle"
self.assertEqual(factory.get_track_text(self.track), "Test Subtitle")
self.assertFalse(factory.get_track_dim(self.track))
def test_year(self):
factory = column.YearFactory()

View File

@ -1,4 +1,5 @@
# Copyright 2021 (c) Anna Schumaker.
import audio
import db
import lib
import unittest
@ -113,6 +114,52 @@ class TestSortButton(unittest.TestCase):
self.assertTrue(sort.get_sensitive())
class TestFavoriteButton(unittest.TestCase):
def setUp(self):
db.reset()
audio.Player.track = None
def test_init(self):
fav = header.FavoriteButton()
self.assertIsInstance(fav, Gtk.ToggleButton)
self.assertEqual(fav.get_icon_name(), "emmental-favorites")
self.assertFalse(fav.get_sensitive())
def test_sensitive(self):
track = db.make_fake_track(1, 1, "Test Track", "/a/b/c/1.ogg")
fav = header.FavoriteButton()
fav.track_changed(audio.Player, None, track)
self.assertTrue(fav.get_sensitive())
fav.track_changed(audio.Player, None, None)
self.assertFalse(fav.get_sensitive())
def test_active(self):
fav = header.FavoriteButton()
t1 = db.make_fake_track(1, 1, "Test Track 1", "/a/b/c/1.ogg")
t2 = db.make_fake_track(2, 2, "Test Track 2", "/a/b/c/2.ogg")
db.user.Table.find("Favorites").add_track(t1)
fav.track_changed(audio.Player, None, t1)
self.assertTrue(fav.get_active())
fav.track_changed(audio.Player, t1, t2)
self.assertFalse(fav.get_active())
def test_toggle(self):
track = db.make_fake_track(1, 1, "Test Track", "/a/b/c/1.ogg")
fav = header.FavoriteButton()
fav.set_active(True)
self.assertEqual(db.user.Table.find("Favorites").get_n_tracks(), 0)
fav.set_active(False)
audio.Player.track = track
self.assertNotIn(track, db.user.Table.find("Favorites").get_tracks())
fav.set_active(True)
self.assertIn(track, db.user.Table.find("Favorites").get_tracks())
fav.set_active(False)
self.assertNotIn(track, db.user.Table.find("Favorites").get_tracks())
class TestJumpButton(unittest.TestCase):
def test_init(self):
jump = header.JumpButton()
@ -138,9 +185,31 @@ class TestControlBox(unittest.TestCase):
self.assertEqual(box.get_margin_end(), 5)
self.assertTrue(box.has_css_class("linked"))
class TestTrackBox(unittest.TestCase):
def test_init(self):
box = header.TrackBox()
self.assertIsInstance(box, header.ControlBox)
def test_children(self):
box = header.TrackBox()
child = box.get_first_child()
self.assertIsInstance(child, header.FavoriteButton)
child = child.get_next_sibling()
self.assertIsInstance(child, header.JumpButton)
self.assertEqual(box.get_jump_button(), child)
class TestPlaylistBox(unittest.TestCase):
def test_init(self):
box = header.PlaylistBox()
self.assertIsInstance(box, header.ControlBox)
def test_children(self):
collection = db.user.Table.find("Collection")
box = header.ControlBox()
box = header.PlaylistBox()
box.set_playlist(collection)
child = box.get_first_child()
@ -154,10 +223,6 @@ class TestControlBox(unittest.TestCase):
child = child.get_next_sibling()
self.assertIsInstance(child, header.SortButton)
child = child.get_next_sibling()
self.assertIsInstance(child, header.JumpButton)
self.assertEqual(box.get_jump_button(), child)
class TestHeader(unittest.TestCase):
def test_init(self):
@ -172,6 +237,9 @@ class TestHeader(unittest.TestCase):
box.set_playlist(collection)
child = box.get_first_child()
self.assertIsInstance(child, header.TrackBox)
child = child.get_next_sibling()
self.assertIsInstance(child, header.FilterEntry)
child = child.get_next_sibling()

View File

@ -29,11 +29,9 @@ def EnableSwitch(library):
def commit():
Queue.push(task.CommitTask())
def import_track(lib, track, playcount, lastplayed, playlists):
Queue.push(task.ImportTask(lib, track, playcount, lastplayed, playlists))
def update_library(lib):
Queue.push(task.CheckSchedulerTask(lib))
Queue.push(task.DirectoryTask(lib, lib.path))
def remove_library(lib):
Queue.push(task.RemoveLibrarySchedulerTask(lib))

View File

@ -17,6 +17,9 @@ class TaskQueue(GObject.GObject):
def push(self, task):
self.emit("task-pushed", task)
def clear(self):
self.emit("tasks-finished")
def run(self):
self.emit("run-task", self.tasks.pop(0))
if len(self.tasks) > 0:

View File

@ -38,32 +38,6 @@ class FileTask(Task):
db.genre.Table.find(genre).add_track(track)
class ImportTask(FileTask):
def __init__(self, library, filepath, playcount, lastplayed, playlists):
FileTask.__init__(self, library, filepath)
self.playcount = playcount
if isinstance(lastplayed, datetime.datetime):
self.lastplayed = lastplayed
elif isinstance(lastplayed, datetime.date):
self.lastplayed = datetime.datetime.combine(lastplayed,
datetime.time())
exclude = set([ "Collection", "New Tracks", "Previous" ])
self.playlists = [ p for p in playlists if p not in exclude ]
if "Up Next" in self.playlists:
self.playlists[self.playlists.index("Up Next")] = "Queued Tracks"
def run_task(self):
FileTask.run_task(self)
if track := db.track.Table.lookup(self.filepath):
db.user.Table.find("New Tracks").remove_track(track)
for plist in self.playlists:
db.user.Table.find(plist).add_track(track)
if self.playcount > 0:
track.playcount = self.playcount
track.lastplayed = self.lastplayed
class DirectoryTask(Task):
def __init__(self, library, dirpath):
Task.__init__(self)

View File

@ -10,12 +10,12 @@ track_02 = test_tracks / "02 - Test {Disc 2}.ogg"
text_txt = test_tracks / "text.txt"
class TestMetadata(unittest.TestCase):
def test_metadata_init(self):
def test_init(self):
mdf = scanner.metadata.Metadata(track_01)
self.assertEqual(mdf.path, track_01)
self.assertIsNone(mdf.file)
def test_metadata_track_01(self):
def test_track_01(self):
with scanner.metadata.Metadata(track_01) as mdf:
self.assertEqual(mdf.album(), "Test Album")
self.assertEqual(mdf.artist(), "Test Artist")
@ -30,7 +30,7 @@ class TestMetadata(unittest.TestCase):
self.assertEqual(mdf.tracknumber(), 1)
self.assertEqual(mdf.year(), 2019)
def test_metadata_track_02(self):
def test_track_02(self):
with scanner.metadata.Metadata(track_02) as mdf:
self.assertEqual(mdf.artist(), "Test Album Artist")
self.assertEqual(mdf.artistsort(), "Album Artist, Test")
@ -39,6 +39,6 @@ class TestMetadata(unittest.TestCase):
self.assertEqual(mdf.release(), datetime.date(2019, 1, 1))
self.assertEqual(mdf.year(), 2019)
def test_metadata_text_txt(self):
def test_text_txt(self):
with scanner.metadata.Metadata(text_txt) as mdf:
mdf.artist()

View File

@ -10,7 +10,7 @@ class FakeTask(task.Task):
self.res = res
def run_task(self): return self.res
class TestScannerTaskQueue(unittest.TestCase):
class TestTaskQueue(unittest.TestCase):
def setUp(self):
self.idle_start = None
@ -23,7 +23,7 @@ class TestScannerTaskQueue(unittest.TestCase):
def on_tasks_finished(self, queue):
self.tasks_finished = True
def test_scanner_queue_init(self):
def test_init(self):
q = queue.TaskQueue()
self.assertIsInstance(q, GObject.GObject)
@ -32,7 +32,7 @@ class TestScannerTaskQueue(unittest.TestCase):
self.assertEqual(q.idleid, None)
self.assertEqual(q.progress, 0)
def test_scanner_queue_push(self):
def test_push(self):
q = queue.TaskQueue()
fake = FakeTask()
q.connect("task-pushed", self.on_task_pushed)
@ -42,7 +42,17 @@ class TestScannerTaskQueue(unittest.TestCase):
self.assertIsNotNone(q.idleid)
self.assertEqual(self.pushed_task, fake)
def test_scanner_queue_run(self):
def test_clear(self):
q = queue.TaskQueue()
q.connect("tasks-finished", self.on_tasks_finished)
q.push(FakeTask())
q.clear()
self.assertEqual(q.tasks, [ ])
self.assertTrue(self.tasks_finished)
self.assertIsNone(q.idleid)
def test_run(self):
q = queue.TaskQueue()
fake3 = FakeTask()
fake2 = FakeTask()

View File

@ -12,31 +12,28 @@ class TestScanner(unittest.TestCase):
def tearDown(self):
scanner.Queue.tasks_finished()
def test_scanner_init(self):
def test_init(self):
self.assertIsInstance(scanner.Queue, scanner.queue.TaskQueue)
def test_scanner_import_track(self):
lib = db.library.Table.find(test_album)
scanner.import_track(lib, test_track, 2, datetime.date.today(), [ ])
self.assertIsInstance(scanner.Queue.tasks[0], scanner.task.ImportTask)
def test_scanner_update_library(self):
def test_update_library(self):
lib = db.library.Table.find(test_album)
scanner.update_library(lib)
self.assertIsInstance(scanner.Queue.tasks[0],
scanner.task.CheckSchedulerTask)
self.assertIsInstance(scanner.Queue.tasks[1],
scanner.task.DirectoryTask)
def test_scanner_remove_library(self):
def test_remove_library(self):
lib = db.library.Table.find(test_album)
scanner.remove_library(lib)
self.assertIsInstance(scanner.Queue.tasks[0],
scanner.task.RemoveLibrarySchedulerTask)
def test_scanner_commit(self):
def test_commit(self):
scanner.commit()
self.assertIsInstance(scanner.Queue.tasks[0], scanner.task.CommitTask)
def test_scanner_widgets(self):
def test_widgets(self):
lib = db.library.Table.find(test_album)
self.assertIsInstance(scanner.ProgressBar(),

View File

@ -10,7 +10,7 @@ test_tracks = pathlib.Path("./data/Test Album")
test_track01 = test_tracks / "01 - Test Track.ogg"
class TestScannerTask(unittest.TestCase):
def test_scanner_task_init(self):
def test_task(self):
t = task.Task()
self.assertIsInstance(t, GObject.GObject)
with self.assertRaises(NotImplementedError):
@ -18,7 +18,7 @@ class TestScannerTask(unittest.TestCase):
class TestScannerCommitTask(unittest.TestCase):
def test_scanner_commit_task(self):
def test_task(self):
ct = task.CommitTask()
self.assertIsInstance(ct, task.Task)
self.assertEqual(ct.run_task(), None)
@ -28,7 +28,7 @@ class TestScannerFileTask(unittest.TestCase):
def setUp(self):
db.reset()
def test_scanner_file_task(self):
def test_task(self):
lib = db.library.Table.find(test_tracks)
ft = task.FileTask(lib, test_track01)
@ -65,42 +65,11 @@ class TestScannerFileTask(unittest.TestCase):
self.assertEqual(new.get_track(0), track)
class TestScannerImportTask(unittest.TestCase):
def setUp(self):
db.reset()
def test_scanner_import_task(self):
playlists = [ "Collection", "Favorites", "New Tracks",
"Previous", "Up Next", "Test Playlist" ]
today = datetime.date.today()
lib = db.library.Table.find(test_tracks)
it = task.ImportTask(lib, test_track01, 4, today, playlists)
self.assertIsInstance(it, task.FileTask)
self.assertEqual(it.library, lib)
self.assertEqual(it.filepath, test_track01)
self.assertEqual(it.playcount, 4)
self.assertEqual(it.lastplayed,
datetime.datetime.combine(today, datetime.time()))
self.assertEqual(it.playlists, ["Favorites", "Queued Tracks", "Test Playlist" ])
self.assertIsNone(it.run_task())
track = db.track.Table.lookup(test_track01)
self.assertEqual(track.playcount, 4)
self.assertEqual(track.lastplayed,
datetime.datetime.combine(today, datetime.time()))
self.assertEqual(db.user.Table.lookup("Favorites").get_track(0), track)
self.assertEqual(db.user.Table.lookup("Queued Tracks").get_track(0), track)
self.assertEqual(db.user.Table.lookup("Test Playlist").get_track(0), track)
self.assertEqual(db.user.Table.lookup("New Tracks").get_n_tracks(), 0)
class TestScannerDirectoryTask(unittest.TestCase):
def setUp(self):
db.reset()
def test_scanner_directory_task(self):
def test_task(self):
lib = db.library.Table.find(test_tracks)
dt = task.DirectoryTask(lib, test_tracks)
@ -125,7 +94,7 @@ class TestScannerCheckTask(unittest.TestCase):
def setUp(self):
db.reset()
def test_scanner_check_task(self):
def test_task(self):
lib = db.library.Table.find(test_tracks)
for i in [ 1, 2, 3 ]:
db.make_fake_track(i, i, f"Test Track {i}", f"{lib.path}/{i}.ogg", lib.path)
@ -148,7 +117,7 @@ class TestScannerCheckSchedulerTask(unittest.TestCase):
def setUp(self):
db.reset()
def test_scanner_check_scheduler_task(self):
def test_task(self):
lib = db.library.Table.find(test_tracks)
track = db.make_fake_track(1, 1, "Test Album Track", str(test_track01), str(test_tracks))
@ -178,7 +147,7 @@ class TestScannerRemoveTask(unittest.TestCase):
def setUp(self):
db.reset()
def test_scanner_remove_task(self):
def test_task(self):
lib = db.library.Table.find(test_tracks)
for i in [ 1, 2, 3, 4, 5 ]:
db.make_fake_track(i, i, f"Test Track {i}", f"{lib.path}/{i}.ogg", lib.path)
@ -197,7 +166,7 @@ class TestScannerRemoveLibraryTask(unittest.TestCase):
def setUp(self):
db.reset()
def test_scanner_remove_library_task(self):
def test_task(self):
lib = db.library.Table.find(test_tracks)
rlt = task.RemoveLibraryTask(lib)
@ -212,7 +181,7 @@ class TestScannerRemoveLibrarySchedulerTask(unittest.TestCase):
def setUp(self):
db.reset()
def test_scanner_check_scheduler_task(self):
def test_task(self):
lib = db.library.Table.find(test_tracks)
for i in range(75):
db.make_fake_track(i, i, f"Test Track {i}",f"/a/b/c/{i}.ogg", lib.path)

View File

@ -18,7 +18,7 @@ class FakeTask(task.Task):
def run_task(self): return None
class TestScannerProgressBar(unittest.TestCase):
def test_progress_bar(self):
def test_init(self):
q = queue.TaskQueue()
pb = widgets.ProgressBar(q)
@ -39,7 +39,7 @@ class TestScannerProgressBar(unittest.TestCase):
class DirectoryChooserWidget(unittest.TestCase):
def test_directory_chooser_widget(self):
def test_init(self):
dcw = widgets.DirectoryChooserWidget()
self.assertIsInstance(dcw, Gtk.FileChooserWidget)
@ -61,7 +61,7 @@ class DirectoryChooserWidget(unittest.TestCase):
class DirectoryChooserPopover(unittest.TestCase):
def test_directory_chooser_popover(self):
def test_init(self):
db.reset()
q = queue.TaskQueue()
dcp = widgets.DirectoryChooserPopover(q)
@ -97,7 +97,7 @@ class DirectoryChooserPopover(unittest.TestCase):
class TestScannerAddFolderButton(unittest.TestCase):
def test_add_folder_button(self):
def test_init(self):
q = queue.TaskQueue()
afb = widgets.AddFolderButton(q)
@ -110,7 +110,7 @@ class TestScannerAddFolderButton(unittest.TestCase):
class TestScannerUpdateButton(unittest.TestCase):
def test_update_button(self):
def test_init(self):
lib = db.library.Table.find("/a/b/c")
q = queue.TaskQueue()
ub = widgets.UpdateButton(lib, q)
@ -130,7 +130,7 @@ class TestScannerUpdateButton(unittest.TestCase):
class TestScannerUpdateAllButton(unittest.TestCase):
def test_update_all_button(self):
def test_init(self):
db.reset()
lib1 = db.library.Table.find("/a/b/c")
lib2 = db.library.Table.find("/d/e/f")
@ -153,7 +153,7 @@ class TestScannerUpdateAllButton(unittest.TestCase):
class TestScannerRemoveButton(unittest.TestCase):
def test_remove_button(self):
def test_init(self):
lib = db.library.Table.find("/a/b/c")
q = queue.TaskQueue()
rb = widgets.RemoveButton(lib, q)

View File

@ -1,111 +0,0 @@
# Copyright 2020 (c) Anna Schumaker.
from . import allocator
from . import stack
from . import tags
from . import track
import db
import lib
import pathlib
import scanner
import threading
File = "tagdb.pickle"
Bus = lib.bus.Bus(500)
Tracks = allocator.TrackAllocator()
Stack = stack.TagStack()
class LibraryTag(lib.tag.Tag):
def __init__(self, path):
super().__init__(path)
self.clear = lib.thread.Thread(self.__do_clear__)
self.scan = lib.thread.Thread(self.__do_scan__)
def __setstate__(self, state):
super().__setstate__(state)
self.clear = lib.thread.Thread(self.__do_clear__)
self.scan = lib.thread.Thread(self.__do_scan__)
def __do_scan__(self):
for trak in Tracks.autoremove(self):
self.remove_track(trak)
track_set = set([ t.filepath() for t in Tracks.list_tracks(self) ])
for f in self.name.rglob("*"):
if f not in track_set and f.is_file():
if (track := Tracks.allocate(self, f)) != None:
self.add_track(track)
def __do_clear__(self):
for trak in self.tracks:
Tracks.remove(trak)
self.tracks.clear()
def fix_tracks(self):
for (index, trak) in enumerate(self.tracks):
t = Tracks[trak]
if t is not None:
self.tracks[index] = t
class LibraryStore(lib.tagstore.TagStore):
def __alloc_tag__(self, name, sort):
return LibraryTag(name)
def add(self, name):
return super().__add_tag__(name, None, None)
def remove(self, lib):
lib.clear()
super().remove(lib)
def fix_tracks(self):
for (id, tag) in self.store.items():
tag.fix_tracks()
Library = LibraryStore()
def _do_save():
try:
with lib.data.DataFile(File, lib.data.WRITE) as f:
f.pickle([ tags.get_state(), Tracks, Library, Stack ])
except Exception as e:
return lib.bus.RETRY
def save(*args):
Bus.board(_do_save)
def load():
global Library
global Tracks
global Stack
if not db.new_db():
return
with lib.data.DataFile(File, lib.data.READ) as f:
if f.exists():
(tagstate, Tracks, Library, Stack) = f.unpickle()
tags.set_state(*tagstate)
Tracks.load_tags()
Library.fix_tracks()
__register_callbacks()
scanner.Queue.push(scanner.task.CommitTask())
def __register_callbacks():
for store in [ Library, tags.User, Tracks ]:
store.Added.register(save)
store.Removed.register(save)
Tracks.Updated.register(save)
Stack.PushPop.register(save)
Stack.NextTrack.register(save)
__register_callbacks()
def reset():
Tracks.reset()
Library.reset()
Stack.reset()
tags.reset()
Bus.clear()
lib.data.DataFile(File, lib.data.READ).remove()
__register_callbacks()

View File

@ -1,85 +0,0 @@
# Copyright 2021 (c) Anna Schumaker.
from lib import publisher
from . import track
import threading
class TrackAllocator:
def __init__(self):
self.tracks = dict()
self.nextid = 0
self.lock = threading.Lock()
self.Added = publisher.Publisher()
self.Removed = publisher.Publisher()
self.Updated = publisher.Publisher()
def __alloc_track__(self, lib, filepath):
with self.lock:
trak = track.Track(self.nextid, filepath, lib)
self.tracks[self.nextid] = trak
self.nextid += 1
self.Added.publish(trak)
return trak
def __getitem__(self, id):
with self.lock:
return self.tracks.get(id, None)
def __getstate__(self):
with self.lock:
return { "tracks" : self.tracks,
"nextid" : self.nextid }
def __len__(self):
with self.lock:
return len(self.tracks)
def __setstate__(self, state):
self.__dict__.update(state)
self.lock = threading.Lock()
self.Added = publisher.Publisher()
self.Removed = publisher.Publisher()
self.Updated = publisher.Publisher()
def allocate(self, lib, filepath):
try:
return self.__alloc_track__(lib, filepath)
except Exception as e:
pass
def autoremove(self, lib):
with self.lock:
to_rm = [ trak for trak in self.tracks.values() \
if trak.library == lib and not trak.filepath().exists() ]
for trak in to_rm:
trak.about_to_remove()
del self.tracks[trak.trackid]
self.Removed.publish(trak)
return to_rm
def list_tracks(self, lib):
with self.lock:
for (id, track) in self.tracks.items():
if track.library ==lib:
yield track
def load_tags(self):
for (id, track) in self.tracks.items():
track.__set_tags__()
def played(self, track):
with self.lock:
track.played()
self.Updated.publish(track)
def remove(self, track):
with self.lock:
track.about_to_remove()
del self.tracks[track.trackid]
self.Removed.publish(track)
def reset(self):
with self.lock:
self.nextid = 0
self.tracks.clear()
self.Added.reset()
self.Removed.reset()

View File

@ -1,75 +0,0 @@
# Copyright 2021 (c) Anna Schumaker.
from lib import counter
from lib import publisher
from . import tags
class TagStack:
def __init__(self):
self.tags = [ ]
self.Counter = counter.Counter(-1, 99)
self.PushPop = publisher.Publisher()
self.NextTrack = publisher.Publisher()
def __do_next__(self, tag):
if track := tag.next():
track.add_to_playlist("Previous")
return track
def __getstate__(self):
return { "tags" : self.tags }
def __setstate__(self, state):
self.__dict__.update(state)
self.Counter = counter.Counter(-1, 99)
self.PushPop = publisher.Publisher()
self.NextTrack = publisher.Publisher()
def current(self):
if len(self.tags) == 0:
return tags.User.store["Collection"]
return self.tags[0]
def __next_track__(self):
if len(self.tags) == 0:
return self.__do_next__(tags.User["Collection"])
if track := self.__do_next__(self.tags[0]):
return track
self.pop()
return self.__next_track__()
def next(self):
ret = self.__next_track__()
count = self.Counter.decrement()
self.NextTrack.publish(ret)
return (ret, count != -1)
def pop(self):
prev = self.tags.pop(0)
self.PushPop.publish(prev, self.current())
def previous(self):
return tags.User["Previous"].next()
def push(self, tag):
prev = self.current()
if tag == tags.User["Previous"]:
return
if tag == tags.User["Collection"]:
self.tags.clear()
self.PushPop.publish(prev, tags.User["Collection"])
return
if tag in self.tags:
self.tags.remove(tag)
self.tags.insert(0, tag)
tag.stacked()
self.PushPop.publish(prev, tag)
def queue(self, track):
track.add_to_playlist("Up Next")
self.push(tags.User["Up Next"])
def reset(self):
self.tags.clear()
self.count = None
self.PushPop.reset()

View File

@ -1,36 +0,0 @@
# Copyright 2021 (c) Anna Schumaker.
from . import user
from lib import tagstore
Artist = tagstore.TagStore()
Album = tagstore.TagSuperStore()
Genre = tagstore.TagStore()
Decade = tagstore.TagStore()
Year = tagstore.TagSuperStore()
User = user.UserTagStore()
def get_state():
return (Artist, Album, Genre, Decade, Year, User)
def set_state(artist, album, genre, decade, year, user):
global Artist
global Album
global Genre
global Decade
global Year
global User
Artist = artist
Album = album
Genre = genre
Decade = decade
Year = year
User = user
def reset():
Artist.reset()
Album.reset()
Genre.reset()
Decade.reset()
Year.reset()
User.reset()

View File

@ -1,110 +0,0 @@
# Copyright 2021 (c) Anna Schumaker.
from lib import publisher
from . import allocator
import pathlib
import threading
import unittest
test_tracks = pathlib.Path("./data/Test Album")
class FakeLibrary:
def __init__(self):
self.name = test_tracks
class TestTrackAllocator(unittest.TestCase):
def setUp(self):
self.lib = FakeLibrary()
self.added = None
self.removed = None
self.updated = None
def on_track_added(self, track):
self.added = track
def on_track_removed(self, track):
self.removed = track
def on_track_updated(self, track):
self.updated = track
def test_allocator_init(self):
alloc = allocator.TrackAllocator()
self.assertEqual(alloc.tracks, { })
self.assertEqual(alloc.nextid, 0)
self.assertIsInstance(alloc.lock, type(threading.Lock()))
self.assertIsInstance(alloc.Added, publisher.Publisher)
self.assertIsInstance(alloc.Removed, publisher.Publisher)
self.assertIsInstance(alloc.Updated, publisher.Publisher)
def test_allocator(self):
alloc = allocator.TrackAllocator()
alloc.Added.register(self.on_track_added)
alloc.Removed.register(self.on_track_removed)
track = alloc.allocate(self.lib, test_tracks / "01 - Test Track.ogg")
self.assertEqual(alloc.tracks[0], track)
self.assertEqual(alloc[0], track)
self.assertEqual(len(alloc), 1)
self.assertEqual(alloc.nextid, 1)
self.assertEqual(track.trackid, 0)
self.assertEqual(self.added, track)
track2 = alloc.allocate(self.lib, test_tracks / "02 - Test {Disc 2}.ogg")
self.assertEqual(alloc.tracks[1], track2)
self.assertEqual(alloc[1], track2)
self.assertEqual(len(alloc), 2)
self.assertIsNone(alloc.allocate(self.lib, test_tracks / "No Such File"))
self.assertEqual(self.added, track2)
self.assertEqual([ t for t in alloc.list_tracks(self.lib) ], [ track, track2 ])
track.library = None
self.assertEqual([ t for t in alloc.list_tracks(self.lib) ], [ track2 ])
alloc.remove(track)
self.assertEqual(alloc[0], None)
self.assertEqual(alloc.tracks, { 1 : track2 })
self.assertEqual(alloc.nextid, 2)
self.assertEqual(self.removed, track)
alloc.reset()
self.assertEqual(alloc.nextid, 0)
self.assertEqual(alloc.tracks, { })
self.assertEqual(alloc.Added.subscribers, set())
self.assertEqual(alloc.Removed.subscribers, set())
def test_allocator_autoremove(self):
alloc = allocator.TrackAllocator()
alloc.Removed.register(self.on_track_removed)
track = alloc.allocate(self.lib, test_tracks / "01 - Test Track.ogg")
track2 = alloc.allocate(self.lib, test_tracks / "02 - Test {Disc 2}.ogg")
track2.path = pathlib.Path("No Such File")
self.assertEqual(alloc.autoremove(self.lib), [ track2 ])
self.assertEqual([ t for t in alloc.list_tracks(self.lib) ], [ track ])
def test_allocator_played(self):
alloc = allocator.TrackAllocator()
alloc.Updated.register(self.on_track_updated)
track = alloc.allocate(self.lib, test_tracks / "01 - Test Track.ogg")
alloc.played(track)
self.assertEqual(track.playcount, 1)
self.assertEqual(self.updated, track)
def test_allocator_state(self):
alloc = allocator.TrackAllocator()
track = alloc.allocate(self.lib, test_tracks / "01 - Test Track.ogg")
state = alloc.__getstate__()
self.assertEqual(state, { "tracks" : { 0 : track },
"nextid" : 1 })
alloc.__dict__.clear()
alloc.__setstate__(state)
self.assertEqual(alloc.tracks, { 0 : track })
self.assertEqual(alloc.nextid, 1)
self.assertIsInstance(alloc.lock, type(threading.Lock()))
self.assertIsInstance(alloc.Added, publisher.Publisher)
self.assertIsInstance(alloc.Removed, publisher.Publisher)
self.assertIsInstance(alloc.Updated, publisher.Publisher)

View File

@ -1,186 +0,0 @@
# Copyright 2021 (c) Anna Schumaker.
from lib import counter
from lib import publisher
from lib import tag
from . import stack
from . import tags
import unittest
class FakeTrack:
def __init__(self, n):
self.n = n
self.length = n
def add_to_playlist(self, name):
tags.User.add(name, self)
def remove_from_playlist(self, name):
tags.User[name].remove_track(self)
class TestTagStack(unittest.TestCase):
def setUp(self):
self.pushpop = None
self.next_track = None
def tearDown(self):
tags.reset()
def on_next_track(self, track):
self.next_track = track
def on_push_pop(self, prev, new):
self.pushpop = (prev, new)
def test_tag_stack_init(self):
s = stack.TagStack()
self.assertIsInstance(s.Counter, counter.Counter)
self.assertIsInstance(s.PushPop, publisher.Publisher)
self.assertIsInstance(s.NextTrack, publisher.Publisher)
self.assertEqual(s.tags, [ ])
def test_tag_stack_next(self):
s = stack.TagStack()
t = tag.Tag("Test")
s.NextTrack.register(self.on_next_track)
s.push(t)
t.tracks = [ FakeTrack(1), FakeTrack(2), FakeTrack(3) ]
tags.User["Collection"].tracks = [ FakeTrack(4), FakeTrack(5) ]
(res, cont) = s.next()
self.assertEqual( (res.n, cont), (1, True) )
self.assertEqual(self.next_track.n, 1)
self.assertEqual(s.tags, [ t ])
(res, cont) = s.next()
self.assertEqual( (res.n, cont), (2, True) )
self.assertEqual(self.next_track.n, 2)
self.assertEqual(s.tags, [ t ])
(res, cont) = s.next()
self.assertEqual( (res.n, cont), (3, True) )
self.assertEqual(self.next_track.n, 3)
self.assertEqual(s.tags, [ t ])
(res, cont) = s.next()
self.assertEqual( (res.n, cont), (4, True) )
self.assertEqual(self.next_track.n, 4)
self.assertEqual(s.tags, [ ])
(res, cont) = s.next()
self.assertEqual( (res.n, cont), (5, True) )
self.assertEqual(self.next_track.n, 5)
self.assertEqual([ t.n for t in tags.User["Previous"].tracks ],
[ 5, 4, 3, 2, 1 ])
def test_tag_stack_autopause(self):
s = stack.TagStack()
t = tag.Tag("Test")
s.push(t)
t.tracks = [ FakeTrack(1), FakeTrack(2), FakeTrack(3), FakeTrack(4) ]
s.Counter.set_value(2)
(res, cont) = s.next()
self.assertEqual( (res.n, cont), (1, True) )
self.assertEqual(s.tags, [ t ])
self.assertEqual(s.Counter.get_value(), 1)
(res, cont) = s.next()
self.assertEqual( (res.n, cont), (2, True) )
self.assertEqual(s.tags, [ t ])
self.assertEqual(s.Counter.get_value(), 0)
(res, cont) = s.next()
self.assertEqual( (res.n, cont), (3, False) )
self.assertEqual(s.tags, [ t ])
self.assertEqual(s.Counter.get_value(), -1)
(res, cont) = s.next()
self.assertEqual( (res.n, cont), (4, True) )
self.assertEqual(s.tags, [ t ])
self.assertEqual(s.Counter.get_value(), -1)
def test_tag_stack_pop(self):
s = stack.TagStack()
t1 = tag.Tag("Test")
t2 = tag.Tag("Test Two")
s.tags = [ t1, t2 ]
s.PushPop.register(self.on_push_pop)
s.pop()
self.assertEqual(s.tags, [ t2 ])
self.assertEqual(self.pushpop, (t1, t2))
s.pop()
self.assertEqual(s.tags, [ ])
self.assertEqual(self.pushpop, (t2, tags.User["Collection"]))
def test_tag_stack_previous(self):
s = stack.TagStack()
for i in [ 1, 2, 3 ]:
tags.User["Previous"].add_track(i)
self.assertEqual(s.previous(), 2)
self.assertEqual(s.previous(), 1)
self.assertIsNone(s.previous())
def test_tag_stack_push(self):
s = stack.TagStack()
t1 = tag.Tag("Test")
t2 = tag.Tag("Test Two")
t1.current = 3
s.PushPop.register(self.on_push_pop)
s.push(t1)
self.assertEqual(s.tags, [ t1 ])
self.assertEqual(t1.current, -1)
self.assertEqual(self.pushpop, (tags.User["Collection"], t1))
s.push(t2)
self.assertEqual(s.tags, [ t2, t1 ])
self.assertEqual(self.pushpop, (t1, t2))
s.push(t1)
self.assertEqual(s.tags, [ t1, t2 ])
self.assertEqual(self.pushpop, (t2, t1))
s.push(tags.User["Previous"])
self.assertEqual(s.tags, [ t1, t2 ])
s.push(tags.User["Collection"])
self.assertEqual(s.tags, [ ])
self.assertEqual(self.pushpop, (t1, tags.User["Collection"]))
def test_tag_stack_queue(self):
s = stack.TagStack()
s.queue(FakeTrack(1))
self.assertEqual(s.tags, [ tags.User["Up Next"] ])
(res, cont) = s.next()
self.assertEqual( (res.n, cont), (1, True) )
self.assertEqual(tags.User["Up Next"].tracks, [ ])
def test_tag_stack_state(self):
s = stack.TagStack()
t = tag.Tag("Test")
s.push(t)
state = s.__getstate__()
self.assertEqual(state, { "tags" : [ t ] })
s.__dict__.clear()
s.__setstate__(state)
self.assertEqual(s.tags, [ t ])
self.assertIsInstance(s.Counter, counter.Counter)
self.assertIsInstance(s.PushPop, publisher.Publisher)
self.assertIsInstance(s.NextTrack, publisher.Publisher)
s.PushPop.register(self.on_push_pop)
s.PushPop.register(self.on_next_track)
s.count = 3
s.reset()
self.assertEqual(s.tags, [ ])
self.assertIsInstance(s.Counter, counter.Counter)
self.assertEqual(len(s.PushPop.subscribers), 0)
self.assertEqual(len(s.NextTrack.subscribers), 0)

View File

@ -1,174 +0,0 @@
# Copyright 2020 (c) Anna Schumaker.
import db
import lib
import pathlib
import tagdb
import unittest
test_tracks = pathlib.Path("./data/Test Album")
class TestLibraryTag(unittest.TestCase):
def tearDown(self):
db.NewDatabase = True
tagdb.reset()
def test_library_tag_init(self):
library = tagdb.LibraryTag(test_tracks)
self.assertIsInstance(library, lib.tag.Tag)
self.assertIsInstance(library.clear, lib.thread.Thread)
self.assertIsInstance(library.scan, lib.thread.Thread)
def test_library_tag_state(self):
lib = tagdb.LibraryTag(test_tracks)
lib.scan().join()
state = lib.__getstate__()
self.assertEqual(set(state.keys()),
set([ "name", "sort", "current", "loop", "random", "tracks" ]))
self.assertEqual(state["name"], test_tracks)
self.assertEqual(state["tracks"], [ i for i in range(12) ])
lib.__dict__.clear()
lib.__setstate__(state)
lib.fix_tracks()
self.assertEqual(lib.name, test_tracks)
self.assertEqual(lib.tracks, [ tagdb.Tracks[i] for i in range(12) ])
self.assertEqual(lib.clear.func, lib.__do_clear__)
self.assertEqual(lib.scan.func, lib.__do_scan__)
def test_library_tag_scan(self):
lib = tagdb.LibraryTag(test_tracks)
lib.scan().join()
for i in range(12):
self.assertEqual(lib.tracks[i], tagdb.Tracks[i])
def test_library_tag_scan_new(self):
lib = tagdb.LibraryTag(test_tracks)
lib.scan().join()
for trak in [ tagdb.Tracks[0], tagdb.Tracks[11] ]:
lib.remove_track(trak)
tagdb.Tracks.remove(trak)
lib.scan().join()
self.assertEqual(len(lib.tracks), 12)
def test_library_tag_scan_remove(self):
lib = tagdb.LibraryTag(test_tracks)
lib.scan().join()
trak = tagdb.track.Track(tagdb.Tracks.nextid,
test_tracks / "01 - Test Track.ogg", lib)
trak.path = pathlib.Path("No Such File")
lib.tracks.append(trak)
tagdb.Tracks.tracks[trak.trackid] = trak
lib.scan().join()
self.assertNotIn(trak, lib.tracks)
def test_library_tag_clear(self):
lib = tagdb.LibraryTag(test_tracks)
lib.scan().join()
self.assertEqual(len(lib), 12)
lib.clear().join()
self.assertEqual(len(lib), 0)
self.assertEqual(len(tagdb.Tracks), 0)
class TestLibraryStore(unittest.TestCase):
def test_library_store(self):
store = tagdb.LibraryStore()
lib = store.add(test_tracks)
self.assertIsInstance(lib, tagdb.LibraryTag)
lib.scan().join()
self.assertEqual(len(lib), 12)
store.remove(lib)
lib.clear.join()
self.assertEqual(len(lib), 0)
class TestTrackDB(unittest.TestCase):
def tearDown(self):
tagdb.reset()
def test_tagdb_init(self):
self.assertIsInstance(tagdb.Bus, lib.bus.Bus)
self.assertIsInstance(tagdb.Tracks, tagdb.allocator.TrackAllocator)
self.assertIsInstance(tagdb.Stack, tagdb.stack.TagStack)
self.assertIsInstance(tagdb.Library, tagdb.LibraryStore)
self.assertEqual(tagdb.File, "tagdb.pickle")
self.assertEqual(tagdb.Bus.timeout, 500)
self.assertIn(tagdb.save, tagdb.Library.Added.subscribers)
self.assertIn(tagdb.save, tagdb.Library.Removed.subscribers)
self.assertIn(tagdb.save, tagdb.Tracks.Added.subscribers)
self.assertIn(tagdb.save, tagdb.Tracks.Removed.subscribers)
self.assertIn(tagdb.save, tagdb.Tracks.Updated.subscribers)
self.assertIn(tagdb.save, tagdb.tags.User.Added.subscribers)
self.assertIn(tagdb.save, tagdb.tags.User.Removed.subscribers)
self.assertIn(tagdb.save, tagdb.Stack.PushPop.subscribers)
self.assertIn(tagdb.save, tagdb.Stack.NextTrack.subscribers)
def test_tagdb_save_load(self):
db_file = lib.data.DataFile(tagdb.File, lib.data.READ)
library = tagdb.Library.add(test_tracks)
library.scan()
tagdb.Stack.push(library)
tagdb.save()
self.assertEqual(tagdb.Bus.passengers, [ (tagdb._do_save, ()) ])
self.assertFalse(db_file.exists())
library.scan.join()
tagdb.Bus.complete()
self.assertTrue(db_file.exists())
db.reset()
tagdb.tags.reset()
tagdb.Library.reset()
tagdb.Tracks.reset()
tagdb.Stack.reset()
tagdb.load()
self.assertEqual(len(tagdb.Tracks), 12)
self.assertEqual(len(tagdb.Library), 1)
self.assertEqual(len(tagdb.Library[test_tracks]), 12)
self.assertEqual(len(tagdb.tags.Artist), 3)
self.assertEqual(len(tagdb.tags.Album), 12)
self.assertEqual(len(tagdb.tags.Genre), 4)
self.assertEqual(len(tagdb.tags.Decade), 2)
self.assertEqual(len(tagdb.tags.Year), 2)
self.assertEqual(tagdb.Stack.tags, [ tagdb.Library[test_tracks] ])
self.assertIn(tagdb.save, tagdb.Library.Added.subscribers)
self.assertIn(tagdb.save, tagdb.Tracks.Added.subscribers)
self.assertIn(tagdb.save, tagdb.tags.User.Added.subscribers)
def test_tagdb_stress(self):
lib = tagdb.Library.add(pathlib.Path("./trier/Test Library/"))
lib.scan().join()
tagdb.Bus.complete()
tagdb.Library.remove(lib)
lib.clear.join()
tagdb.Bus.complete()
def test_tagdb_reset(self):
tagdb.Tracks.Added.register(1)
tagdb.Tracks.Removed.register(1)
tagdb.Tracks.Updated.register(1)
tagdb.Library.Added.register(1)
tagdb.Library.Removed.register(1)
tagdb.Library.store = { "a" : 1, "b" : 2, "c" : 3 }
with lib.data.DataFile(tagdb.File, lib.data.WRITE) as f:
f.pickle([ 0, [] ])
tagdb.reset()
self.assertEqual(len(tagdb.Library), 0)
self.assertIn(tagdb.save, tagdb.Tracks.Added.subscribers)
self.assertIn(tagdb.save, tagdb.Tracks.Removed.subscribers)
self.assertIn(tagdb.save, tagdb.Tracks.Updated.subscribers)
self.assertIn(tagdb.save, tagdb.Library.Added.subscribers)
self.assertIn(tagdb.save, tagdb.Library.Removed.subscribers)
self.assertFalse(lib.data.DataFile(tagdb.File, lib.data.READ).exists())

View File

@ -1,50 +0,0 @@
# Copyright 2021 (c) Anna Schumaker.
from lib import tagstore
from . import tags
from . import user
import unittest
class TestTags(unittest.TestCase):
def tearDown(self):
tags.reset()
def test_tags_init(self):
self.assertIsInstance(tags.Artist, tagstore.TagStore)
self.assertIsInstance(tags.Album, tagstore.TagSuperStore)
self.assertIsInstance(tags.Genre, tagstore.TagStore)
self.assertIsInstance(tags.Decade, tagstore.TagStore)
self.assertIsInstance(tags.Year, tagstore.TagSuperStore)
self.assertIsInstance(tags.User, user.UserTagStore)
def test_tags_reset(self):
tags.Artist.store = {"a" : 1 }
tags.Album.store = {("a", "b") : 2 }
tags.Genre.store = {"c" : 3 }
tags.Decade.store = {"d" : 4 }
tags.User.store = {"e" : 5 }
tags.reset()
self.assertEqual(tags.Artist.store, { })
self.assertEqual(tags.Album.store, { })
self.assertEqual(tags.Genre.store, { })
self.assertEqual(tags.Decade.store, { })
self.assertIsNotNone(tags.User["Collection"])
self.assertIsNotNone(tags.User["Up Next"])
self.assertIsNotNone(tags.User["Previous"])
self.assertIsNotNone(tags.User["Favorites"])
self.assertIsNotNone(tags.User["Up Next"])
def test_tags_state(self):
state = tags.get_state()
self.assertEqual(state, ( tags.Artist, tags.Album, tags.Genre,
tags.Decade, tags.Year, tags.User ))
tags.set_state(*(1, 2, 3, 4, 5, 6))
self.assertEqual(tags.Artist, 1)
self.assertEqual(tags.Album, 2)
self.assertEqual(tags.Genre, 3)
self.assertEqual(tags.Decade, 4)
self.assertEqual(tags.Year, 5)
self.assertEqual(tags.User, 6)
tags.set_state(*state)

View File

@ -1,173 +0,0 @@
# Copyright 2020 (c) Anna Schumaker.
from lib import publisher
from gi.repository import GObject
from . import tags
from . import track
import datetime
import pathlib
import unittest
test_tracks = pathlib.Path("./data/Test Album")
class FakeLibrary:
def __init__(self):
self.name = test_tracks
class TestTrack(unittest.TestCase):
def setUp(self):
tags.reset()
self.lib = FakeLibrary()
def tearDown(self):
tags.reset()
def test_track_init(self):
trak = track.Track(1, test_tracks / "01 - Test Track.ogg", self.lib)
self.assertEqual(trak.trackid, 1)
self.assertEqual(trak.filepath(), test_tracks / "01 - Test Track.ogg")
self.assertIsInstance(trak, GObject.Object)
def test_track_album(self):
trak = track.Track(1, test_tracks / "01 - Test Track.ogg", self.lib)
artist = trak.artist
self.assertEqual(trak.album, tags.Album[artist, "Test Album"])
self.assertEqual(trak["album"], "Test Album")
def test_track_artist(self):
trak = track.Track(1, test_tracks / "01 - Test Track.ogg", self.lib)
self.assertEqual(trak.artist, tags.Artist["Test Artist"])
self.assertEqual(trak["artist"], "Test Artist")
self.assertEqual(trak.artist.sort, "artist, test")
trak2 = track.Track(2, test_tracks / "02 - Test {Disc 2}.ogg", self.lib)
self.assertEqual(trak2.artist, tags.Artist["Test Album Artist"])
self.assertEqual(trak2.artist.sort, "album artist, test")
def test_track_decade(self):
trak = track.Track(1, test_tracks / "01 - Test Track.ogg", self.lib)
self.assertEqual(trak.decade, tags.Decade["2010s"])
self.assertEqual(trak["decade"], "2010s")
def test_track_discnumber(self):
trak = track.Track(1, test_tracks / "01 - Test Track.ogg", self.lib)
self.assertEqual(trak.discnumber, 1)
self.assertEqual(trak["discnumber"], "01")
def test_track_filepath(self):
trak = track.Track(1, test_tracks / "01 - Test Track.ogg", self.lib)
self.assertEqual(trak.path, pathlib.Path("01 - Test Track.ogg"))
self.assertEqual(trak.filepath(), test_tracks / "01 - Test Track.ogg")
def test_track_genres(self):
trak = track.Track(1, test_tracks / "02 - Test {Disc 2}.ogg", self.lib)
genlist = [ tags.Genre["Test"], tags.Genre["Genre"], tags.Genre["List"] ]
self.assertEqual(trak.genres, genlist)
self.assertEqual(trak["genres"], "Test, Genre, List")
def test_track_length(self):
trak = track.Track(1, test_tracks / "01 - Test Track.ogg", self.lib)
self.assertEqual(trak.length, 10)
self.assertEqual(trak["length"], "0:10")
trak.length = 61
self.assertEqual(trak["length"], "1:01")
trak.length = 3
self.assertEqual(trak["length"], "0:03")
def test_track_played(self):
trak = track.Track(1, test_tracks / "01 - Test Track.ogg", self.lib)
trak.playcount = 0
trak.lastplayed = None
self.assertEqual(trak["playcount"], "0")
self.assertEqual(trak["lastplayed"], "Never")
trak.played()
self.assertEqual(trak.playcount, 1)
self.assertEqual(trak.lastplayed.date(), datetime.date.today())
self.assertEqual(trak["playcount"], "1")
#self.assertEqual(trak["lastplayed"], str(datetime.date.today()))
trak.played()
self.assertEqual(trak.playcount, 2)
self.assertEqual(trak["playcount"], "2")
def test_track_title(self):
trak = track.Track(1, test_tracks / "01 - Test Track.ogg", self.lib)
self.assertEqual(trak.title, "Test Track")
self.assertEqual(trak["title"], "Test Track")
def test_track_tracknumber(self):
trak = track.Track(1, test_tracks / "01 - Test Track.ogg", self.lib)
self.assertEqual(trak.tracknumber, 1)
self.assertEqual(trak["tracknumber"], "1-01")
trak.tracknumber = 10
self.assertEqual(trak["tracknumber"], "1-10")
def test_track_year(self):
trak = track.Track(1, test_tracks / "01 - Test Track.ogg", self.lib)
self.assertEqual(trak.year, tags.Year[trak.decade, "2019"])
self.assertEqual(trak["year"], "2019")
trak2 = track.Track(2, test_tracks / "02 - Test {Disc 2}.ogg", self.lib)
self.assertEqual(trak2.year, tags.Year[trak2.decade, "2019"])
self.assertEqual(trak2["year"], "2019")
def test_track_playlists(self):
trak = track.Track(1, test_tracks / "01 - Test Track.ogg", self.lib)
default = [ tags.User["Collection"], tags.User["New Tracks"] ]
self.assertEqual(trak.playlists, default)
trak.add_to_playlist("Test")
self.assertEqual(trak.playlists, default + [ tags.User["Test"] ])
trak.remove_from_playlist("Test")
self.assertEqual(trak.playlists, default)
self.assertEqual(len(tags.User["Test"]), 0)
def test_track_about_to_remove(self):
trak = track.Track(1, test_tracks / "01 - Test Track.ogg", self.lib)
trak.about_to_remove()
self.assertEqual(len(tags.Artist), 0)
self.assertEqual(len(tags.Album), 0)
self.assertEqual(len(tags.Genre), 0)
self.assertEqual(len(tags.Decade), 0)
self.assertEqual(len(tags.Year), 0)
self.assertEqual(len(tags.User["Collection"]), 0)
self.assertEqual(len(tags.User["New Tracks"]), 0)
def test_track_state(self):
trak = track.Track(1, test_tracks / "01 - Test Track.ogg", self.lib)
trak.add_to_playlist("Starred")
trak.add_to_playlist("Previous")
state = trak.__getstate__()
self.assertEqual(state["artist"], "Test Artist")
self.assertEqual(state["album"], "Test Album")
self.assertEqual(state["genres"], [ "Test" ])
self.assertEqual(state["decade"], "2010s")
self.assertEqual(state["year"], "2019")
self.assertEqual(state["playlists"], [ "Collection", "Starred" ])
tags.Artist["Test Artist"].tracks = [ 1 ]
tags.Album[trak.artist, "Test Album"].tracks = [ 1 ]
tags.Genre["Test"].tracks = [ 1 ]
tags.Decade["2010s"].tracks = [ 1 ]
tags.Year[trak.decade, "2019"].tracks = [ 1 ]
tags.User["Collection"].tracks = [ 1 ]
tags.User["Starred"].tracks = [ 1 ]
trak.__dict__.clear()
trak.__setstate__(state)
trak.__set_tags__()
self.assertEqual(trak.artist, tags.Artist["Test Artist"])
self.assertEqual(trak.album, tags.Album[trak.artist, "Test Album"])
self.assertEqual(trak.genres, [ tags.Genre["Test"] ])
self.assertEqual(trak.decade, tags.Decade["2010s"])
self.assertEqual(trak.year, tags.Year[trak.decade, "2019"])
self.assertEqual(trak.playlists, [ tags.User["Collection"],
tags.User["Starred"] ])
self.assertEqual(tags.Artist["Test Artist"].tracks, [ trak ])
self.assertEqual(tags.Album[trak.artist, "Test Album"].tracks, [ trak ])
self.assertEqual(tags.Genre["Test"].tracks, [ trak ])
self.assertEqual(tags.Decade["2010s"].tracks, [ trak ])
self.assertEqual(tags.Year[trak.decade, "2019"].tracks, [ trak ])
self.assertEqual(tags.User["Collection"].tracks, [ trak ])
self.assertEqual(tags.User["Starred"].tracks, [ trak ])

View File

@ -1,99 +0,0 @@
# Copyright 2021 (c) Anna Schumaker.
from lib import fake
from lib import tag
from lib import tagstore
from . import user
import unittest
class TestUserTags(unittest.TestCase):
def test_collection_tag(self):
c = user.CollectionTag()
self.assertIsInstance(c, tag.Tag)
self.assertTrue(c.loop)
self.assertFalse(c.can_loop())
def test_new_tracks_tag(self):
n = user.NewTracksTag()
self.assertIsInstance(n, tag.Tag)
self.assertTrue(n.can_loop())
self.assertTrue(n.can_random())
n.tracks = [ fake.Track(1), fake.Track(2), fake.Track(3) ]
state = n.__getstate__()
self.assertEqual(state["name"], "New Tracks")
self.assertEqual(state["sort"], "new tracks")
self.assertEqual(state["current"], -1)
self.assertEqual(state["tracks"], [ ])
self.assertFalse(state["loop"])
self.assertFalse(state["random"])
def test_previous_tag(self):
p = user.PreviousTag()
self.assertIsInstance(p, tag.Tag)
self.assertFalse(p.can_loop())
self.assertFalse(p.can_random())
p.add_track(fake.Track(1))
self.assertEqual(p.tracks, [ fake.Track(1) ])
self.assertIsNone(p.next())
p.add_track(fake.Track(2))
self.assertEqual(p.tracks, [ fake.Track(2), fake.Track(1) ])
self.assertEqual(p.next(), fake.Track(1))
p.add_track(fake.Track(3))
self.assertEqual(p.tracks, [ fake.Track(3), fake.Track(2), fake.Track(1) ])
self.assertEqual(p.next(), fake.Track(2))
self.assertEqual(p.next(), fake.Track(1))
state = p.__getstate__()
self.assertEqual(state["name"], "Previous")
self.assertEqual(state["sort"], "previous")
self.assertEqual(state["current"], 2)
self.assertEqual(state["tracks"], [ ])
self.assertFalse(state["loop"])
self.assertFalse(state["random"])
def test_up_next_tag(self):
u = user.UpNextTag()
self.assertIsInstance(u, tag.Tag)
self.assertFalse(u.loop)
self.assertFalse(u.can_loop())
self.assertIsNone(u.next())
u.tracks = [ fake.Track(1, u), fake.Track(2, u), fake.Track(3, u) ]
self.assertEqual(u.next(), fake.Track(1, u))
self.assertEqual(u.tracks, [ fake.Track(2, u), fake.Track(3, u) ])
self.assertEqual(u.next(), fake.Track(2, u))
self.assertEqual(u.tracks, [ fake.Track(3, u) ])
u.random = True
self.assertEqual(u.next(), fake.Track(3, u))
self.assertEqual(u.tracks, [ ])
self.assertFalse(u.random)
class TestUserTagStore(unittest.TestCase):
def test_user_init(self):
store = user.UserTagStore()
self.assertIsInstance(store, tagstore.TagStore)
self.assertIsInstance(store["Collection"], user.CollectionTag)
self.assertIsInstance(store["New Tracks"], user.NewTracksTag)
self.assertIsInstance(store["Previous"], user.PreviousTag)
self.assertIsInstance(store["Favorites"], tag.Tag)
self.assertIsInstance(store["Up Next"], user.UpNextTag)
def test_user_reset(self):
store = user.UserTagStore()
store.add("Playlist")
self.assertEqual(len(store), 6)
store.reset()
self.assertEqual(len(store), 5)
self.assertIsInstance(store["Collection"], user.CollectionTag)
self.assertIsInstance(store["New Tracks"], user.NewTracksTag)
self.assertIsInstance(store["Previous"], user.PreviousTag)
self.assertIsInstance(store["Favorites"], tag.Tag)
self.assertIsInstance(store["Up Next"], user.UpNextTag)

View File

@ -1,98 +0,0 @@
# Copyright 2020 (c) Anna Schumaker.
from lib import metadata
from lib import publisher
from . import tags
from gi.repository import GObject
import datetime
import db
import scanner
class Track(GObject.Object):
def __init__(self, trackid, filepath, library):
GObject.Object.__init__(self)
self.trackid = trackid
self.path = filepath.relative_to(library.name)
self.library = library
self.lastplayed = None
self.playcount = 0
with metadata.Metadata(filepath) as meta:
self.title = meta.title()
self.length = meta.length()
self.discnumber = meta.discnumber()
self.tracknumber = meta.tracknumber()
self.artist = tags.Artist.add(meta.artist(), self, sort=meta.artistsort())
self.album = tags.Album.add(self.artist, meta.album(), self)
self.genres = [ tags.Genre.add(g, self) for g in meta.genres() ]
self.decade = tags.Decade.add(f"{meta.decade()}s", self)
self.year = tags.Year.add(self.decade, str(meta.year()), self)
self.playlists = [ tags.User.add("Collection", self),
tags.User.add("New Tracks", self) ]
def __getitem__(self, item):
tag = self.__dict__.get(item, None)
if item == "length":
(m, s) = divmod(tag, 60)
return f"{m}:{s:02}"
elif item == "discnumber":
return f"{tag:02}"
elif item == "tracknumber":
return f"{self.discnumber}-{tag:02}"
elif item == "lastplayed":
return "Never" if tag == None else str(tag)
elif item == "genres":
return ", ".join([ str(g) for g in self.genres ])
return None if tag == None else str(tag)
def __getstate__(self):
state = self.__dict__.copy()
state["artist"] = str(self.artist)
state["album"] = str(self.album)
state["genres"] = [ str(g) for g in self.genres ]
state["decade"] = str(self.decade)
state["year"] = str(self.year)
state["playlists" ] = [ str(p) for p in self.playlists \
if str(p) not in ("New Tracks", "Previous") ]
return state
def __setstate__(self, state):
GObject.Object.__init__(self)
self.__dict__.update(state)
def __set_tags__(self):
self.artist = tags.Artist.init_track(self.artist, self)
self.album = tags.Album.init_track(self.artist, self.album, self)
self.genres = [ tags.Genre.init_track(g, self) for g in self.genres ]
self.decade = tags.Decade.init_track(self.decade, self)
self.year = tags.Year.init_track(self.decade, self.year, self)
self.playlists = [ tags.User.init_track(p, self) for p in self.playlists ]
scanner.import_track(db.library.Table.find(self.library.name),
self.filepath(), self.playcount, self.lastplayed,
[ p.name for p in self.playlists ])
def about_to_remove(self):
tags.Artist.remove(self.artist, self)
tags.Album.remove(self.album, self)
for genre in self.genres:
tags.Genre.remove(genre, self)
tags.Decade.remove(self.decade, self)
tags.Year.remove(self.year, self)
for tag in self.playlists:
tag.remove_track(self)
def add_to_playlist(self, name):
self.playlists.append(tags.User.add(name, self))
def filepath(self):
return self.library.name / self.path
def played(self):
self.playcount += 1
self.lastplayed = datetime.datetime.now()
def remove_from_playlist(self, name):
tag = tags.User[name]
tag.remove_track(self)
self.playlists.remove(tag)

View File

@ -1,70 +0,0 @@
# Copyright 2021 (c) Anna Schumaker.
from lib import tag
from lib import tagstore
class CollectionTag(tag.Tag):
def __init__(self):
tag.Tag.__init__(self, "Collection")
self.loop = True
def can_loop(self): return False
class NewTracksTag(tag.Tag):
def __init__(self):
tag.Tag.__init__(self, "New Tracks")
def __getstate__(self):
state = super().__getstate__()
state["tracks"].clear()
return state
class PreviousTag(tag.Tag):
def __init__(self):
tag.Tag.__init__(self, "Previous")
def __getstate__(self):
state = super().__getstate__()
state["tracks"].clear()
return state
def can_random(self): return False
def can_loop(self): return False
def add_track(self, track):
with self.lock:
self.tracks.insert(0, track)
self.current = 0
self.TrackAdded.publish(self, track, 0)
class UpNextTag(tag.Tag):
def __init__(self):
tag.Tag.__init__(self, "Up Next")
def can_loop(self): return False
def next(self):
track = super().next()
if track is not None:
track.remove_from_playlist("Up Next")
with self.lock:
self.current -= 1
if len(self.tracks) == 0:
self.random = False
return track
class UserTagStore(tagstore.TagStore):
def __init__(self):
tagstore.TagStore.__init__(self)
self.reset()
def reset(self):
super().reset()
self.store["Collection"] = CollectionTag()
self.store["Favorites"] = tag.Tag("Favorites")
self.store["New Tracks"] = NewTracksTag()
self.store["Previous"] = PreviousTag()
self.store["Up Next"] = UpNextTag()

View File

@ -1,24 +1,3 @@
# Copyright 2021 (c) Anna Schumaker.
from . import icons
from . import window
from gi.repository import Gtk
import audio
import db
class EmmentalApplication(Gtk.Application):
def __init__(self, *args, **kwargs):
Gtk.Application.__init__(self, *args, application_id="org.gtk.emmental", **kwargs)
def do_activate(self):
self.window.present()
def do_startup(self):
self.window = window.Window()
Gtk.Application.do_startup(self)
self.add_window(self.window)
def do_shutdown(self):
db.sql.optimize()
Gtk.Application.do_shutdown(self)
Application = EmmentalApplication()

View File

@ -1,12 +1,11 @@
# Copyright 2021 (c) Anna Schumaker.
from . import window
from lib import version
from gi.repository import Gtk
import pathlib
IconPath = pathlib.Path("data/").absolute()
if version.DEBUG == True:
if __debug__ == True:
Display = Gtk.Label().get_display()
Theme = Gtk.IconTheme.get_for_display(Display)

View File

@ -1,7 +1,6 @@
# Copyright 2021 (c) Anna Schumaker.
from gi.repository import Gtk, Gdk
import audio
import tagdb
Event = Gtk.EventControllerKey()
Event.set_propagation_phase(Gtk.PropagationPhase.CAPTURE)

View File

@ -1,9 +0,0 @@
# Copyright 2021 (c) Anna Schumaker.
import unittest
import ui
from gi.repository import Gtk
class TestEmmentalApplication(unittest.TestCase):
def test_application(self):
app = ui.EmmentalApplication()
self.assertIsInstance(app, Gtk.Application)