Compare commits
8 Commits
emmental-3
...
main
Author | SHA1 | Date | |
---|---|---|---|
d3fdf82a93 | |||
28ee637b0a | |||
270be37848 | |||
a0f240a2ad | |||
6dfa841cbd | |||
745301997e | |||
37c74ed5fb | |||
c3818a2b18 |
2
aur
2
aur
|
@ -1 +1 @@
|
||||||
Subproject commit 0965b4bb9626c7ad1463628646cf734b15e16c5d
|
Subproject commit 543220f35990f5b5a69eac5a6c06e69eae6ffb82
|
|
@ -22,7 +22,7 @@ from gi.repository import Adw
|
||||||
|
|
||||||
MAJOR_VERSION = 3
|
MAJOR_VERSION = 3
|
||||||
MINOR_VERSION = 2
|
MINOR_VERSION = 2
|
||||||
MICRO_VERSION = 0
|
MICRO_VERSION = 1
|
||||||
|
|
||||||
VERSION_NUMBER = f"{MAJOR_VERSION}.{MINOR_VERSION}.{MICRO_VERSION}"
|
VERSION_NUMBER = f"{MAJOR_VERSION}.{MINOR_VERSION}.{MICRO_VERSION}"
|
||||||
VERSION_STRING = f"Emmental {VERSION_NUMBER}{gsetup.DEBUG_STR}"
|
VERSION_STRING = f"Emmental {VERSION_NUMBER}{gsetup.DEBUG_STR}"
|
||||||
|
|
|
@ -5,6 +5,7 @@ from gi.repository import GObject
|
||||||
from gi.repository import GLib
|
from gi.repository import GLib
|
||||||
from gi.repository import Gst
|
from gi.repository import Gst
|
||||||
from . import filter
|
from . import filter
|
||||||
|
from . import stopwatch
|
||||||
from .. import path
|
from .. import path
|
||||||
from .. import tmpdir
|
from .. import tmpdir
|
||||||
|
|
||||||
|
@ -32,8 +33,7 @@ class Player(GObject.GObject):
|
||||||
status = GObject.Property(type=str, default="Stopped")
|
status = GObject.Property(type=str, default="Stopped")
|
||||||
have_track = GObject.Property(type=bool, default=False)
|
have_track = GObject.Property(type=bool, default=False)
|
||||||
almost_done = GObject.Property(type=bool, default=False)
|
almost_done = GObject.Property(type=bool, default=False)
|
||||||
playtime = GObject.Property(type=float)
|
stopwatch = GObject.Property(type=stopwatch.StopWatch)
|
||||||
savedtime = GObject.Property(type=float)
|
|
||||||
|
|
||||||
bg_enabled = GObject.Property(type=bool, default=False)
|
bg_enabled = GObject.Property(type=bool, default=False)
|
||||||
bg_volume = GObject.Property(type=float, default=0.5)
|
bg_volume = GObject.Property(type=float, default=0.5)
|
||||||
|
@ -41,7 +41,7 @@ class Player(GObject.GObject):
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
"""Initialize the audio Player."""
|
"""Initialize the audio Player."""
|
||||||
super().__init__()
|
super().__init__(stopwatch=stopwatch.StopWatch())
|
||||||
self._filter = filter.Filter()
|
self._filter = filter.Filter()
|
||||||
self._timeout = None
|
self._timeout = None
|
||||||
|
|
||||||
|
@ -70,12 +70,6 @@ class Player(GObject.GObject):
|
||||||
if not self.almost_done:
|
if not self.almost_done:
|
||||||
self.emit("about-to-finish")
|
self.emit("about-to-finish")
|
||||||
|
|
||||||
def __get_current_playtime(self) -> float:
|
|
||||||
if not self._playbin.clock:
|
|
||||||
return 0.0
|
|
||||||
time = self._playbin.clock.get_time() - self._playbin.base_time
|
|
||||||
return time / Gst.SECOND
|
|
||||||
|
|
||||||
def __msg_async_done(self, bus: Gst.Bus, message: Gst.Message) -> None:
|
def __msg_async_done(self, bus: Gst.Bus, message: Gst.Message) -> None:
|
||||||
self.__update_position()
|
self.__update_position()
|
||||||
|
|
||||||
|
@ -95,15 +89,18 @@ class Player(GObject.GObject):
|
||||||
print("audio: state changed to 'playing'")
|
print("audio: state changed to 'playing'")
|
||||||
self.status = "Playing"
|
self.status = "Playing"
|
||||||
self.playing = True
|
self.playing = True
|
||||||
|
self.stopwatch.start()
|
||||||
case (_, Gst.State.PAUSED, Gst.State.VOID_PENDING):
|
case (_, Gst.State.PAUSED, Gst.State.VOID_PENDING):
|
||||||
print("audio: state changed to 'paused'")
|
print("audio: state changed to 'paused'")
|
||||||
self.status = "Paused"
|
self.status = "Paused"
|
||||||
self.playing = False
|
self.playing = False
|
||||||
|
self.stopwatch.stop()
|
||||||
case (_, Gst.State.READY, Gst.State.VOID_PENDING) | \
|
case (_, Gst.State.READY, Gst.State.VOID_PENDING) | \
|
||||||
(_, Gst.State.NULL, Gst.State.VOID_PENDING):
|
(_, Gst.State.NULL, Gst.State.VOID_PENDING):
|
||||||
print("audio: state changed to 'stopped'")
|
print("audio: state changed to 'stopped'")
|
||||||
self.status = "Stopped"
|
self.status = "Stopped"
|
||||||
self.playing = False
|
self.playing = False
|
||||||
|
self.stopwatch.stop()
|
||||||
|
|
||||||
self.__update_timeout()
|
self.__update_timeout()
|
||||||
|
|
||||||
|
@ -141,8 +138,7 @@ class Player(GObject.GObject):
|
||||||
artwork: pathlib.Path | None = None) -> None:
|
artwork: pathlib.Path | None = None) -> None:
|
||||||
for tag in ["artist", "album-artist", "album", "title"]:
|
for tag in ["artist", "album-artist", "album", "title"]:
|
||||||
self.set_property(tag, "")
|
self.set_property(tag, "")
|
||||||
for tag in ["album-disc-number", "track-number",
|
for tag in ["album-disc-number", "track-number", "position"]:
|
||||||
"position", "playtime", "savedtime"]:
|
|
||||||
self.set_property(tag, 0)
|
self.set_property(tag, 0)
|
||||||
|
|
||||||
self.almost_done = False
|
self.almost_done = False
|
||||||
|
@ -153,7 +149,6 @@ class Player(GObject.GObject):
|
||||||
def __update_position(self) -> bool:
|
def __update_position(self) -> bool:
|
||||||
(res, pos) = self._playbin.query_position(Gst.Format.TIME)
|
(res, pos) = self._playbin.query_position(Gst.Format.TIME)
|
||||||
self.position = pos / Gst.USECOND if res else 0
|
self.position = pos / Gst.USECOND if res else 0
|
||||||
self.playtime = self.__get_current_playtime() + self.savedtime
|
|
||||||
self.__check_last_second()
|
self.__check_last_second()
|
||||||
return GLib.SOURCE_CONTINUE
|
return GLib.SOURCE_CONTINUE
|
||||||
|
|
||||||
|
@ -189,7 +184,6 @@ class Player(GObject.GObject):
|
||||||
|
|
||||||
def seek(self, newpos: float, *args) -> None:
|
def seek(self, newpos: float, *args) -> None:
|
||||||
"""Seek to a different point in the stream."""
|
"""Seek to a different point in the stream."""
|
||||||
self.savedtime += self.__get_current_playtime()
|
|
||||||
self._playbin.seek_simple(Gst.Format.TIME, SEEK_FLAGS,
|
self._playbin.seek_simple(Gst.Format.TIME, SEEK_FLAGS,
|
||||||
newpos * Gst.USECOND)
|
newpos * Gst.USECOND)
|
||||||
|
|
||||||
|
@ -210,6 +204,11 @@ class Player(GObject.GObject):
|
||||||
"""Stop playback."""
|
"""Stop playback."""
|
||||||
self.set_state_sync(Gst.State.READY)
|
self.set_state_sync(Gst.State.READY)
|
||||||
|
|
||||||
|
@GObject.Property(type=float)
|
||||||
|
def playtime(self) -> float:
|
||||||
|
"""Get the total playtime of the current track."""
|
||||||
|
return self.stopwatch.elapsed_time()
|
||||||
|
|
||||||
@GObject.Signal
|
@GObject.Signal
|
||||||
def about_to_finish(self) -> None:
|
def about_to_finish(self) -> None:
|
||||||
"""Signal that playback is almost done."""
|
"""Signal that playback is almost done."""
|
||||||
|
@ -226,6 +225,7 @@ class Player(GObject.GObject):
|
||||||
cover = self.file.parent / "cover.jpg"
|
cover = self.file.parent / "cover.jpg"
|
||||||
self.__reset_properties(duration=(dur / Gst.USECOND if res else 0),
|
self.__reset_properties(duration=(dur / Gst.USECOND if res else 0),
|
||||||
artwork=(cover if cover.is_file() else None))
|
artwork=(cover if cover.is_file() else None))
|
||||||
|
self.stopwatch.reset(playing=self.playing)
|
||||||
self.have_track = True
|
self.have_track = True
|
||||||
|
|
||||||
@GObject.Signal
|
@GObject.Signal
|
||||||
|
|
42
emmental/audio/stopwatch.py
Normal file
42
emmental/audio/stopwatch.py
Normal file
|
@ -0,0 +1,42 @@
|
||||||
|
# Copyright 2024 (c) Anna Schumaker.
|
||||||
|
"""A custom StopWatch object for tracking play time."""
|
||||||
|
import datetime
|
||||||
|
from gi.repository import GObject
|
||||||
|
|
||||||
|
|
||||||
|
class StopWatch(GObject.GObject):
|
||||||
|
"""A StopWatch object."""
|
||||||
|
|
||||||
|
def __init__(self) -> None:
|
||||||
|
"""Initialize the StopWatch."""
|
||||||
|
super().__init__()
|
||||||
|
self._saved = None
|
||||||
|
self._started = None
|
||||||
|
|
||||||
|
def elapsed_time(self) -> float:
|
||||||
|
"""Get the elapsed time (in seconds)."""
|
||||||
|
total = datetime.timedelta()
|
||||||
|
if self._saved is not None:
|
||||||
|
total += self._saved
|
||||||
|
if self._started is not None:
|
||||||
|
total += datetime.datetime.now() - self._started
|
||||||
|
return total.total_seconds()
|
||||||
|
|
||||||
|
def reset(self, *, playing: bool) -> None:
|
||||||
|
"""Reset the StopWatch."""
|
||||||
|
self._saved = None
|
||||||
|
self._started = datetime.datetime.now() if playing else None
|
||||||
|
|
||||||
|
def start(self) -> None:
|
||||||
|
"""Start the StopWatch."""
|
||||||
|
self._started = datetime.datetime.now()
|
||||||
|
|
||||||
|
def stop(self) -> None:
|
||||||
|
"""Stop the StopWatch."""
|
||||||
|
if self._started is not None:
|
||||||
|
delta = datetime.datetime.now() - self._started
|
||||||
|
if self._saved is None:
|
||||||
|
self._saved = delta
|
||||||
|
else:
|
||||||
|
self._saved += delta
|
||||||
|
self._started = None
|
50
emmental/db/adapter.py
Normal file
50
emmental/db/adapter.py
Normal file
|
@ -0,0 +1,50 @@
|
||||||
|
# Copyright 2024 (c) Anna Schumaker.
|
||||||
|
"""Custom sqlite3 adapters."""
|
||||||
|
import datetime
|
||||||
|
import pathlib
|
||||||
|
import sqlite3
|
||||||
|
|
||||||
|
|
||||||
|
def adapt_date(date: datetime.date) -> str:
|
||||||
|
"""Adapt a date object to IOS 8601 date."""
|
||||||
|
return date.isoformat()
|
||||||
|
|
||||||
|
|
||||||
|
def adapt_datetime(dt: datetime.datetime) -> str:
|
||||||
|
"""Adapt a datetime object to ISO 8601 timestamp."""
|
||||||
|
if dt.tzinfo is not None:
|
||||||
|
dt = datetime.datetime.combine(dt.date(), dt.time())
|
||||||
|
return dt.isoformat(" ")
|
||||||
|
|
||||||
|
|
||||||
|
def adapt_path(path: pathlib.Path) -> str:
|
||||||
|
"""Adapt a pathlib.Path into a sqlite3 string."""
|
||||||
|
return str(path)
|
||||||
|
|
||||||
|
|
||||||
|
def convert_date(date: bytes) -> datetime.date:
|
||||||
|
"""Convert ISO 8601 date to a datetime.date."""
|
||||||
|
return datetime.date.fromisoformat(date.decode())
|
||||||
|
|
||||||
|
|
||||||
|
def convert_datetime(dt: bytes) -> datetime.datetime:
|
||||||
|
"""Convert ISO 8601 timestamp to a datetime.datetime."""
|
||||||
|
if (res := datetime.datetime.fromisoformat(dt.decode())).tzinfo is None:
|
||||||
|
res = datetime.datetime.combine(res.date(), res.time(), datetime.UTC)
|
||||||
|
return res
|
||||||
|
|
||||||
|
|
||||||
|
def convert_path(path: bytes) -> pathlib.Path:
|
||||||
|
"""Convert a path string into a pathlib.Path object."""
|
||||||
|
return pathlib.Path(path.decode())
|
||||||
|
|
||||||
|
|
||||||
|
def register() -> None:
|
||||||
|
"""Register our adapters and converters."""
|
||||||
|
sqlite3.register_adapter(datetime.date, adapt_date)
|
||||||
|
sqlite3.register_adapter(datetime.datetime, adapt_datetime)
|
||||||
|
sqlite3.register_adapter(pathlib.PosixPath, adapt_path)
|
||||||
|
|
||||||
|
sqlite3.register_converter("date", convert_date)
|
||||||
|
sqlite3.register_converter("timestamp", convert_datetime)
|
||||||
|
sqlite3.register_converter("path", convert_path)
|
|
@ -4,25 +4,14 @@ import pathlib
|
||||||
import sqlite3
|
import sqlite3
|
||||||
import sys
|
import sys
|
||||||
from gi.repository import GObject
|
from gi.repository import GObject
|
||||||
|
from . import adapter
|
||||||
from .. import gsetup
|
from .. import gsetup
|
||||||
|
|
||||||
|
|
||||||
DATA_FILE = gsetup.DATA_DIR / f"emmental{gsetup.DEBUG_STR}.sqlite3"
|
DATA_FILE = gsetup.DATA_DIR / f"emmental{gsetup.DEBUG_STR}.sqlite3"
|
||||||
DATABASE = ":memory:" if "unittest" in sys.modules else DATA_FILE
|
DATABASE = ":memory:" if "unittest" in sys.modules else DATA_FILE
|
||||||
|
|
||||||
|
adapter.register()
|
||||||
def adapt_path(path: pathlib.Path) -> str:
|
|
||||||
"""Adapt a pathlib.Path into a sqlite3 string."""
|
|
||||||
return str(path)
|
|
||||||
|
|
||||||
|
|
||||||
def convert_path(path: bytes) -> pathlib.Path:
|
|
||||||
"""Convert a path string into a pathlib.Path object."""
|
|
||||||
return pathlib.Path(path.decode())
|
|
||||||
|
|
||||||
|
|
||||||
sqlite3.register_adapter(pathlib.PosixPath, adapt_path)
|
|
||||||
sqlite3.register_converter("path", convert_path)
|
|
||||||
|
|
||||||
|
|
||||||
class Connection(GObject.GObject):
|
class Connection(GObject.GObject):
|
||||||
|
|
|
@ -242,7 +242,7 @@ class Table(table.Table):
|
||||||
def restart_track(self, track: Track) -> None:
|
def restart_track(self, track: Track) -> None:
|
||||||
"""Mark that a Track has been restarted."""
|
"""Mark that a Track has been restarted."""
|
||||||
track.active = True
|
track.active = True
|
||||||
track.restarted = datetime.datetime.utcnow()
|
track.restarted = datetime.datetime.now(datetime.UTC)
|
||||||
self.current_track = track
|
self.current_track = track
|
||||||
|
|
||||||
def start_track(self, track: Track) -> None:
|
def start_track(self, track: Track) -> None:
|
||||||
|
@ -251,7 +251,7 @@ class Table(table.Table):
|
||||||
|
|
||||||
cur = self.sql("""UPDATE tracks SET active=TRUE, laststarted=?
|
cur = self.sql("""UPDATE tracks SET active=TRUE, laststarted=?
|
||||||
WHERE trackid=? RETURNING laststarted""",
|
WHERE trackid=? RETURNING laststarted""",
|
||||||
datetime.datetime.utcnow(), track.trackid)
|
datetime.datetime.now(datetime.UTC), track.trackid)
|
||||||
track.active = True
|
track.active = True
|
||||||
track.laststarted = cur.fetchone()["laststarted"]
|
track.laststarted = cur.fetchone()["laststarted"]
|
||||||
self.current_track = track
|
self.current_track = track
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
# Copyright 2022 (c) Anna Schumaker.
|
# Copyright 2022 (c) Anna Schumaker.
|
||||||
"""Tests our GObject audio player wrapping a GStreamer Playbin element."""
|
"""Tests our GObject audio player wrapping a GStreamer Playbin element."""
|
||||||
|
import datetime
|
||||||
import io
|
import io
|
||||||
import pathlib
|
import pathlib
|
||||||
import unittest
|
import unittest
|
||||||
|
@ -80,10 +81,10 @@ class TestAudio(unittest.TestCase):
|
||||||
self.player.file = tests.util.TRACK_OGG
|
self.player.file = tests.util.TRACK_OGG
|
||||||
self.player.duration = 10
|
self.player.duration = 10
|
||||||
self.player.position = 8
|
self.player.position = 8
|
||||||
self.player.playtime = 6
|
|
||||||
self.player.savedtime = 4
|
|
||||||
self.player.almost_done = True
|
self.player.almost_done = True
|
||||||
self.player.artwork = pathlib.Path("/a/b/c.jpg")
|
self.player.artwork = pathlib.Path("/a/b/c.jpg")
|
||||||
|
self.player.stopwatch.reset = unittest.mock.Mock()
|
||||||
|
self.player.stopwatch._saved = datetime.timedelta(seconds=6)
|
||||||
|
|
||||||
eos = Gst.Message.new_eos(self.player._playbin)
|
eos = Gst.Message.new_eos(self.player._playbin)
|
||||||
self.player._playbin.get_bus().post(eos)
|
self.player._playbin.get_bus().post(eos)
|
||||||
|
@ -94,9 +95,11 @@ class TestAudio(unittest.TestCase):
|
||||||
for prop in ["artist", "album-artist", "album", "title"]:
|
for prop in ["artist", "album-artist", "album", "title"]:
|
||||||
self.assertEqual(self.player.get_property(prop), "")
|
self.assertEqual(self.player.get_property(prop), "")
|
||||||
for prop in ["album-disc-number", "track-number",
|
for prop in ["album-disc-number", "track-number",
|
||||||
"position", "duration", "playtime", "savedtime"]:
|
"position", "duration"]:
|
||||||
self.assertEqual(self.player.get_property(prop), 0)
|
self.assertEqual(self.player.get_property(prop), 0)
|
||||||
self.assertIsNone(self.player.artwork)
|
self.assertIsNone(self.player.artwork)
|
||||||
|
self.player.stopwatch.reset.assert_not_called()
|
||||||
|
self.assertEqual(self.player.playtime, 6)
|
||||||
|
|
||||||
self.assertEqual(self.player.get_state(), Gst.State.READY)
|
self.assertEqual(self.player.get_state(), Gst.State.READY)
|
||||||
self.assertEqual(self.player.status, "Stopped")
|
self.assertEqual(self.player.status, "Stopped")
|
||||||
|
@ -150,22 +153,28 @@ class TestAudio(unittest.TestCase):
|
||||||
"audio: file loaded\n"
|
"audio: file loaded\n"
|
||||||
"audio: state changed to 'paused'\n")
|
"audio: state changed to 'paused'\n")
|
||||||
|
|
||||||
self.player.playtime = 6
|
self.player.stopwatch._saved = datetime.timedelta(seconds=6)
|
||||||
self.player.savedtime = 4
|
|
||||||
self.player.emit("file-loaded", tests.util.TRACK_OGG)
|
self.player.emit("file-loaded", tests.util.TRACK_OGG)
|
||||||
for prop in ["artist", "album-artist", "album", "title"]:
|
for prop in ["artist", "album-artist", "album", "title"]:
|
||||||
self.assertEqual(self.player.get_property(prop), "")
|
self.assertEqual(self.player.get_property(prop), "")
|
||||||
for prop in ["album-disc-number", "track-number",
|
for prop in ["album-disc-number", "track-number", "playtime"]:
|
||||||
"playtime", "savedtime"]:
|
|
||||||
self.assertEqual(self.player.get_property(prop), 0)
|
self.assertEqual(self.player.get_property(prop), 0)
|
||||||
|
self.assertIsNone(self.player.stopwatch._started)
|
||||||
|
|
||||||
|
self.player.playing = True
|
||||||
|
self.player.emit("file-loaded", tests.util.TRACK_OGG)
|
||||||
|
self.assertIsNotNone(self.player.stopwatch._started)
|
||||||
|
|
||||||
@unittest.mock.patch("sys.stdout", new_callable=io.StringIO)
|
@unittest.mock.patch("sys.stdout", new_callable=io.StringIO)
|
||||||
def test_play(self, mock_stdout: io.StringIO):
|
def test_play(self, mock_stdout: io.StringIO):
|
||||||
"""Test that the play() function works as expected."""
|
"""Test that the play() function works as expected."""
|
||||||
|
self.player.stopwatch.start = unittest.mock.Mock()
|
||||||
|
|
||||||
self.player.play()
|
self.player.play()
|
||||||
self.main_loop()
|
self.main_loop()
|
||||||
self.assertFalse(self.player.playing)
|
self.assertFalse(self.player.playing)
|
||||||
self.assertEqual(self.player.status, "Stopped")
|
self.assertEqual(self.player.status, "Stopped")
|
||||||
|
self.player.stopwatch.start.assert_not_called()
|
||||||
|
|
||||||
self.player.file = tests.util.TRACK_OGG
|
self.player.file = tests.util.TRACK_OGG
|
||||||
self.player.play()
|
self.player.play()
|
||||||
|
@ -175,13 +184,17 @@ class TestAudio(unittest.TestCase):
|
||||||
self.assertEqual(self.player.get_state(), Gst.State.PLAYING)
|
self.assertEqual(self.player.get_state(), Gst.State.PLAYING)
|
||||||
self.assertRegex(mock_stdout.getvalue(),
|
self.assertRegex(mock_stdout.getvalue(),
|
||||||
"audio: state changed to 'playing'$")
|
"audio: state changed to 'playing'$")
|
||||||
|
self.player.stopwatch.start.assert_called()
|
||||||
|
|
||||||
@unittest.mock.patch("sys.stdout", new_callable=io.StringIO)
|
@unittest.mock.patch("sys.stdout", new_callable=io.StringIO)
|
||||||
def test_pause(self, mock_stdout: io.StringIO):
|
def test_pause(self, mock_stdout: io.StringIO):
|
||||||
"""Test that the pause() function works as expected."""
|
"""Test that the pause() function works as expected."""
|
||||||
|
self.player.stopwatch.stop = unittest.mock.Mock()
|
||||||
|
|
||||||
self.player.pause()
|
self.player.pause()
|
||||||
self.main_loop()
|
self.main_loop()
|
||||||
self.assertEqual(self.player.status, "Stopped")
|
self.assertEqual(self.player.status, "Stopped")
|
||||||
|
self.player.stopwatch.stop.assert_not_called()
|
||||||
|
|
||||||
self.player.playing = True
|
self.player.playing = True
|
||||||
self.player.file = tests.util.TRACK_OGG
|
self.player.file = tests.util.TRACK_OGG
|
||||||
|
@ -192,6 +205,7 @@ class TestAudio(unittest.TestCase):
|
||||||
self.assertEqual(self.player.get_state(), Gst.State.PAUSED)
|
self.assertEqual(self.player.get_state(), Gst.State.PAUSED)
|
||||||
self.assertRegex(mock_stdout.getvalue(),
|
self.assertRegex(mock_stdout.getvalue(),
|
||||||
"audio: state changed to 'paused'$")
|
"audio: state changed to 'paused'$")
|
||||||
|
self.player.stopwatch.stop.assert_called()
|
||||||
|
|
||||||
@unittest.mock.patch("sys.stdout", new_callable=io.StringIO)
|
@unittest.mock.patch("sys.stdout", new_callable=io.StringIO)
|
||||||
def test_play_pause(self, mock_stdout: io.StringIO):
|
def test_play_pause(self, mock_stdout: io.StringIO):
|
||||||
|
@ -222,6 +236,7 @@ class TestAudio(unittest.TestCase):
|
||||||
@unittest.mock.patch("sys.stdout", new_callable=io.StringIO)
|
@unittest.mock.patch("sys.stdout", new_callable=io.StringIO)
|
||||||
def test_stop(self, mock_stdout: io.StringIO):
|
def test_stop(self, mock_stdout: io.StringIO):
|
||||||
"""Test that the stop() function works as expected."""
|
"""Test that the stop() function works as expected."""
|
||||||
|
self.player.stopwatch.stop = unittest.mock.Mock()
|
||||||
self.player.file = tests.util.TRACK_OGG
|
self.player.file = tests.util.TRACK_OGG
|
||||||
self.player.play()
|
self.player.play()
|
||||||
self.main_loop()
|
self.main_loop()
|
||||||
|
@ -234,6 +249,7 @@ class TestAudio(unittest.TestCase):
|
||||||
self.assertRegex(mock_stdout.getvalue(),
|
self.assertRegex(mock_stdout.getvalue(),
|
||||||
"audio: state changed to 'playing'\n"
|
"audio: state changed to 'playing'\n"
|
||||||
"audio: state changed to 'stopped'\n$")
|
"audio: state changed to 'stopped'\n$")
|
||||||
|
self.player.stopwatch.stop.assert_called()
|
||||||
|
|
||||||
@unittest.mock.patch("sys.stdout", new_callable=io.StringIO)
|
@unittest.mock.patch("sys.stdout", new_callable=io.StringIO)
|
||||||
def test_tags(self, mock_stdout: io.StringIO):
|
def test_tags(self, mock_stdout: io.StringIO):
|
||||||
|
@ -282,30 +298,14 @@ class TestAudio(unittest.TestCase):
|
||||||
@unittest.mock.patch("sys.stdout", new_callable=io.StringIO)
|
@unittest.mock.patch("sys.stdout", new_callable=io.StringIO)
|
||||||
def test_playtime(self, mock_stdout: io.StringIO):
|
def test_playtime(self, mock_stdout: io.StringIO):
|
||||||
"""Test the play time property."""
|
"""Test the play time property."""
|
||||||
|
self.assertIsInstance(self.player.stopwatch,
|
||||||
|
emmental.audio.stopwatch.StopWatch)
|
||||||
self.assertEqual(self.player.playtime, 0.0)
|
self.assertEqual(self.player.playtime, 0.0)
|
||||||
self.assertEqual(self.player.savedtime, 0.0)
|
|
||||||
self.assertEqual(self.player._Player__get_current_playtime(), 0.0)
|
|
||||||
|
|
||||||
self.player.file = tests.util.TRACK_OGG
|
with unittest.mock.patch.object(self.player.stopwatch,
|
||||||
self.player.play()
|
"elapsed_time") as mock_elapsed:
|
||||||
self.main_loop()
|
mock_elapsed.return_value = 2.0
|
||||||
self.assertIsNotNone(self.player._playbin.clock)
|
self.assertEqual(self.player.playtime, 2.0)
|
||||||
|
|
||||||
base_time = unittest.mock.PropertyMock(return_value=Gst.SECOND)
|
|
||||||
type(self.player._playbin).base_time = base_time
|
|
||||||
get_time = unittest.mock.Mock(return_value=3 * Gst.SECOND)
|
|
||||||
self.player._playbin.clock.get_time = get_time
|
|
||||||
|
|
||||||
self.assertEqual(self.player._Player__get_current_playtime(), 2.0)
|
|
||||||
self.player._Player__update_position()
|
|
||||||
self.assertEqual(self.player.playtime, 2)
|
|
||||||
|
|
||||||
with unittest.mock.patch.object(self.player._playbin, "seek_simple"):
|
|
||||||
self.player.seek(5)
|
|
||||||
self.assertEqual(self.player.savedtime, 2)
|
|
||||||
|
|
||||||
self.player._Player__update_position()
|
|
||||||
self.assertEqual(self.player.playtime, 4)
|
|
||||||
|
|
||||||
def test_volume(self):
|
def test_volume(self):
|
||||||
"""Test that the volume property works as expected."""
|
"""Test that the volume property works as expected."""
|
||||||
|
|
91
tests/audio/test_stopwatch.py
Normal file
91
tests/audio/test_stopwatch.py
Normal file
|
@ -0,0 +1,91 @@
|
||||||
|
# Copyright 2024 (c) Anna Schumaker.
|
||||||
|
"""Tests our StopWatch object."""
|
||||||
|
import datetime
|
||||||
|
import emmental.audio.stopwatch
|
||||||
|
import unittest
|
||||||
|
from gi.repository import GObject
|
||||||
|
|
||||||
|
|
||||||
|
@unittest.mock.patch.object(emmental.audio.stopwatch, "datetime")
|
||||||
|
class TestStopwatch(unittest.TestCase):
|
||||||
|
"""Our stopwatch test case."""
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
"""Set up common variables."""
|
||||||
|
self.now = datetime.datetime.now()
|
||||||
|
self.newdelta = datetime.timedelta
|
||||||
|
self.stopwatch = emmental.audio.stopwatch.StopWatch()
|
||||||
|
|
||||||
|
def test_init(self, mock_datetime: unittest.mock.Mock):
|
||||||
|
"""Test that the StopWatch was created properly."""
|
||||||
|
self.assertIsInstance(self.stopwatch, GObject.GObject)
|
||||||
|
|
||||||
|
def test_elapsed_time(self, mock_datetime: unittest.mock.Mock):
|
||||||
|
"""Test the elapsed_time() function."""
|
||||||
|
mock_datetime.timedelta = self.newdelta
|
||||||
|
self.assertEqual(self.stopwatch.elapsed_time(), 0.0)
|
||||||
|
|
||||||
|
mock_datetime.datetime.now.return_value = self.now
|
||||||
|
self.stopwatch.start()
|
||||||
|
|
||||||
|
soon = self.now + datetime.timedelta(seconds=12.345)
|
||||||
|
mock_datetime.datetime.now.return_value = soon
|
||||||
|
self.assertEqual(self.stopwatch.elapsed_time(), 12.345)
|
||||||
|
|
||||||
|
self.stopwatch._saved = datetime.timedelta(seconds=2)
|
||||||
|
self.assertEqual(self.stopwatch.elapsed_time(), 14.345)
|
||||||
|
|
||||||
|
self.stopwatch.stop()
|
||||||
|
self.assertEqual(self.stopwatch.elapsed_time(), 14.345)
|
||||||
|
|
||||||
|
self.stopwatch.reset(playing=False)
|
||||||
|
self.assertEqual(self.stopwatch.elapsed_time(), 0.0)
|
||||||
|
|
||||||
|
def test_reset(self, mock_datetime: unittest.mock.Mock):
|
||||||
|
"""Test resetting the StopWatch."""
|
||||||
|
mock_datetime.datetime.now.return_value = self.now
|
||||||
|
self.stopwatch.start()
|
||||||
|
soon = self.now + datetime.timedelta(seconds=12.345)
|
||||||
|
mock_datetime.datetime.now.return_value = soon
|
||||||
|
self.stopwatch.stop()
|
||||||
|
|
||||||
|
self.stopwatch.reset(playing=False)
|
||||||
|
self.assertIsNone(self.stopwatch._saved)
|
||||||
|
self.assertIsNone(self.stopwatch._started)
|
||||||
|
|
||||||
|
self.stopwatch.reset(playing=True)
|
||||||
|
self.assertIsNone(self.stopwatch._saved)
|
||||||
|
self.assertEqual(self.stopwatch._started, soon)
|
||||||
|
|
||||||
|
def test_start(self, mock_datetime: unittest.mock.Mock):
|
||||||
|
"""Test starting the StopWatch."""
|
||||||
|
self.assertIsNone(self.stopwatch._started)
|
||||||
|
|
||||||
|
mock_datetime.datetime.now.return_value = self.now
|
||||||
|
self.stopwatch.start()
|
||||||
|
self.assertEqual(self.stopwatch._started, self.now)
|
||||||
|
|
||||||
|
def test_stop(self, mock_datetime: unittest.mock.Mock):
|
||||||
|
"""Test stopping the StopWatch."""
|
||||||
|
self.assertIsNone(self.stopwatch._saved)
|
||||||
|
self.stopwatch.stop()
|
||||||
|
self.assertIsNone(self.stopwatch._saved)
|
||||||
|
|
||||||
|
mock_datetime.datetime.now.return_value = self.now
|
||||||
|
self.stopwatch.start()
|
||||||
|
delta1 = datetime.timedelta(seconds=12.345)
|
||||||
|
mock_datetime.datetime.now.return_value = self.now + delta1
|
||||||
|
self.stopwatch.stop()
|
||||||
|
self.assertEqual(self.stopwatch._saved, delta1)
|
||||||
|
self.assertIsNone(self.stopwatch._started)
|
||||||
|
|
||||||
|
now = self.now + delta1 + datetime.timedelta(seconds=2)
|
||||||
|
mock_datetime.datetime.now.return_value = now
|
||||||
|
self.stopwatch.start()
|
||||||
|
delta2 = datetime.timedelta(seconds=3)
|
||||||
|
mock_datetime.datetime.now.return_value = now + delta2
|
||||||
|
self.stopwatch.stop()
|
||||||
|
self.assertEqual(self.stopwatch._saved, delta1 + delta2)
|
||||||
|
|
||||||
|
self.stopwatch.stop()
|
||||||
|
self.assertEqual(self.stopwatch._saved, delta1 + delta2)
|
60
tests/db/test_adapter.py
Normal file
60
tests/db/test_adapter.py
Normal file
|
@ -0,0 +1,60 @@
|
||||||
|
# Copyright 2024 (c) Anna Schumaker.
|
||||||
|
"""Test our custom sqlite3 adapters."""
|
||||||
|
import datetime
|
||||||
|
import emmental.db.adapter
|
||||||
|
import pathlib
|
||||||
|
import unittest
|
||||||
|
|
||||||
|
|
||||||
|
class TestAdapters(unittest.TestCase):
|
||||||
|
"""Test case for our database adapters."""
|
||||||
|
|
||||||
|
def test_date(self):
|
||||||
|
"""Test adapting and converting a date object."""
|
||||||
|
date = datetime.date(2024, 6, 5)
|
||||||
|
|
||||||
|
self.assertEqual(emmental.db.adapter.adapt_date(date),
|
||||||
|
"2024-06-05")
|
||||||
|
self.assertEqual(emmental.db.adapter.convert_date(b"2024-06-05"),
|
||||||
|
date)
|
||||||
|
|
||||||
|
def test_datetime(self):
|
||||||
|
"""Test adapting and converting a datetime object."""
|
||||||
|
dt_utc = datetime.datetime.now(datetime.UTC)
|
||||||
|
dt_naive = datetime.datetime.combine(dt_utc.date(), dt_utc.time())
|
||||||
|
iso_naive = dt_naive.isoformat(" ").encode()
|
||||||
|
|
||||||
|
self.assertEqual(emmental.db.adapter.adapt_datetime(dt_utc),
|
||||||
|
iso_naive.decode())
|
||||||
|
self.assertEqual(emmental.db.adapter.adapt_datetime(dt_naive),
|
||||||
|
iso_naive.decode())
|
||||||
|
|
||||||
|
self.assertEqual(emmental.db.adapter.convert_datetime(iso_naive),
|
||||||
|
dt_utc)
|
||||||
|
|
||||||
|
def test_path(self):
|
||||||
|
"""Test adapting and converting a pathlib.Path object."""
|
||||||
|
path = pathlib.Path("/my/test/path")
|
||||||
|
self.assertEqual(emmental.db.adapter.adapt_path(path),
|
||||||
|
"/my/test/path")
|
||||||
|
self.assertEqual(emmental.db.adapter.convert_path(b"/my/test/path"),
|
||||||
|
path)
|
||||||
|
|
||||||
|
@unittest.mock.patch("sqlite3.register_converter")
|
||||||
|
@unittest.mock.patch("sqlite3.register_adapter")
|
||||||
|
def test_register(self, register_adapter: unittest.mock.Mock,
|
||||||
|
register_converter: unittest.mock.Mock):
|
||||||
|
"""Test registering adapters and converters."""
|
||||||
|
emmental.db.adapter.register()
|
||||||
|
|
||||||
|
adapters = [(datetime.date, emmental.db.adapter.adapt_date),
|
||||||
|
(datetime.datetime, emmental.db.adapter.adapt_datetime),
|
||||||
|
(pathlib.PosixPath, emmental.db.adapter.adapt_path)]
|
||||||
|
register_adapter.assert_has_calls([unittest.mock.call(*a)
|
||||||
|
for a in adapters])
|
||||||
|
|
||||||
|
converters = [("date", emmental.db.adapter.convert_date),
|
||||||
|
("timestamp", emmental.db.adapter.convert_datetime),
|
||||||
|
("path", emmental.db.adapter.convert_path)]
|
||||||
|
register_converter.assert_has_calls([unittest.mock.call(*c)
|
||||||
|
for c in converters])
|
|
@ -214,6 +214,7 @@ class TestTrackTable(tests.util.TestCase):
|
||||||
|
|
||||||
def test_create(self):
|
def test_create(self):
|
||||||
"""Test creating a new Track."""
|
"""Test creating a new Track."""
|
||||||
|
now = datetime.datetime.now(datetime.UTC)
|
||||||
track = self.tracks.create(self.library, pathlib.Path("/a/b/c.ogg"),
|
track = self.tracks.create(self.library, pathlib.Path("/a/b/c.ogg"),
|
||||||
self.medium, self.year)
|
self.medium, self.year)
|
||||||
self.assertIsInstance(track, emmental.db.tracks.Track)
|
self.assertIsInstance(track, emmental.db.tracks.Track)
|
||||||
|
@ -221,7 +222,7 @@ class TestTrackTable(tests.util.TestCase):
|
||||||
self.assertEqual(track.mediumid, self.medium.mediumid)
|
self.assertEqual(track.mediumid, self.medium.mediumid)
|
||||||
self.assertEqual(track.year, 1988)
|
self.assertEqual(track.year, 1988)
|
||||||
self.assertEqual(track.path, pathlib.Path("/a/b/c.ogg"))
|
self.assertEqual(track.path, pathlib.Path("/a/b/c.ogg"))
|
||||||
self.assertEqual(track.added, datetime.datetime.utcnow().date())
|
self.assertEqual(track.added, now.date())
|
||||||
|
|
||||||
track2 = self.tracks.create(self.library, pathlib.Path("/a/b/d.ogg"),
|
track2 = self.tracks.create(self.library, pathlib.Path("/a/b/d.ogg"),
|
||||||
self.medium, self.year, title="Test Track",
|
self.medium, self.year, title="Test Track",
|
||||||
|
@ -247,7 +248,7 @@ class TestTrackTable(tests.util.TestCase):
|
||||||
|
|
||||||
def test_create_restore(self):
|
def test_create_restore(self):
|
||||||
"""Test restoring saved track data."""
|
"""Test restoring saved track data."""
|
||||||
now = datetime.datetime.utcnow()
|
now = datetime.datetime.now(datetime.UTC)
|
||||||
today = now.date()
|
today = now.date()
|
||||||
yesterday = today - datetime.timedelta(days=1)
|
yesterday = today - datetime.timedelta(days=1)
|
||||||
self.sql("""INSERT INTO saved_track_data
|
self.sql("""INSERT INTO saved_track_data
|
||||||
|
@ -308,7 +309,7 @@ class TestTrackTable(tests.util.TestCase):
|
||||||
|
|
||||||
def test_delete_save(self):
|
def test_delete_save(self):
|
||||||
"""Test saving track data when a track is deleted."""
|
"""Test saving track data when a track is deleted."""
|
||||||
now = datetime.datetime.now()
|
now = datetime.datetime.now(datetime.UTC)
|
||||||
track1 = self.tracks.create(self.library, pathlib.Path("/a/b/1.ogg"),
|
track1 = self.tracks.create(self.library, pathlib.Path("/a/b/1.ogg"),
|
||||||
self.medium, self.year, mbid="ab-cd-ef")
|
self.medium, self.year, mbid="ab-cd-ef")
|
||||||
track2 = self.tracks.create(self.library, pathlib.Path("/a/b/2.ogg"),
|
track2 = self.tracks.create(self.library, pathlib.Path("/a/b/2.ogg"),
|
||||||
|
@ -328,7 +329,7 @@ class TestTrackTable(tests.util.TestCase):
|
||||||
self.assertEqual(rows[0]["laststarted"], now)
|
self.assertEqual(rows[0]["laststarted"], now)
|
||||||
self.assertEqual(rows[0]["lastplayed"], now)
|
self.assertEqual(rows[0]["lastplayed"], now)
|
||||||
self.assertEqual(rows[0]["playcount"], 42)
|
self.assertEqual(rows[0]["playcount"], 42)
|
||||||
self.assertEqual(rows[0]["added"], datetime.datetime.utcnow().date())
|
self.assertEqual(rows[0]["added"], now.date())
|
||||||
|
|
||||||
def test_filter(self):
|
def test_filter(self):
|
||||||
"""Test filtering the Track table."""
|
"""Test filtering the Track table."""
|
||||||
|
@ -354,7 +355,7 @@ class TestTrackTable(tests.util.TestCase):
|
||||||
|
|
||||||
def test_load(self):
|
def test_load(self):
|
||||||
"""Test loading tracks from the database."""
|
"""Test loading tracks from the database."""
|
||||||
now = datetime.datetime.now()
|
now = datetime.datetime.now(datetime.UTC)
|
||||||
self.tracks.create(self.library, pathlib.Path("/a/b/1.ogg"),
|
self.tracks.create(self.library, pathlib.Path("/a/b/1.ogg"),
|
||||||
self.medium, self.year)
|
self.medium, self.year)
|
||||||
self.tracks.create(self.library, pathlib.Path("/a/b/2.ogg"),
|
self.tracks.create(self.library, pathlib.Path("/a/b/2.ogg"),
|
||||||
|
@ -553,7 +554,8 @@ class TestTrackTable(tests.util.TestCase):
|
||||||
|
|
||||||
track.restart()
|
track.restart()
|
||||||
self.assertTrue(track.active)
|
self.assertTrue(track.active)
|
||||||
self.assertGreater(datetime.datetime.utcnow(), track.restarted)
|
self.assertGreater(datetime.datetime.now(datetime.UTC),
|
||||||
|
track.restarted)
|
||||||
self.assertEqual(self.tracks.current_track, track)
|
self.assertEqual(self.tracks.current_track, track)
|
||||||
|
|
||||||
self.playlists.previous.remove_track.assert_not_called()
|
self.playlists.previous.remove_track.assert_not_called()
|
||||||
|
|
|
@ -49,7 +49,7 @@ class TestListen(tests.util.TestCase):
|
||||||
self.assertIsNone(self.listen.listened_at)
|
self.assertIsNone(self.listen.listened_at)
|
||||||
self.assertIsNone(self.listen.listenid)
|
self.assertIsNone(self.listen.listenid)
|
||||||
|
|
||||||
utc_now = datetime.datetime.utcnow()
|
utc_now = datetime.datetime.now(datetime.UTC)
|
||||||
local_now = utc_now.replace(tzinfo=dateutil.tz.tzutc()).astimezone()
|
local_now = utc_now.replace(tzinfo=dateutil.tz.tzutc()).astimezone()
|
||||||
listen = emmental.listenbrainz.listen.Listen(self.track,
|
listen = emmental.listenbrainz.listen.Listen(self.track,
|
||||||
listenid=1234,
|
listenid=1234,
|
||||||
|
|
|
@ -222,8 +222,8 @@ class TestListenBrainz(tests.util.TestCase):
|
||||||
mock_source_remove: unittest.mock.Mock,
|
mock_source_remove: unittest.mock.Mock,
|
||||||
mock_stdout: io.StringIO):
|
mock_stdout: io.StringIO):
|
||||||
"""Test submitting recently listened tracks."""
|
"""Test submitting recently listened tracks."""
|
||||||
ts1 = datetime.datetime.utcnow()
|
ts1 = datetime.datetime.now(datetime.UTC)
|
||||||
ts2 = datetime.datetime.utcnow()
|
ts2 = datetime.datetime.now(datetime.UTC)
|
||||||
idle_work = self.listenbrainz._ListenBrainz__idle_work
|
idle_work = self.listenbrainz._ListenBrainz__idle_work
|
||||||
listens = [emmental.listenbrainz.listen.Listen(self.track, listenid=1,
|
listens = [emmental.listenbrainz.listen.Listen(self.track, listenid=1,
|
||||||
listened_at=ts1),
|
listened_at=ts1),
|
||||||
|
|
|
@ -22,9 +22,9 @@ class TestEmmental(unittest.TestCase):
|
||||||
"""Check that version constants have been set properly."""
|
"""Check that version constants have been set properly."""
|
||||||
self.assertEqual(emmental.MAJOR_VERSION, 3)
|
self.assertEqual(emmental.MAJOR_VERSION, 3)
|
||||||
self.assertEqual(emmental.MINOR_VERSION, 2)
|
self.assertEqual(emmental.MINOR_VERSION, 2)
|
||||||
self.assertEqual(emmental.MICRO_VERSION, 0)
|
self.assertEqual(emmental.MICRO_VERSION, 1)
|
||||||
self.assertEqual(emmental.VERSION_NUMBER, "3.2.0")
|
self.assertEqual(emmental.VERSION_NUMBER, "3.2.1")
|
||||||
self.assertEqual(emmental.VERSION_STRING, "Emmental 3.2.0-debug")
|
self.assertEqual(emmental.VERSION_STRING, "Emmental 3.2.1-debug")
|
||||||
|
|
||||||
def test_application(self):
|
def test_application(self):
|
||||||
"""Check that the application instance is initialized properly."""
|
"""Check that the application instance is initialized properly."""
|
||||||
|
@ -66,7 +66,7 @@ class TestEmmental(unittest.TestCase):
|
||||||
mock_startup.assert_called()
|
mock_startup.assert_called()
|
||||||
mock_load.assert_called()
|
mock_load.assert_called()
|
||||||
mock_add_window.assert_called_with(self.application.win)
|
mock_add_window.assert_called_with(self.application.win)
|
||||||
mock_set_useragent.assert_called_with("emmental-debug", "3.2.0")
|
mock_set_useragent.assert_called_with("emmental-debug", "3.2.1")
|
||||||
|
|
||||||
@unittest.mock.patch("sys.stdout")
|
@unittest.mock.patch("sys.stdout")
|
||||||
@unittest.mock.patch("gi.repository.Adw.Application.add_window")
|
@unittest.mock.patch("gi.repository.Adw.Application.add_window")
|
||||||
|
|
Loading…
Reference in New Issue
Block a user