Compare commits

...

8 Commits

Author SHA1 Message Date
d3fdf82a93 Emmental 3.2.1
Signed-off-by: Anna Schumaker <Anna@NoWheyCreamery.com>
2024-06-13 11:35:55 -04:00
28ee637b0a audio: Change the Player to use the new StopWatch object
Instead of using GStreamer's clock. This lets us call "reset()" when a
new track is loaded so we can reset the timer when a track is loaded
during gapless playback.

Signed-off-by: Anna Schumaker <Anna@NoWheyCreamery.com>
2024-06-11 11:28:06 -04:00
270be37848 audio: Create a StopWatch object
The StopWatch will be used to calculate how long a track is in the
'playing' state so we can accurately determin if it has been played or
not.

Signed-off-by: Anna Schumaker <Anna@NoWheyCreamery.com>
2024-06-11 10:22:21 -04:00
a0f240a2ad emmental: Remove remaining datetime.datetime.utcnow() calls
This function has been deprecated, and is scheduled for removal. Let's
get ahead of things by clearing up the warnings now.

Signed-off-by: Anna Schumaker <Anna@NoWheyCreamery.com>
2024-06-05 13:48:00 -04:00
6dfa841cbd db: Add an adapter and converter for date objects
Signed-off-by: Anna Schumaker <Anna@NoWheyCreamery.com>
2024-06-05 13:30:24 -04:00
745301997e db: Add an adapter and converter for datetime objects
I make sure we handle naive-style datetime objects in addition to
timezone-aware datetimes. I try to keep consistent with how encoding
worked in Python 3.11 so I don't need to do a database upgrade.

Signed-off-by: Anna Schumaker <Anna@NoWheyCreamery.com>
2024-06-05 13:26:11 -04:00
37c74ed5fb db: Move sqlite3 adapters to a new file
I'm going to add more to fix a bunch of warnings that showed up after
upgrading to Python 3.12.

Signed-off-by: Anna Schumaker <Anna@NoWheyCreamery.com>
2024-06-04 21:04:23 -04:00
c3818a2b18 Emmental 3.2 AUR commit
Signed-off-by: Anna Schumaker <Anna@NoWheyCreamery.com>
2024-03-20 13:55:33 -04:00
14 changed files with 306 additions and 72 deletions

2
aur

@ -1 +1 @@
Subproject commit 0965b4bb9626c7ad1463628646cf734b15e16c5d Subproject commit 543220f35990f5b5a69eac5a6c06e69eae6ffb82

View File

@ -22,7 +22,7 @@ from gi.repository import Adw
MAJOR_VERSION = 3 MAJOR_VERSION = 3
MINOR_VERSION = 2 MINOR_VERSION = 2
MICRO_VERSION = 0 MICRO_VERSION = 1
VERSION_NUMBER = f"{MAJOR_VERSION}.{MINOR_VERSION}.{MICRO_VERSION}" VERSION_NUMBER = f"{MAJOR_VERSION}.{MINOR_VERSION}.{MICRO_VERSION}"
VERSION_STRING = f"Emmental {VERSION_NUMBER}{gsetup.DEBUG_STR}" VERSION_STRING = f"Emmental {VERSION_NUMBER}{gsetup.DEBUG_STR}"

View File

