279 lines
10 KiB
Python
279 lines
10 KiB
Python
# Copyright 2022 (c) Anna Schumaker
|
|
"""A wrapper around Mutagen to help us read tags."""
|
|
import emmental.audio.tagger
|
|
import musicbrainzngs
|
|
import pathlib
|
|
import threading
|
|
from gi.repository import GObject
|
|
from .. import audio
|
|
from . import albums
|
|
from . import artists
|
|
from . import connection
|
|
from . import decades
|
|
from . import media
|
|
from . import genres
|
|
from . import playlist
|
|
from . import tracks
|
|
from . import years
|
|
|
|
|
|
class Tags:
    """Resolve an audio.tagger._Tags object into database Playlists.

    After construction the album, album_artists, artists, decade, genres,
    medium, year, and track attributes hold the objects that were looked up
    or created for the raw tags. The album, decade, medium, year, and track
    attributes may be None.
    """

    def __init__(self, db: GObject.TYPE_PYOBJECT,
                 raw_tags: audio.tagger._Tags,
                 library: playlist.Playlist):
        """Look up or create every Playlist described by the raw tags."""
        self.db = db

        # NOTE(review): every step below is assumed to run inside the
        # `with self.db:` block -- confirm against the canonical file.
        with self.db:
            self.album = self.get_album(raw_tags.album)
            self.album_artists = [self.get_artist(raw)
                                  for raw in raw_tags.album.artists]
            self.artists = [self.get_artist(raw) for raw in raw_tags.artists]
            self.decade = self.get_decade(raw_tags.year)

            # Resolve every genre first, then drop the falsy results.
            found = [self.get_genre(name) for name in raw_tags.genres]
            self.genres = [genre for genre in found if genre]

            # get_medium() reads self.album, so it must come after it.
            self.medium = self.get_medium(raw_tags.medium)
            self.year = self.get_year(raw_tags.year)

            # get_track() reads most of the attributes assigned above.
            self.track = self.get_track(library, raw_tags.file,
                                        raw_tags.track)

            self.__update_album_artists()

    def __update_album_artists(self) -> None:
        """Make the album's artists match the tagged album artists."""
        if self.album is None:
            return

        current = set(self.album.get_artists())
        wanted = set(self.album_artists)

        for stale in current - wanted:
            stale.remove_album(self.album)
        for added in wanted - current:
            added.add_album(self.album)

    def __update_track(self, track: tracks.Track,
                       raw_track: audio.tagger._Track) -> None:
        """Refresh an existing track and move it between playlists."""
        # Snapshot where the track lives before its properties change.
        old_year = track.get_year()
        old_decade = old_year.parent
        old_genres = set(track.get_genres())
        old_medium = track.get_medium()
        old_album = old_medium.get_album()
        old_artists = set(track.get_artists())

        track.update_properties(mediumid=self.medium.mediumid,
                                year=self.year.year,
                                title=raw_track.title,
                                number=raw_track.number,
                                length=raw_track.length,
                                artist=raw_track.artist,
                                mbid=raw_track.mbid,
                                mtime=raw_track.mtime)

        self.__update_track_playlist_set(track, old_artists,
                                         set(self.artists))
        self.__update_track_playlist_set(track, old_genres,
                                         set(self.genres))

        moves = ((old_album, self.album), (old_medium, self.medium),
                 (old_decade, self.decade), (old_year, self.year))
        for before, after in moves:
            self.__update_track_playlist(track, before, after)

    def __update_track_playlist(self, track: tracks.Track,
                                orig: playlist.Playlist,
                                new: playlist.Playlist):
        """Move the track from orig to new when the two differ."""
        if orig != new:
            orig.remove_track(track, idle=True)
            new.add_track(track, idle=True)

    def __update_track_playlist_set(self, track: tracks.Track,
                                    orig: set[playlist.Playlist],
                                    new: set[playlist.Playlist]):
        """Apply the difference between two sets of playlists."""
        for dropped in orig - new:
            dropped.remove_track(track, idle=True)
        for gained in new - orig:
            gained.add_track(track, idle=True)

    def get_album(self, raw_album: audio.tagger._Album) -> albums.Album | None:
        """Find or create the Album for the raw album, updating its cover."""
        if raw_album.name == "":
            return None

        # Only keep the cover when it points at a file.
        cover = None
        if raw_album.cover.is_file():
            cover = raw_album.cover

        album = self.db.albums.lookup(raw_album.name, raw_album.artist,
                                      raw_album.release, mbid=raw_album.mbid)
        if album is None:
            return self.db.albums.create(raw_album.name, raw_album.artist,
                                         raw_album.release,
                                         mbid=raw_album.mbid, cover=cover)

        if album.cover != cover:
            album.cover = cover
        return album

    def get_artist(self, raw_artist: audio.tagger._Artist) \
            -> artists.Artist | None:
        """Find or create the Artist for the raw artist."""
        artist = self.db.artists.lookup(raw_artist.name, mbid=raw_artist.mbid)
        if artist is None:
            artist = self.db.artists.create(raw_artist.name,
                                            mbid=raw_artist.mbid)
        return artist

    def get_decade(self, raw_year: int | None) -> decades.Decade | None:
        """Find or create the Decade containing the raw year."""
        if not raw_year:
            return None

        decade = self.db.decades.lookup(raw_year)
        if decade:
            return decade
        return self.db.decades.create(raw_year)

    def get_genre(self, raw_genre: str) -> genres.Genre:
        """Find or create the Genre with the raw genre name."""
        return (self.db.genres.lookup(raw_genre)
                or self.db.genres.create(raw_genre))

    def get_medium(self, raw_medium: audio.tagger._Medium) \
            -> media.Medium | None:
        """Find or create the Medium on self.album, renaming a found one."""
        if self.album is None:
            return None

        medium = self.db.media.lookup(self.album, number=raw_medium.number,
                                      type=raw_medium.type)
        if medium is None:
            return self.db.media.create(self.album, raw_medium.name,
                                        number=raw_medium.number,
                                        type=raw_medium.type)

        medium.rename(raw_medium.name)
        return medium

    def get_track(self, library: playlist.Playlist, filepath: pathlib.Path,
                  raw_track: audio.tagger._Track) -> tracks.Track | None:
        """Find and update, or create, the Track for the given file."""
        # A track cannot be stored without both a medium and a year.
        if self.medium is None or self.year is None:
            return None

        existing = self.db.tracks.lookup(library, path=filepath)
        if existing is not None:
            self.__update_track(existing, raw_track)
            return existing

        created = self.db.tracks.create(library, filepath, self.medium,
                                        self.year, title=raw_track.title,
                                        number=raw_track.number,
                                        length=raw_track.length,
                                        artist=raw_track.artist,
                                        mbid=raw_track.mbid,
                                        mtime=raw_track.mtime)

        # A brand new track joins every playlist that describes it.
        targets = [self.db.playlists.collection,
                   self.db.playlists.new_tracks,
                   self.db.playlists.unplayed,
                   self.album, *self.artists, self.medium,
                   *self.genres, self.decade, self.year, library]
        for plist in targets:
            plist.add_track(created, idle=True)
        return created

    def get_year(self, raw_year: int | None) -> years.Year | None:
        """Find or create the Year for the raw year."""
        if not raw_year:
            return None

        year = self.db.years.lookup(raw_year)
        if year:
            return year
        return self.db.years.create(raw_year)
