186 lines
6.6 KiB
Python
186 lines
6.6 KiB
Python
# Copyright 2022 (c) Anna Schumaker
|
|
"""A wrapper around Mutagen to help us read tags."""
|
|
import emmental.audio.tagger
|
|
import musicbrainzngs
|
|
import pathlib
|
|
import threading
|
|
from gi.repository import GObject
|
|
from .. import audio
|
|
from . import albums
|
|
from . import artists
|
|
from . import connection
|
|
from . import decades
|
|
from . import media
|
|
from . import genres
|
|
from . import years
|
|
|
|
|
|
class Tags:
|
|
"""Translate the audio.tagger._Tags object into Playlists."""
|
|
|
|
def __init__(self, db: GObject.TYPE_PYOBJECT,
|
|
raw_tags: audio.tagger._Tags):
|
|
"""Initialize the Tags object."""
|
|
self.db = db
|
|
|
|
with self.db:
|
|
self.album = self.get_album(raw_tags.album)
|
|
self.album_artists = [self.get_artist(artist)
|
|
for artist in raw_tags.album.artists]
|
|
self.artists = [self.get_artist(artist)
|
|
for artist in raw_tags.artists]
|
|
self.decade = self.get_decade(raw_tags.year)
|
|
self.genres = list(filter(None, [self.get_genre(genre)
|
|
for genre in raw_tags.genres]))
|
|
self.medium = self.get_medium(raw_tags.medium)
|
|
self.year = self.get_year(raw_tags.year)
|
|
|
|
self.__update_album_artists()
|
|
|
|
def __update_album_artists(self) -> None:
|
|
if self.album is not None:
|
|
old = set(self.album.get_artists())
|
|
new = set(self.album_artists)
|
|
|
|
for artist in old - new:
|
|
artist.remove_album(self.album)
|
|
for artist in new - old:
|
|
artist.add_album(self.album)
|
|
|
|
def get_album(self, raw_album: audio.tagger._Album) -> albums.Album | None:
|
|
"""Convert the raw album into an Album object."""
|
|
if raw_album.name == "":
|
|
return None
|
|
|
|
cover = raw_album.cover if raw_album.cover.is_file() else None
|
|
album = self.db.albums.lookup(raw_album.name, raw_album.artist,
|
|
raw_album.release, mbid=raw_album.mbid)
|
|
if album is not None:
|
|
if album.cover != cover:
|
|
album.cover = cover
|
|
return album
|
|
return self.db.albums.create(raw_album.name, raw_album.artist,
|
|
raw_album.release, mbid=raw_album.mbid,
|
|
cover=cover)
|
|
|
|
def get_artist(self, raw_artist: audio.tagger._Artist) \
|
|
-> artists.Artist | None:
|
|
"""Convert the raw artist into an Artist object."""
|
|
artist = self.db.artists.lookup(raw_artist.name, mbid=raw_artist.mbid)
|
|
if artist is not None:
|
|
return artist
|
|
return self.db.artists.create(raw_artist.name, mbid=raw_artist.mbid)
|
|
|
|
def get_decade(self, raw_year: int | None) -> decades.Decade | None:
|
|
"""Convert the raw year into a Decade object."""
|
|
if raw_year:
|
|
decade = self.db.decades.lookup(raw_year)
|
|
return decade if decade else self.db.decades.create(raw_year)
|
|
|
|
def get_genre(self, raw_genre: str) -> genres.Genre:
|
|
"""Convert the raw genre names into Genre objects."""
|
|
genre = self.db.genres.lookup(raw_genre)
|
|
return genre if genre else self.db.genres.create(raw_genre)
|
|
|
|
def get_medium(self, raw_medium: audio.tagger._Medium) \
|
|
-> media.Medium | None:
|
|
"""Convert the raw medium into a Medium object."""
|
|
if self.album is None:
|
|
return None
|
|
|
|
medium = self.db.media.lookup(self.album, number=raw_medium.number,
|
|
type=raw_medium.type)
|
|
if medium is not None:
|
|
medium.rename(raw_medium.name)
|
|
return medium
|
|
return self.db.media.create(self.album, raw_medium.name,
|
|
number=raw_medium.number,
|
|
type=raw_medium.type)
|
|
|
|
def get_year(self, raw_year: int | None) -> years.Year | None:
|
|
"""Convert the raw year into a Year object."""
|
|
if raw_year:
|
|
year = self.db.years.lookup(raw_year)
|
|
return year if year else self.db.years.create(raw_year)
|
|
|
|
|
|
class Thread(threading.Thread):
|
|
"""A thread for tagging files without blocking the UI."""
|
|
|
|
def __init__(self):
|
|
"""Initialize the Tagger Thread."""
|
|
super().__init__()
|
|
self.ready = threading.Event()
|
|
|
|
self._connection = None
|
|
self._condition = threading.Condition()
|
|
self._file = None
|
|
self._tags = None
|
|
self.start()
|
|
|
|
def __close_connection(self) -> None:
|
|
if self._connection:
|
|
self._connection.close()
|
|
self._connection = None
|
|
|
|
def __get_connection(self) -> connection.Connection:
|
|
if not self._connection:
|
|
self._connection = connection.Connection()
|
|
return self._connection
|
|
|
|
def __check_artist(self, artist: audio.tagger._Artist) -> None:
|
|
if artist.name is None and len(artist.mbid) > 0:
|
|
sql = self.__get_connection()
|
|
cur = sql("SELECT name FROM artists WHERE mbid=?", artist.mbid)
|
|
if row := cur.fetchone():
|
|
artist.name = row["name"]
|
|
else:
|
|
mb_res = musicbrainzngs.get_artist_by_id(artist.mbid)
|
|
artist.name = mb_res["artist"]["name"]
|
|
|
|
def get_result(self, db: GObject.TYPE_PYOBJECT) \
|
|
-> tuple[pathlib.Path | None, Tags | None]:
|
|
"""Return the resulting Tags structure."""
|
|
with self._condition:
|
|
if not self.ready.is_set():
|
|
return (None, None)
|
|
|
|
tags = Tags(db, self._tags) if self._tags else None
|
|
res = (self._file, tags)
|
|
self._file = None
|
|
self._tags = None
|
|
return res
|
|
|
|
def run(self) -> None:
|
|
"""Sleep until we have work to do."""
|
|
with self._condition:
|
|
self.ready.set()
|
|
|
|
while self._condition.wait():
|
|
if self._file is None:
|
|
break
|
|
|
|
if tags := emmental.audio.tagger.tag_file(self._file, None):
|
|
for artist in tags.artists:
|
|
self.__check_artist(artist)
|
|
|
|
self._tags = tags
|
|
self.ready.set()
|
|
|
|
self.__close_connection()
|
|
|
|
def stop(self) -> None:
|
|
"""Stop the thread."""
|
|
with self._condition:
|
|
self._file = None
|
|
self._condition.notify()
|
|
self.join()
|
|
|
|
def tag_file(self, file: pathlib.Path) -> None:
|
|
"""Tag a file."""
|
|
with self._condition:
|
|
self.ready.clear()
|
|
self._file = file
|
|
self._tags = None
|
|
self._condition.notify()
|