@ -5,6 +5,7 @@ from gi.repository import GObject
from gi.repository import GLib from gi.repository import GLib
from gi.repository import Gst from gi.repository import Gst
from . import filter from . import filter
from . import stopwatch
from .. import path from .. import path
from .. import tmpdir from .. import tmpdir
@ -32,8 +33,7 @@ class Player(GObject.GObject):
status = GObject.Property(type=str, default="Stopped") status = GObject.Property(type=str, default="Stopped")
have_track = GObject.Property(type=bool, default=False) have_track = GObject.Property(type=bool, default=False)
almost_done = GObject.Property(type=bool, default=False) almost_done = GObject.Property(type=bool, default=False)
playtime = GObject.Property(type=float) stopwatch = GObject.Property(type=stopwatch.StopWatch)
savedtime = GObject.Property(type=float)
bg_enabled = GObject.Property(type=bool, default=False) bg_enabled = GObject.Property(type=bool, default=False)
bg_volume = GObject.Property(type=float, default=0.5) bg_volume = GObject.Property(type=float, default=0.5)
@ -41,7 +41,7 @@ class Player(GObject.GObject):
def __init__(self): def __init__(self):
"""Initialize the audio Player.""" """Initialize the audio Player."""
super().__init__() super().__init__(stopwatch=stopwatch.StopWatch())
self._filter = filter.Filter() self._filter = filter.Filter()
self._timeout = None self._timeout = None
@ -70,12 +70,6 @@ class Player(GObject.GObject):
if not self.almost_done: if not self.almost_done:
self.emit("about-to-finish") self.emit("about-to-finish")
def __get_current_playtime(self) -> float:
if not self._playbin.clock:
return 0.0
time = self._playbin.clock.get_time() - self._playbin.base_time
return time / Gst.SECOND
def __msg_async_done(self, bus: Gst.Bus, message: Gst.Message) -> None: def __msg_async_done(self, bus: Gst.Bus, message: Gst.Message) -> None:
self.__update_position() self.__update_position()
@ -95,15 +89,18 @@ class Player(GObject.GObject):
print("audio: state changed to 'playing'") print("audio: state changed to 'playing'")
self.status = "Playing" self.status = "Playing"
self.playing = True self.playing = True
self.stopwatch.start()
case (_, Gst.State.PAUSED, Gst.State.VOID_PENDING): case (_, Gst.State.PAUSED, Gst.State.VOID_PENDING):
print("audio: state changed to 'paused'") print("audio: state changed to 'paused'")
self.status = "Paused" self.status = "Paused"
self.playing = False self.playing = False
self.stopwatch.stop()
case (_, Gst.State.READY, Gst.State.VOID_PENDING) | \ case (_, Gst.State.READY, Gst.State.VOID_PENDING) | \
(_, Gst.State.NULL, Gst.State.VOID_PENDING): (_, Gst.State.NULL, Gst.State.VOID_PENDING):
print("audio: state changed to 'stopped'") print("audio: state changed to 'stopped'")
self.status = "Stopped" self.status = "Stopped"
self.playing = False self.playing = False
self.stopwatch.stop()
self.__update_timeout() self.__update_timeout()
@ -141,8 +138,7 @@ class Player(GObject.GObject):
artwork: pathlib.Path | None = None) -> None: artwork: pathlib.Path | None = None) -> None:
for tag in ["artist", "album-artist", "album", "title"]: for tag in ["artist", "album-artist", "album", "title"]:
self.set_property(tag, "") self.set_property(tag, "")
for tag in ["album-disc-number", "track-number", for tag in ["album-disc-number", "track-number", "position"]:
"position", "playtime", "savedtime"]:
self.set_property(tag, 0) self.set_property(tag, 0)
self.almost_done = False self.almost_done = False
@ -153,7 +149,6 @@ class Player(GObject.GObject):
def __update_position(self) -> bool: def __update_position(self) -> bool:
(res, pos) = self._playbin.query_position(Gst.Format.TIME) (res, pos) = self._playbin.query_position(Gst.Format.TIME)
self.position = pos / Gst.USECOND if res else 0 self.position = pos / Gst.USECOND if res else 0
self.playtime = self.__get_current_playtime() + self.savedtime
self.__check_last_second() self.__check_last_second()
return GLib.SOURCE_CONTINUE return GLib.SOURCE_CONTINUE
@ -189,7 +184,6 @@ class Player(GObject.GObject):
def seek(self, newpos: float, *args) -> None: def seek(self, newpos: float, *args) -> None:
"""Seek to a different point in the stream.""" """Seek to a different point in the stream."""
self.savedtime += self.__get_current_playtime()
self._playbin.seek_simple(Gst.Format.TIME, SEEK_FLAGS, self._playbin.seek_simple(Gst.Format.TIME, SEEK_FLAGS,
newpos * Gst.USECOND) newpos * Gst.USECOND)
@ -210,6 +204,11 @@ class Player(GObject.GObject):
"""Stop playback.""" """Stop playback."""
self.set_state_sync(Gst.State.READY) self.set_state_sync(Gst.State.READY)
@GObject.Property(type=float)
def playtime(self) -> float:
"""Get the total playtime of the current track."""
return self.stopwatch.elapsed_time()
@GObject.Signal @GObject.Signal
def about_to_finish(self) -> None: def about_to_finish(self) -> None:
"""Signal that playback is almost done.""" """Signal that playback is almost done."""
@ -226,6 +225,7 @@ class Player(GObject.GObject):
cover = self.file.parent / "cover.jpg" cover = self.file.parent / "cover.jpg"
self.__reset_properties(duration=(dur / Gst.USECOND if res else 0), self.__reset_properties(duration=(dur / Gst.USECOND if res else 0),
artwork=(cover if cover.is_file() else None)) artwork=(cover if cover.is_file() else None))
self.stopwatch.reset(playing=self.playing)
self.have_track = True self.have_track = True
@GObject.Signal @GObject.Signal

View File