|
|
|
|
|
|
class Thread(threading.Thread):
    """A thread for tagging files without blocking the UI.

    The thread starts itself during construction. Work is handed over with
    tag_file(), the `ready` Event is set once the work is finished, and the
    result is collected with get_result(). Call stop() to end the thread.
    """

    def __init__(self):
        """Initialize the Tagger Thread."""
        super().__init__()
        # Set by run() on startup and after each file; cleared by tag_file().
        self.ready = threading.Event()

        # Opened on demand by __get_connection().
        self._connection = None
        # Held around every access to _file, _mtime, and _tags.
        self._condition = threading.Condition()
        self._file = None
        self._mtime = None
        self._tags = None
        self.start()

    def __close_connection(self) -> None:
        """Close the database connection, if one was opened."""
        if self._connection:
            self._connection.close()
            self._connection = None

    def __get_connection(self) -> connection.Connection:
        """Return the database connection, opening it on first use."""
        if not self._connection:
            self._connection = connection.Connection()
        return self._connection

    def __check_artist(self, artist: audio.tagger._Artist) -> None:
        """Fill in a missing artist name using the artist's mbid."""
        if artist.name is None and len(artist.mbid) > 0:
            # Prefer a name already stored in the artists table ...
            sql = self.__get_connection()
            cur = sql("SELECT name FROM artists WHERE mbid=?", artist.mbid)
            if row := cur.fetchone():
                artist.name = row["name"]
            else:
                # ... and only ask MusicBrainz when there is no such row.
                mb_res = musicbrainzngs.get_artist_by_id(artist.mbid)
                artist.name = mb_res["artist"]["name"]

    def get_result(self, db: GObject.TYPE_PYOBJECT,
                   library: playlist.Playlist) \
            -> tuple[pathlib.Path | None, Tags | None]:
        """Return the resulting Tags structure.

        Returns (None, None) while the `ready` Event is not set. Otherwise
        returns the stored file together with a Tags object built from the
        stored raw tags (or None when there are no raw tags), and clears
        the stored file and tags.
        """
        with self._condition:
            if not self.ready.is_set():
                return (None, None)

            tags = Tags(db, self._tags, library) if self._tags else None
            res = (self._file, tags)
            self._file = None
            self._tags = None
            return res

    def run(self) -> None:
        """Sleep until we have work to do."""
        with self._condition:
            self.ready.set()

            # wait() is called without a timeout, so the loop is only left
            # through the break below.
            while self._condition.wait():
                # stop() clears the file before notifying.
                if self._file is None:
                    break

                tags = emmental.audio.tagger.tag_file(self._file, self._mtime)
                if tags is not None:
                    for artist in tags.artists:
                        self.__check_artist(artist)

                # Publish the result (possibly None) and signal completion.
                self._tags = tags
                self.ready.set()

            self.__close_connection()

    def stop(self) -> None:
        """Stop the thread."""
        with self._condition:
            # A notify with no file is what tells run() to exit.
            self._file = None
            self._mtime = None
            self._condition.notify()
        # Join after releasing the condition so run() can reacquire it.
        self.join()

    def tag_file(self, file: pathlib.Path, mtime: float | None) -> None:
        """Tag a file.

        Clears the `ready` Event, stores the file and mtime, discards any
        previously stored tags, and notifies the thread.
        """
        with self._condition:
            self.ready.clear()
            self._file = file
            self._mtime = mtime
            self._tags = None
            self._condition.notify()
|
|
|
|
|
|
def untag_track(db: GObject.TYPE_PYOBJECT, track: tracks.Track) -> None:
    """Untag a Track.

    Remove the track from every playlist in db.playlists.store and from the
    medium, album, artist, genre, year, decade, and library playlists that
    the track reports for itself.
    """
    medium = track.get_medium()
    year = track.get_year()

    playlists = list(db.playlists.store)
    playlists.extend([medium, medium.get_album()])
    playlists.extend(track.get_artists())
    # Tags.get_track() adds new tracks to their genres as well, so the
    # genres have to be cleaned up here or the track lingers in them.
    playlists.extend(track.get_genres())
    playlists.extend([year, year.parent, track.get_library()])

    for plist in playlists:
        plist.remove_track(track)
|