403 lines
17 KiB
Python
403 lines
17 KiB
Python
# Copyright 2022 (c) Anna Schumaker
|
|
"""Tests our Mutagen wrapper."""
|
|
import pathlib
|
|
import threading
|
|
import unittest.mock
|
|
import emmental.db.tagger
|
|
import tests.util
|
|
|
|
|
|
class TestTags(tests.util.TestCase):
    """Exercise the database-backed Tags object."""

    def setUp(self):
        """Load the playlist tables and create a library and track path."""
        super().setUp()
        for table in self.sql.playlist_tables():
            table.queue.enabled = False
            table.load()

        self.library = self.sql.libraries.create("/a/b")
        self.file = pathlib.Path("/a/b/c/track.ogg")

    def make_tags(self, raw_tags: dict, *, length: int = 0,
                  mtime: float = 0.0) -> emmental.db.tagger.Tags:
        """Wrap raw_tags in audio tags and build a database Tags from them."""
        return emmental.db.tagger.Tags(
            self.sql,
            emmental.audio.tagger._Tags(self.file, raw_tags, length, mtime),
            self.library)

    def test_init(self):
        """Check the fields of a Tags object built from empty raw tags."""
        tags = self.make_tags({})
        self.assertEqual(tags.db, self.sql)

        for field in ("album", "decade", "medium", "year", "track"):
            self.assertIsNone(getattr(tags, field))

        for field in ("album_artists", "artists", "genres"):
            self.assertListEqual(getattr(tags, field), [])

    def test_album(self):
        """Check the Album produced from album-related raw tags."""
        raw_tags = {
            "album": ["Album Name"],
            "musicbrainz_releasegroupid": ["ab-cd-ef"],
            "albumartist": ["Album Artist"],
            "date": ["1988-06"],
        }

        # Path.is_file() reports True for every path during the first pass.
        is_file_patch = unittest.mock.patch.object(pathlib.Path, "is_file",
                                                   return_value=True)
        with is_file_patch:
            tags = self.make_tags(raw_tags)
            self.assertIsInstance(tags.album, emmental.db.albums.Album)
            for field, expected in (("name", "Album Name"),
                                    ("mbid", "ab-cd-ef"),
                                    ("artist", "Album Artist"),
                                    ("release", "1988-06")):
                self.assertEqual(getattr(tags.album, field), expected)
            self.assertEqual(tags.album.cover,
                             pathlib.Path("/a/b/c/cover.jpg"))

        # Tagging again without the patch is expected to reuse the album
        # and leave it without a cover.
        retagged = self.make_tags(raw_tags)
        self.assertEqual(retagged.album, tags.album)
        self.assertIsNone(tags.album.cover)

    def test_album_artists(self):
        """Check album artists after the first tagging and after a change."""
        raw_tags = {
            "album": ["Album Name"],
            "musicbrainz_releasegroupid": ["ab-cd-ef"],
            "albumartist": ["Artist 1", "Artist 2"],
            "musicbrainz_albumartistid": ["gh-ij-kl", "mn-op-qr"],
            "date": ["1988-06"],
        }
        # Exactly two artists are asserted below, so index 1 covers i != 0.
        expected_mbids = ("gh-ij-kl", "mn-op-qr")

        tags = self.make_tags(raw_tags)
        self.assertEqual(len(tags.album_artists), 2)
        for i, found in enumerate(tags.album_artists):
            with self.subTest(i=i):
                self.assertIsInstance(found, emmental.db.artists.Artist)
                self.assertEqual(found.name, f"Artist {i+1}")
                self.assertEqual(found.mbid, expected_mbids[i])

        self.assertListEqual(tags.album.get_artists(), tags.album_artists)
        self.assertListEqual(self.make_tags(raw_tags).album_artists,
                             tags.album_artists)

        # Swap the second artist and tag the same album again.
        raw_tags.update({
            "albumartist": ["Artist 1", "Artist 3"],
            "musicbrainz_albumartistid": ["gh-ij-kl", "st-uv-wx"],
        })
        updated = self.make_tags(raw_tags)
        self.assertEqual(updated.album, tags.album)
        self.assertListEqual(tags.album.get_artists(), updated.album_artists)

    def test_artists(self):
        """Check the Artists produced from artist-related raw tags."""
        raw_tags = {
            "artists": ["Artist 1", "Artist 2"],
            "albumartist": ["Artist 3"],
        }

        tags = self.make_tags(raw_tags)
        self.assertEqual(len(tags.artists), 3)
        for i, found in enumerate(tags.artists):
            with self.subTest(i=i):
                self.assertIsInstance(found, emmental.db.artists.Artist)
                self.assertEqual(found.name, f"Artist {i+1}")

        retagged = self.make_tags(raw_tags)
        self.assertListEqual(retagged.artists, tags.artists)

    def test_decade(self):
        """Check the Decade produced from the date tag."""
        raw_tags = {"date": ["1988-06-17"]}
        tags = self.make_tags(raw_tags)
        self.assertIsInstance(tags.decade, emmental.db.decades.Decade)
        self.assertEqual(tags.decade.decade, 1980)

        retagged = self.make_tags(raw_tags)
        self.assertEqual(retagged.decade, tags.decade)

    def test_genres(self):
        """Check the Genres produced from the genre tag."""
        self.sql.genres.autodelete = False
        raw_tags = {"genre": ["Genre 1", "Genre 2"]}

        tags = self.make_tags(raw_tags)
        self.assertEqual(len(tags.genres), 2)
        for i, found in enumerate(tags.genres):
            with self.subTest(i=i):
                self.assertIsInstance(found, emmental.db.genres.Genre)
                self.assertEqual(found.name, f"Genre {i+1}")

        retagged = self.make_tags(raw_tags)
        self.assertListEqual(retagged.genres, tags.genres)

    def test_medium(self):
        """Check the Medium produced from disc-related raw tags."""
        raw_tags = {
            "album": ["Album Name"],
            "musicbrainz_releasegroupid": ["ab-cd-ef"],
            "albumartist": ["Album Artist"],
            "date": ["1988-06"],
            "discnumber": ["2"],
            "discsubtitle": ["Subtitle"],
            "media": ["CD"],
        }

        tags = self.make_tags(raw_tags)
        self.assertIsInstance(tags.medium, emmental.db.media.Medium)
        for field, expected in (("number", 2),
                                ("name", "Subtitle"),
                                ("type", "CD")):
            self.assertEqual(getattr(tags.medium, field), expected)

        # A changed subtitle is expected to rename the existing medium.
        raw_tags["discsubtitle"] = ["New Subtitle"]
        self.assertEqual(self.make_tags(raw_tags).medium, tags.medium)
        self.assertEqual(tags.medium.name, "New Subtitle")

    def test_track(self):
        """Check the Track after the first tagging and after re-tagging."""
        raw_tags = {
            "album": ["Album Name"],
            "artist": ["Track Artist"],
            "date": ["1988-06"],
            "title": ["Test Title"],
            "tracknumber": ["3"],
            "musicbrainz_releasetrackid": ["ab-cd-ef"],
        }

        tags = self.make_tags(raw_tags, length=42, mtime=1.234)
        self.assertIsInstance(tags.track, emmental.db.tracks.Track)
        self.assertEqual(tags.track.get_library(), self.library)
        self.assertEqual(tags.track.get_medium(), tags.medium)
        self.assertEqual(tags.track.get_year(), tags.year)
        for field, expected in (("path", self.file),
                                ("title", "Test Title"),
                                ("artist", "Track Artist"),
                                ("mbid", "ab-cd-ef"),
                                ("number", 3),
                                ("length", 42),
                                ("mtime", 1.234)):
            self.assertEqual(getattr(tags.track, field), expected)

        for name in ("collection", "new_tracks", "unplayed"):
            self.assertTrue(
                getattr(self.sql.playlists, name).has_track(tags.track))
        self.assertTrue(self.library.has_track(tags.track))

        raw_tags.update({
            "artist": ["New Artist"],
            "date": ["1985-08"],
            "discnumber": ["2"],
            "title": ["New Title"],
            "tracknumber": ["4"],
            "musicbrainz_releasetrackid": ["gh-ij-kl"],
        })
        # Empty every playlist before tagging the same file again.
        for playlist in self.sql.playlists:
            playlist.tracks.trackids = set()

        new_tags = self.make_tags(raw_tags, length=53, mtime=5.6789)
        self.assertEqual(new_tags.track, tags.track)
        self.assertEqual(new_tags.track.get_medium(), new_tags.medium)
        self.assertEqual(new_tags.track.get_year(), new_tags.year)
        for field, expected in (("title", "New Title"),
                                ("artist", "New Artist"),
                                ("mbid", "gh-ij-kl"),
                                ("number", 4),
                                ("length", 53.0),
                                ("mtime", 5.6789)):
            self.assertEqual(getattr(new_tags.track, field), expected)

        for playlist in self.sql.playlists:
            with self.subTest(playlist=playlist.name):
                self.assertFalse(playlist.has_track(new_tags.track))

    def test_track_playlist_update(self):
        """Check that re-tagging moves a Track between its playlists."""
        album = self.sql.albums.create("Album Name", "Artist 2", "1988-06")
        medium = self.sql.media.create(album, "", number=1)
        decade = self.sql.decades.create(1980)
        year = self.sql.years.create(1988)
        raw_tags = {
            "album": ["Album Name"],
            "artist": ["Artist 2"],
            "artists": ["Artist 1", "Artist 2"],
            "date": ["1988-06"],
            "discnumber": ["1"],
            "genre": ["Genre 1", "Genre 2"],
        }

        tags = self.make_tags(raw_tags)
        self.assertListEqual(tags.track.get_artists(), tags.artists)
        self.assertListEqual(tags.track.get_genres(), tags.genres)
        for playlist in (album, medium, decade, year):
            self.assertTrue(playlist.has_track(tags.track))

        new_album = self.sql.albums.create("New Album Name", "Artist 2",
                                           "1992-10")
        new_medium = self.sql.media.create(new_album, "", number=2)
        new_decade = self.sql.decades.create(1990)
        new_year = self.sql.years.create(1992)
        raw_tags.update({
            "album": ["New Album Name"],
            "artists": ["Artist 2", "Artist 3"],
            "date": ["1992-10"],
            "discnumber": ["2"],
            "genre": ["Genre 2", "Genre 3"],
        })

        new_tags = self.make_tags(raw_tags)
        self.assertEqual(new_tags.track, tags.track)
        self.assertListEqual(tags.track.get_artists(), new_tags.artists)
        self.assertListEqual(tags.track.get_genres(), new_tags.genres)

        for playlist in (album, medium, decade, year):
            self.assertFalse(playlist.has_track(tags.track))

        for playlist in (new_album, new_medium, new_decade, new_year):
            self.assertTrue(playlist.has_track(tags.track))

    def test_year(self):
        """Check the Year produced from the date tag."""
        raw_tags = {"date": ["1988-06-17"]}
        tags = self.make_tags(raw_tags)
        self.assertIsInstance(tags.year, emmental.db.years.Year)
        self.assertEqual(tags.year.year, 1988)

        retagged = self.make_tags(raw_tags)
        self.assertEqual(retagged.year, tags.year)