@ -0,0 +1,42 @@
# Copyright 2024 (c) Anna Schumaker.
"""A custom StopWatch object for tracking play time."""
import datetime
from gi.repository import GObject
class StopWatch(GObject.GObject):
"""A StopWatch object."""
def __init__(self) -> None:
"""Initialize the StopWatch."""
super().__init__()
self._saved = None
self._started = None
def elapsed_time(self) -> float:
"""Get the elapsed time (in seconds)."""
total = datetime.timedelta()
if self._saved is not None:
total += self._saved
if self._started is not None:
total += datetime.datetime.now() - self._started
return total.total_seconds()
def reset(self, *, playing: bool) -> None:
"""Reset the StopWatch."""
self._saved = None
self._started = datetime.datetime.now() if playing else None
def start(self) -> None:
"""Start the StopWatch."""
self._started = datetime.datetime.now()
def stop(self) -> None:
"""Stop the StopWatch."""
if self._started is not None:
delta = datetime.datetime.now() - self._started
if self._saved is None:
self._saved = delta
else:
self._saved += delta
self._started = None

50
emmental/db/adapter.py Normal file
View File

@ -0,0 +1,50 @@
# Copyright 2024 (c) Anna Schumaker.
"""Custom sqlite3 adapters."""
import datetime
import pathlib
import sqlite3
def adapt_date(date: datetime.date) -> str:
"""Adapt a date object to IOS 8601 date."""
return date.isoformat()
def adapt_datetime(dt: datetime.datetime) -> str:
"""Adapt a datetime object to ISO 8601 timestamp."""
if dt.tzinfo is not None:
dt = datetime.datetime.combine(dt.date(), dt.time())
return dt.isoformat(" ")
def adapt_path(path: pathlib.Path) -> str:
"""Adapt a pathlib.Path into a sqlite3 string."""
return str(path)
def convert_date(date: bytes) -> datetime.date:
"""Convert ISO 8601 date to a datetime.date."""
return datetime.date.fromisoformat(date.decode())
def convert_datetime(dt: bytes) -> datetime.datetime:
"""Convert ISO 8601 timestamp to a datetime.datetime."""
if (res := datetime.datetime.fromisoformat(dt.decode())).tzinfo is None:
res = datetime.datetime.combine(res.date(), res.time(), datetime.UTC)
return res
def convert_path(path: bytes) -> pathlib.Path:
"""Convert a path string into a pathlib.Path object."""
return pathlib.Path(path.decode())
def register() -> None:
"""Register our adapters and converters."""
sqlite3.register_adapter(datetime.date, adapt_date)
sqlite3.register_adapter(datetime.datetime, adapt_datetime)
sqlite3.register_adapter(pathlib.PosixPath, adapt_path)
sqlite3.register_converter("date", convert_date)
sqlite3.register_converter("timestamp", convert_datetime)
sqlite3.register_converter("path", convert_path)

View File

@ -4,25 +4,14 @@ import pathlib
import sqlite3 import sqlite3
import sys import sys
from gi.repository import GObject from gi.repository import GObject
from . import adapter
from .. import gsetup from .. import gsetup
DATA_FILE = gsetup.DATA_DIR / f"emmental{gsetup.DEBUG_STR}.sqlite3" DATA_FILE = gsetup.DATA_DIR / f"emmental{gsetup.DEBUG_STR}.sqlite3"
DATABASE = ":memory:" if "unittest" in sys.modules else DATA_FILE DATABASE = ":memory:" if "unittest" in sys.modules else DATA_FILE
adapter.register()
def adapt_path(path: pathlib.Path) -> str:
"""Adapt a pathlib.Path into a sqlite3 string."""
return str(path)
def convert_path(path: bytes) -> pathlib.Path:
"""Convert a path string into a pathlib.Path object."""
return pathlib.Path(path.decode())
sqlite3.register_adapter(pathlib.PosixPath, adapt_path)
sqlite3.register_converter("path", convert_path)
class Connection(GObject.GObject): class Connection(GObject.GObject):

View File

@ -242,7 +242,7 @@ class Table(table.Table):
def restart_track(self, track: Track) -> None: def restart_track(self, track: Track) -> None:
"""Mark that a Track has been restarted.""" """Mark that a Track has been restarted."""
track.active = True track.active = True
track.restarted = datetime.datetime.utcnow() track.restarted = datetime.datetime.now(datetime.UTC)
self.current_track = track self.current_track = track
def start_track(self, track: Track) -> None: def start_track(self, track: Track) -> None:
@ -251,7 +251,7 @@ class Table(table.Table):
cur = self.sql("""UPDATE tracks SET active=TRUE, laststarted=? cur = self.sql("""UPDATE tracks SET active=TRUE, laststarted=?
WHERE trackid=? RETURNING laststarted""", WHERE trackid=? RETURNING laststarted""",
datetime.datetime.utcnow(), track.trackid) datetime.datetime.now(datetime.UTC), track.trackid)
track.active = True track.active = True
track.laststarted = cur.fetchone()["laststarted"] track.laststarted = cur.fetchone()["laststarted"]
self.current_track = track self.current_track = track

View File

@ -1,5 +1,6 @@
# Copyright 2022 (c) Anna Schumaker. # Copyright 2022 (c) Anna Schumaker.
"""Tests our GObject audio player wrapping a GStreamer Playbin element.""" """Tests our GObject audio player wrapping a GStreamer Playbin element."""
import datetime
import io import io
import pathlib import pathlib
import unittest import unittest
@ -80,10 +81,10 @@ class TestAudio(unittest.TestCase):
self.player.file = tests.util.TRACK_OGG self.player.file = tests.util.TRACK_OGG
self.player.duration = 10 self.player.duration = 10
self.player.position = 8 self.player.position = 8
self.player.playtime = 6
self.player.savedtime = 4
self.player.almost_done = True self.player.almost_done = True
self.player.artwork = pathlib.Path("/a/b/c.jpg") self.player.artwork = pathlib.Path("/a/b/c.jpg")
self.player.stopwatch.reset = unittest.mock.Mock()
self.player.stopwatch._saved = datetime.timedelta(seconds=6)
eos = Gst.Message.new_eos(self.player._playbin) eos = Gst.Message.new_eos(self.player._playbin)
self.player._playbin.get_bus().post(eos) self.player._playbin.get_bus().post(eos)
@ -94,9 +95,11 @@ class TestAudio(unittest.TestCase):
for prop in ["artist", "album-artist", "album", "title"]: for prop in ["artist", "album-artist", "album", "title"]:
self.assertEqual(self.player.get_property(prop), "") self.assertEqual(self.player.get_property(prop), "")
for prop in ["album-disc-number", "track-number", for prop in ["album-disc-number", "track-number",
"position", "duration", "playtime", "savedtime"]: "position", "duration"]:
self.assertEqual(self.player.get_property(prop), 0) self.assertEqual(self.player.get_property(prop), 0)
self.assertIsNone(self.player.artwork) self.assertIsNone(self.player.artwork)
self.player.stopwatch.reset.assert_not_called()
self.assertEqual(self.player.playtime, 6)
self.assertEqual(self.player.get_state(), Gst.State.READY) self.assertEqual(self.player.get_state(), Gst.State.READY)
self.assertEqual(self.player.status, "Stopped") self.assertEqual(self.player.status, "Stopped")
@ -150,22 +153,28 @@ class TestAudio(unittest.TestCase):
"audio: file loaded\n" "audio: file loaded\n"
"audio: state changed to 'paused'\n") "audio: state changed to 'paused'\n")
self.player.playtime = 6 self.player.stopwatch._saved = datetime.timedelta(seconds=6)
self.player.savedtime = 4
self.player.emit("file-loaded", tests.util.TRACK_OGG) self.player.emit("file-loaded", tests.util.TRACK_OGG)
for prop in ["artist", "album-artist", "album", "title"]: for prop in ["artist", "album-artist", "album", "title"]:
self.assertEqual(self.player.get_property(prop), "") self.assertEqual(self.player.get_property(prop), "")
for prop in ["album-disc-number", "track-number", for prop in ["album-disc-number", "track-number", "playtime"]:
"playtime", "savedtime"]:
self.assertEqual(self.player.get_property(prop), 0) self.assertEqual(self.player.get_property(prop), 0)
self.assertIsNone(self.player.stopwatch._started)
self.player.playing = True
self.player.emit("file-loaded", tests.util.TRACK_OGG)
self.assertIsNotNone(self.player.stopwatch._started)
@unittest.mock.patch("sys.stdout", new_callable=io.StringIO) @unittest.mock.patch("sys.stdout", new_callable=io.StringIO)
def test_play(self, mock_stdout: io.StringIO): def test_play(self, mock_stdout: io.StringIO):
"""Test that the play() function works as expected.""" """Test that the play() function works as expected."""
self.player.stopwatch.start = unittest.mock.Mock()
self.player.play() self.player.play()
self.main_loop() self.main_loop()
self.assertFalse(self.player.playing) self.assertFalse(self.player.playing)
self.assertEqual(self.player.status, "Stopped") self.assertEqual(self.player.status, "Stopped")
self.player.stopwatch.start.assert_not_called()
self.player.file = tests.util.TRACK_OGG self.player.file = tests.util.TRACK_OGG
self.player.play() self.player.play()
@ -175,13 +184,17 @@ class TestAudio(unittest.TestCase):
self.assertEqual(self.player.get_state(), Gst.State.PLAYING) self.assertEqual(self.player.get_state(), Gst.State.PLAYING)
self.assertRegex(mock_stdout.getvalue(), self.assertRegex(mock_stdout.getvalue(),
"audio: state changed to 'playing'$") "audio: state changed to 'playing'$")
self.player.stopwatch.start.assert_called()
@unittest.mock.patch("sys.stdout", new_callable=io.StringIO) @unittest.mock.patch("sys.stdout", new_callable=io.StringIO)
def test_pause(self, mock_stdout: io.StringIO): def test_pause(self, mock_stdout: io.StringIO):
"""Test that the pause() function works as expected.""" """Test that the pause() function works as expected."""
self.player.stopwatch.stop = unittest.mock.Mock()
self.player.pause() self.player.pause()
self.main_loop() self.main_loop()
self.assertEqual(self.player.status, "Stopped") self.assertEqual(self.player.status, "Stopped")
self.player.stopwatch.stop.assert_not_called()
self.player.playing = True self.player.playing = True
self.player.file = tests.util.TRACK_OGG self.player.file = tests.util.TRACK_OGG
@ -192,6 +205,7 @@ class TestAudio(unittest.TestCase):
self.assertEqual(self.player.get_state(), Gst.State.PAUSED) self.assertEqual(self.player.get_state(), Gst.State.PAUSED)
self.assertRegex(mock_stdout.getvalue(), self.assertRegex(mock_stdout.getvalue(),
"audio: state changed to 'paused'$") "audio: state changed to 'paused'$")
self.player.stopwatch.stop.assert_called()
@unittest.mock.patch("sys.stdout", new_callable=io.StringIO) @unittest.mock.patch("sys.stdout", new_callable=io.StringIO)
def test_play_pause(self, mock_stdout: io.StringIO): def test_play_pause(self, mock_stdout: io.StringIO):
@ -222,6 +236,7 @@ class TestAudio(unittest.TestCase):
@unittest.mock.patch("sys.stdout", new_callable=io.StringIO) @unittest.mock.patch("sys.stdout", new_callable=io.StringIO)
def test_stop(self, mock_stdout: io.StringIO): def test_stop(self, mock_stdout: io.StringIO):
"""Test that the stop() function works as expected.""" """Test that the stop() function works as expected."""
self.player.stopwatch.stop = unittest.mock.Mock()
self.player.file = tests.util.TRACK_OGG self.player.file = tests.util.TRACK_OGG
self.player.play() self.player.play()
self.main_loop() self.main_loop()
@ -234,6 +249,7 @@ class TestAudio(unittest.TestCase):
self.assertRegex(mock_stdout.getvalue(), self.assertRegex(mock_stdout.getvalue(),
"audio: state changed to 'playing'\n" "audio: state changed to 'playing'\n"
"audio: state changed to 'stopped'\n$") "audio: state changed to 'stopped'\n$")
self.player.stopwatch.stop.assert_called()
@unittest.mock.patch("sys.stdout", new_callable=io.StringIO) @unittest.mock.patch("sys.stdout", new_callable=io.StringIO)
def test_tags(self, mock_stdout: io.StringIO): def test_tags(self, mock_stdout: io.StringIO):
@ -282,30 +298,14 @@ class TestAudio(unittest.TestCase):
@unittest.mock.patch("sys.stdout", new_callable=io.StringIO) @unittest.mock.patch("sys.stdout", new_callable=io.StringIO)
def test_playtime(self, mock_stdout: io.StringIO): def test_playtime(self, mock_stdout: io.StringIO):
"""Test the play time property.""" """Test the play time property."""
self.assertIsInstance(self.player.stopwatch,
emmental.audio.stopwatch.StopWatch)
self.assertEqual(self.player.playtime, 0.0) self.assertEqual(self.player.playtime, 0.0)
self.assertEqual(self.player.savedtime, 0.0)
self.assertEqual(self.player._Player__get_current_playtime(), 0.0)
self.player.file = tests.util.TRACK_OGG with unittest.mock.patch.object(self.player.stopwatch,
self.player.play() "elapsed_time") as mock_elapsed:
self.main_loop() mock_elapsed.return_value = 2.0
self.assertIsNotNone(self.player._playbin.clock) self.assertEqual(self.player.playtime, 2.0)
base_time = unittest.mock.PropertyMock(return_value=Gst.SECOND)
type(self.player._playbin).base_time = base_time
get_time = unittest.mock.Mock(return_value=3 * Gst.SECOND)
self.player._playbin.clock.get_time = get_time
self.assertEqual(self.player._Player__get_current_playtime(), 2.0)
self.player._Player__update_position()
self.assertEqual(self.player.playtime, 2)
with unittest.mock.patch.object(self.player._playbin, "seek_simple"):
self.player.seek(5)
self.assertEqual(self.player.savedtime, 2)
self.player._Player__update_position()
self.assertEqual(self.player.playtime, 4)
def test_volume(self): def test_volume(self):
"""Test that the volume property works as expected.""" """Test that the volume property works as expected."""