|
|
|
|
|
|
@unittest.mock.patch("emmental.audio.tagger.tag_file")
class TestTaggerThread(tests.util.TestCase):
    """Test the tagger thread behavior.

    The class-level patch replaces emmental.audio.tagger.tag_file with a
    mock for every test method. Each test receives that mock as its last
    positional argument (mock_file), after any mocks created by
    method-level patch decorators.
    """

    def setUp(self):
        """Set up common variables."""
        super().setUp()
        self.library = self.sql.libraries.create("/a/b")
        # test_init() expects the thread to be alive right after creation.
        self.tagger = emmental.db.tagger.Thread()
        self.tags = dict()

    def tearDown(self):
        """Clean up, always stopping the tagger thread."""
        try:
            super().tearDown()
        finally:
            # Stop the worker even when the base-class cleanup raises, so
            # a failing test cannot leak a running thread into later tests.
            self.tagger.stop()

    def make_tags(self, tags: dict) -> "emmental.audio.tagger._Tags":
        """Return audio-layer tags for /a/b/c.ogg built from a raw dict.

        This is the object the mocked tag_file() hands back to the thread,
        not a database-level emmental.db.tagger.Tags.
        """
        return emmental.audio.tagger._Tags(pathlib.Path("/a/b/c.ogg"), tags)

    def test_init(self, mock_file: unittest.mock.Mock):
        """Test that the tagger thread was initialized properly."""
        self.assertIsInstance(self.tagger, threading.Thread)
        self.assertIsInstance(self.tagger._condition, threading.Condition)
        self.assertTrue(self.tagger.is_alive())

    def test_stop(self, mock_file: unittest.mock.Mock):
        """Test the stop function."""
        mock_connection = unittest.mock.Mock()
        mock_connection.close = unittest.mock.Mock()

        # Give the thread some state that stop() is expected to clear.
        self.tagger._file = "abcde"
        self.tagger._mtime = 12345
        self.tagger._connection = mock_connection

        # wraps= keeps the real notify() working while recording the call.
        with unittest.mock.patch.object(self.tagger._condition, "notify",
                                        wraps=self.tagger._condition.notify) \
                as mock_notify:
            self.tagger.stop()
            self.assertIsNone(self.tagger._file)
            self.assertIsNone(self.tagger._mtime)
            mock_notify.assert_called()

        self.assertFalse(self.tagger.is_alive())
        self.assertIsNone(self.tagger._connection)
        mock_connection.close.assert_called()

    def test_tag_file(self, mock_file: unittest.mock.Mock):
        """Test asking the thread to tag a file."""
        path = pathlib.Path("/a/b/c.ogg")

        # Initial state of an idle thread.
        self.assertIsInstance(self.tagger.ready, threading.Event)
        self.assertIsNone(self.tagger._file)
        self.assertIsNone(self.tagger._tags)
        self.assertIsNone(self.tagger._mtime)
        self.assertTrue(self.tagger.ready.is_set())

        # First request: the mocked tag_file() finds no tags.
        mock_file.return_value = None
        self.tagger.ready.set()
        self.tagger._tags = 12345
        self.tagger.tag_file(path, None)
        # NOTE(review): these checks run before ready.wait(), so they
        # presumably rely on tag_file() updating this state before the
        # worker finishes -- confirm against Thread.
        self.assertFalse(self.tagger.ready.is_set())
        self.assertEqual(self.tagger._file, path)
        self.assertIsNone(self.tagger._mtime)
        self.assertIsNone(self.tagger._tags)

        self.tagger.ready.wait()
        self.assertIsNone(self.tagger._tags)
        mock_file.assert_called_with(pathlib.Path("/a/b/c.ogg"), None)

        # Second request: the mocked tag_file() returns tags, and an mtime
        # is passed along.
        mock_file.return_value = self.make_tags(dict())
        self.tagger.tag_file(path, 12345)
        self.assertEqual(self.tagger._mtime, 12345)

        self.tagger.ready.wait()
        self.assertIsNotNone(self.tagger._tags)
        mock_file.assert_called_with(self.tagger._file, 12345)

    def test_get_result(self, mock_file: unittest.mock.Mock):
        """Test creating a Tags structure after tagging."""
        mock_file.return_value = None
        self.tagger.tag_file(pathlib.Path("/a/b/c.ogg"), None)
        # NOTE(review): asked for before ready.wait(); the test expects no
        # result yet -- presumably get_result() checks the ready event.
        self.assertTupleEqual(self.tagger.get_result(self.sql, self.library),
                              (None, None))

        self.tagger.ready.wait()
        self.assertTupleEqual(self.tagger.get_result(self.sql, self.library),
                              (pathlib.Path("/a/b/c.ogg"), None))
        self.assertIsNone(self.tagger._file)

        # With real audio tags the result is a database-level Tags object,
        # and the thread's pending state is expected to be cleared.
        mock_file.return_value = self.make_tags(dict())
        self.tagger.tag_file(pathlib.Path("/a/b/c.ogg"), None)
        self.tagger.ready.wait()
        (file, tags) = self.tagger.get_result(self.sql, self.library)
        self.assertEqual(file, pathlib.Path("/a/b/c.ogg"))
        self.assertIsInstance(tags, emmental.db.tagger.Tags)
        self.assertIsNone(self.tagger._file)
        self.assertIsNone(self.tagger._mtime)
        self.assertIsNone(self.tagger._tags)

    @unittest.mock.patch("emmental.db.connection.Connection.__call__")
    @unittest.mock.patch("musicbrainzngs.get_artist_by_id")
    def test_tag_file_lookup_mbid(self, mock_get_artist: unittest.mock.Mock,
                                  mock_connection: unittest.mock.Mock,
                                  mock_file: unittest.mock.Mock):
        """Test looking up artists with an MBID but no name after tagging."""
        audio_tags = self.make_tags({"albumartist": ["No Artist"],
                                     "musicbrainz_albumartistid":
                                     ["ab-cd-ef", "gh-ij-kl"]})
        mock_file.return_value = audio_tags
        mock_get_artist.return_value = {"artist": {"name": "Some Artist"}}

        # The mocked database call yields a cursor with no matching row.
        mock_cursor = unittest.mock.Mock()
        mock_cursor.fetchone = unittest.mock.Mock(return_value=None)
        mock_connection.return_value = mock_cursor

        self.tagger.tag_file(pathlib.Path("/a/b/c.ogg"), None)
        self.tagger.ready.wait()
        self.assertEqual(audio_tags.artists[0].name, "Some Artist")
        self.assertEqual(audio_tags.artists[1].name, "Some Artist")

    @unittest.mock.patch("emmental.db.connection.Connection.__call__")
    @unittest.mock.patch("musicbrainzngs.get_artist_by_id")
    def test_tag_file_lookup_sql(self, mock_get_artist: unittest.mock.Mock,
                                 mock_connection: unittest.mock.Mock,
                                 mock_file: unittest.mock.Mock):
        """Test looking up unnamed artists in the database."""
        audio_tags = self.make_tags({"albumartist": ["No Artist"],
                                     "musicbrainz_albumartistid":
                                     ["ab-cd-ef", "gh-ij-kl"]})
        mock_file.return_value = audio_tags
        mock_get_artist.return_value = {"artist": {"name": None}}

        # The mocked database call yields a cursor whose row has the name.
        mock_row = {"name": "Some Artist"}
        mock_cursor = unittest.mock.Mock()
        mock_cursor.fetchone = unittest.mock.Mock(return_value=mock_row)
        mock_connection.return_value = mock_cursor

        self.assertIsNone(self.tagger._connection)

        self.tagger.tag_file(pathlib.Path("/a/b/c.ogg"), None)
        self.tagger.ready.wait()
        self.assertIsInstance(self.tagger._connection,
                              emmental.db.connection.Connection)
        self.assertEqual(audio_tags.artists[0].name, "Some Artist")
        self.assertEqual(audio_tags.artists[1].name, "Some Artist")
|