View File

@ -0,0 +1,91 @@
# Copyright 2024 (c) Anna Schumaker.
"""Tests our StopWatch object."""
import datetime
import emmental.audio.stopwatch
import unittest
from gi.repository import GObject
@unittest.mock.patch.object(emmental.audio.stopwatch, "datetime")
class TestStopwatch(unittest.TestCase):
"""Our stopwatch test case."""
def setUp(self):
"""Set up common variables."""
self.now = datetime.datetime.now()
self.newdelta = datetime.timedelta
self.stopwatch = emmental.audio.stopwatch.StopWatch()
def test_init(self, mock_datetime: unittest.mock.Mock):
"""Test that the StopWatch was created properly."""
self.assertIsInstance(self.stopwatch, GObject.GObject)
def test_elapsed_time(self, mock_datetime: unittest.mock.Mock):
"""Test the elapsed_time() function."""
mock_datetime.timedelta = self.newdelta
self.assertEqual(self.stopwatch.elapsed_time(), 0.0)
mock_datetime.datetime.now.return_value = self.now
self.stopwatch.start()
soon = self.now + datetime.timedelta(seconds=12.345)
mock_datetime.datetime.now.return_value = soon
self.assertEqual(self.stopwatch.elapsed_time(), 12.345)
self.stopwatch._saved = datetime.timedelta(seconds=2)
self.assertEqual(self.stopwatch.elapsed_time(), 14.345)
self.stopwatch.stop()
self.assertEqual(self.stopwatch.elapsed_time(), 14.345)
self.stopwatch.reset(playing=False)
self.assertEqual(self.stopwatch.elapsed_time(), 0.0)
def test_reset(self, mock_datetime: unittest.mock.Mock):
"""Test resetting the StopWatch."""
mock_datetime.datetime.now.return_value = self.now
self.stopwatch.start()
soon = self.now + datetime.timedelta(seconds=12.345)
mock_datetime.datetime.now.return_value = soon
self.stopwatch.stop()
self.stopwatch.reset(playing=False)
self.assertIsNone(self.stopwatch._saved)
self.assertIsNone(self.stopwatch._started)
self.stopwatch.reset(playing=True)
self.assertIsNone(self.stopwatch._saved)
self.assertEqual(self.stopwatch._started, soon)
def test_start(self, mock_datetime: unittest.mock.Mock):
"""Test starting the StopWatch."""
self.assertIsNone(self.stopwatch._started)
mock_datetime.datetime.now.return_value = self.now
self.stopwatch.start()
self.assertEqual(self.stopwatch._started, self.now)
def test_stop(self, mock_datetime: unittest.mock.Mock):
"""Test stopping the StopWatch."""
self.assertIsNone(self.stopwatch._saved)
self.stopwatch.stop()
self.assertIsNone(self.stopwatch._saved)
mock_datetime.datetime.now.return_value = self.now
self.stopwatch.start()
delta1 = datetime.timedelta(seconds=12.345)
mock_datetime.datetime.now.return_value = self.now + delta1
self.stopwatch.stop()
self.assertEqual(self.stopwatch._saved, delta1)
self.assertIsNone(self.stopwatch._started)
now = self.now + delta1 + datetime.timedelta(seconds=2)
mock_datetime.datetime.now.return_value = now
self.stopwatch.start()
delta2 = datetime.timedelta(seconds=3)
mock_datetime.datetime.now.return_value = now + delta2
self.stopwatch.stop()
self.assertEqual(self.stopwatch._saved, delta1 + delta2)
self.stopwatch.stop()
self.assertEqual(self.stopwatch._saved, delta1 + delta2)

60
tests/db/test_adapter.py Normal file
View File

@ -0,0 +1,60 @@
# Copyright 2024 (c) Anna Schumaker.
"""Test our custom sqlite3 adapters."""
import datetime
import emmental.db.adapter
import pathlib
import unittest
class TestAdapters(unittest.TestCase):
"""Test case for our database adapters."""
def test_date(self):
"""Test adapting and converting a date object."""
date = datetime.date(2024, 6, 5)
self.assertEqual(emmental.db.adapter.adapt_date(date),
"2024-06-05")
self.assertEqual(emmental.db.adapter.convert_date(b"2024-06-05"),
date)
def test_datetime(self):
"""Test adapting and converting a datetime object."""
dt_utc = datetime.datetime.now(datetime.UTC)
dt_naive = datetime.datetime.combine(dt_utc.date(), dt_utc.time())
iso_naive = dt_naive.isoformat(" ").encode()
self.assertEqual(emmental.db.adapter.adapt_datetime(dt_utc),
iso_naive.decode())
self.assertEqual(emmental.db.adapter.adapt_datetime(dt_naive),
iso_naive.decode())
self.assertEqual(emmental.db.adapter.convert_datetime(iso_naive),
dt_utc)
def test_path(self):
"""Test adapting and converting a pathlib.Path object."""
path = pathlib.Path("/my/test/path")
self.assertEqual(emmental.db.adapter.adapt_path(path),
"/my/test/path")
self.assertEqual(emmental.db.adapter.convert_path(b"/my/test/path"),
path)
@unittest.mock.patch("sqlite3.register_converter")
@unittest.mock.patch("sqlite3.register_adapter")
def test_register(self, register_adapter: unittest.mock.Mock,
register_converter: unittest.mock.Mock):
"""Test registering adapters and converters."""
emmental.db.adapter.register()
adapters = [(datetime.date, emmental.db.adapter.adapt_date),
(datetime.datetime, emmental.db.adapter.adapt_datetime),
(pathlib.PosixPath, emmental.db.adapter.adapt_path)]
register_adapter.assert_has_calls([unittest.mock.call(*a)
for a in adapters])
converters = [("date", emmental.db.adapter.convert_date),
("timestamp", emmental.db.adapter.convert_datetime),
("path", emmental.db.adapter.convert_path)]
register_converter.assert_has_calls([unittest.mock.call(*c)
for c in converters])

View File

@ -214,6 +214,7 @@ class TestTrackTable(tests.util.TestCase):
def test_create(self): def test_create(self):
"""Test creating a new Track.""" """Test creating a new Track."""
now = datetime.datetime.now(datetime.UTC)
track = self.tracks.create(self.library, pathlib.Path("/a/b/c.ogg"), track = self.tracks.create(self.library, pathlib.Path("/a/b/c.ogg"),
self.medium, self.year) self.medium, self.year)
self.assertIsInstance(track, emmental.db.tracks.Track) self.assertIsInstance(track, emmental.db.tracks.Track)
@ -221,7 +222,7 @@ class TestTrackTable(tests.util.TestCase):
self.assertEqual(track.mediumid, self.medium.mediumid) self.assertEqual(track.mediumid, self.medium.mediumid)
self.assertEqual(track.year, 1988) self.assertEqual(track.year, 1988)
self.assertEqual(track.path, pathlib.Path("/a/b/c.ogg")) self.assertEqual(track.path, pathlib.Path("/a/b/c.ogg"))
self.assertEqual(track.added, datetime.datetime.utcnow().date()) self.assertEqual(track.added, now.date())
track2 = self.tracks.create(self.library, pathlib.Path("/a/b/d.ogg"), track2 = self.tracks.create(self.library, pathlib.Path("/a/b/d.ogg"),
self.medium, self.year, title="Test Track", self.medium, self.year, title="Test Track",
@ -247,7 +248,7 @@ class TestTrackTable(tests.util.TestCase):
def test_create_restore(self): def test_create_restore(self):
"""Test restoring saved track data.""" """Test restoring saved track data."""
now = datetime.datetime.utcnow() now = datetime.datetime.now(datetime.UTC)
today = now.date() today = now.date()
yesterday = today - datetime.timedelta(days=1) yesterday = today - datetime.timedelta(days=1)
self.sql("""INSERT INTO saved_track_data self.sql("""INSERT INTO saved_track_data
@ -308,7 +309,7 @@ class TestTrackTable(tests.util.TestCase):
def test_delete_save(self): def test_delete_save(self):
"""Test saving track data when a track is deleted.""" """Test saving track data when a track is deleted."""
now = datetime.datetime.now() now = datetime.datetime.now(datetime.UTC)
track1 = self.tracks.create(self.library, pathlib.Path("/a/b/1.ogg"), track1 = self.tracks.create(self.library, pathlib.Path("/a/b/1.ogg"),
self.medium, self.year, mbid="ab-cd-ef") self.medium, self.year, mbid="ab-cd-ef")
track2 = self.tracks.create(self.library, pathlib.Path("/a/b/2.ogg"), track2 = self.tracks.create(self.library, pathlib.Path("/a/b/2.ogg"),
@ -328,7 +329,7 @@ class TestTrackTable(tests.util.TestCase):
self.assertEqual(rows[0]["laststarted"], now) self.assertEqual(rows[0]["laststarted"], now)
self.assertEqual(rows[0]["lastplayed"], now) self.assertEqual(rows[0]["lastplayed"], now)
self.assertEqual(rows[0]["playcount"], 42) self.assertEqual(rows[0]["playcount"], 42)
self.assertEqual(rows[0]["added"], datetime.datetime.utcnow().date()) self.assertEqual(rows[0]["added"], now.date())
def test_filter(self): def test_filter(self):
"""Test filtering the Track table.""" """Test filtering the Track table."""
@ -354,7 +355,7 @@ class TestTrackTable(tests.util.TestCase):
def test_load(self): def test_load(self):
"""Test loading tracks from the database.""" """Test loading tracks from the database."""
now = datetime.datetime.now() now = datetime.datetime.now(datetime.UTC)
self.tracks.create(self.library, pathlib.Path("/a/b/1.ogg"), self.tracks.create(self.library, pathlib.Path("/a/b/1.ogg"),
self.medium, self.year) self.medium, self.year)
self.tracks.create(self.library, pathlib.Path("/a/b/2.ogg"), self.tracks.create(self.library, pathlib.Path("/a/b/2.ogg"),
@ -553,7 +554,8 @@ class TestTrackTable(tests.util.TestCase):
track.restart() track.restart()
self.assertTrue(track.active) self.assertTrue(track.active)
self.assertGreater(datetime.datetime.utcnow(), track.restarted) self.assertGreater(datetime.datetime.now(datetime.UTC),
track.restarted)
self.assertEqual(self.tracks.current_track, track) self.assertEqual(self.tracks.current_track, track)
self.playlists.previous.remove_track.assert_not_called() self.playlists.previous.remove_track.assert_not_called()

View File

@ -49,7 +49,7 @@ class TestListen(tests.util.TestCase):
self.assertIsNone(self.listen.listened_at) self.assertIsNone(self.listen.listened_at)
self.assertIsNone(self.listen.listenid) self.assertIsNone(self.listen.listenid)
utc_now = datetime.datetime.utcnow() utc_now = datetime.datetime.now(datetime.UTC)
local_now = utc_now.replace(tzinfo=dateutil.tz.tzutc()).astimezone() local_now = utc_now.replace(tzinfo=dateutil.tz.tzutc()).astimezone()
listen = emmental.listenbrainz.listen.Listen(self.track, listen = emmental.listenbrainz.listen.Listen(self.track,
listenid=1234, listenid=1234,

View File

@ -222,8 +222,8 @@ class TestListenBrainz(tests.util.TestCase):
mock_source_remove: unittest.mock.Mock, mock_source_remove: unittest.mock.Mock,
mock_stdout: io.StringIO): mock_stdout: io.StringIO):
"""Test submitting recently listened tracks.""" """Test submitting recently listened tracks."""
ts1 = datetime.datetime.utcnow() ts1 = datetime.datetime.now(datetime.UTC)
ts2 = datetime.datetime.utcnow() ts2 = datetime.datetime.now(datetime.UTC)
idle_work = self.listenbrainz._ListenBrainz__idle_work idle_work = self.listenbrainz._ListenBrainz__idle_work
listens = [emmental.listenbrainz.listen.Listen(self.track, listenid=1, listens = [emmental.listenbrainz.listen.Listen(self.track, listenid=1,
listened_at=ts1), listened_at=ts1),

View File

@ -22,9 +22,9 @@ class TestEmmental(unittest.TestCase):
"""Check that version constants have been set properly.""" """Check that version constants have been set properly."""
self.assertEqual(emmental.MAJOR_VERSION, 3) self.assertEqual(emmental.MAJOR_VERSION, 3)
self.assertEqual(emmental.MINOR_VERSION, 2) self.assertEqual(emmental.MINOR_VERSION, 2)
self.assertEqual(emmental.MICRO_VERSION, 0) self.assertEqual(emmental.MICRO_VERSION, 1)
self.assertEqual(emmental.VERSION_NUMBER, "3.2.0") self.assertEqual(emmental.VERSION_NUMBER, "3.2.1")
self.assertEqual(emmental.VERSION_STRING, "Emmental 3.2.0-debug") self.assertEqual(emmental.VERSION_STRING, "Emmental 3.2.1-debug")
def test_application(self): def test_application(self):
"""Check that the application instance is initialized properly.""" """Check that the application instance is initialized properly."""
@ -66,7 +66,7 @@ class TestEmmental(unittest.TestCase):
mock_startup.assert_called() mock_startup.assert_called()
mock_load.assert_called() mock_load.assert_called()
mock_add_window.assert_called_with(self.application.win) mock_add_window.assert_called_with(self.application.win)
mock_set_useragent.assert_called_with("emmental-debug", "3.2.0") mock_set_useragent.assert_called_with("emmental-debug", "3.2.1")
@unittest.mock.patch("sys.stdout") @unittest.mock.patch("sys.stdout")
@unittest.mock.patch("gi.repository.Adw.Application.add_window") @unittest.mock.patch("gi.repository.Adw.Application.add_window")