emmental/emmental/db/tagger.py

243 lines
9.6 KiB
Python

# Copyright 2022 (c) Anna Schumaker
"""A wrapper around Mutagen to help us read tags."""
import emmental.audio.tagger
import musicbrainzngs
import pathlib
from gi.repository import GObject
from .. import audio
from .. import thread
from . import albums
from . import artists
from . import connection
from . import decades
from . import media
from . import genres
from . import playlist
from . import tracks
from . import years
class Tags:
"""Translate the audio.tagger._Tags object into Playlists."""
def __init__(self, db: GObject.TYPE_PYOBJECT,
raw_tags: audio.tagger._Tags,
library: playlist.Playlist):
"""Initialize the Tags object."""
self.db = db
with self.db:
self.album = self.get_album(raw_tags.album)
self.album_artists = [self.get_artist(artist)
for artist in raw_tags.album.artists]
self.artists = [self.get_artist(artist)
for artist in raw_tags.artists]
self.decade = self.get_decade(raw_tags.year)
self.genres = list(filter(None, [self.get_genre(genre)
for genre in raw_tags.genres]))
self.medium = self.get_medium(raw_tags.medium)
self.year = self.get_year(raw_tags.year)
self.track = self.get_track(library, raw_tags.file, raw_tags.track)
self.__update_album_artists()
def __update_album_artists(self) -> None:
if self.album is not None:
old = set(self.album.get_artists())
new = set(self.album_artists)
for artist in old - new:
artist.remove_album(self.album)
for artist in new - old:
artist.add_album(self.album)
def __update_track(self, track: tracks.Track,
raw_track: audio.tagger._Track) -> None:
orig_year = track.get_year()
orig_decade = orig_year.parent
orig_genres = set(track.get_genres())
orig_medium = track.get_medium()
orig_album = orig_medium.get_album()
orig_artists = set(track.get_artists())
track.update_properties(mediumid=self.medium.mediumid,
year=self.year.year,
title=raw_track.title,
number=raw_track.number,
length=raw_track.length,
artist=raw_track.artist,
mbid=raw_track.mbid,
mtime=raw_track.mtime)
self.__update_track_playlist_set(track, orig_artists,
set(self.artists))
self.__update_track_playlist_set(track, orig_genres, set(self.genres))
self.__update_track_playlist(track, orig_album, self.album)
self.__update_track_playlist(track, orig_medium, self.medium)
self.__update_track_playlist(track, orig_decade, self.decade)
self.__update_track_playlist(track, orig_year, self.year)
def __update_track_playlist(self, track: tracks.Track,
orig: playlist.Playlist,
new: playlist.Playlist):
if orig != new:
orig.remove_track(track, idle=True)
new.add_track(track, idle=True)
def __update_track_playlist_set(self, track: tracks.Track,
orig: set[playlist.Playlist],
new: set[playlist.Playlist]):
for plist in orig - new:
plist.remove_track(track, idle=True)
for plist in new - orig:
plist.add_track(track, idle=True)
def get_album(self, raw_album: audio.tagger._Album) -> albums.Album | None:
"""Convert the raw album into an Album object."""
if raw_album.name == "":
return None
cover = raw_album.cover if raw_album.cover.is_file() else None
album = self.db.albums.lookup(raw_album.name, raw_album.artist,
raw_album.release, mbid=raw_album.mbid)
if album is not None:
if album.cover != cover:
album.cover = cover
return album
return self.db.albums.create(raw_album.name, raw_album.artist,
raw_album.release, mbid=raw_album.mbid,
cover=cover)
def get_artist(self, raw_artist: audio.tagger._Artist) \
-> artists.Artist | None:
"""Convert the raw artist into an Artist object."""
artist = self.db.artists.lookup(raw_artist.name, mbid=raw_artist.mbid)
if artist is not None:
return artist
return self.db.artists.create(raw_artist.name, mbid=raw_artist.mbid)
def get_decade(self, raw_year: int | None) -> decades.Decade | None:
"""Convert the raw year into a Decade object."""
if raw_year:
decade = self.db.decades.lookup(raw_year)
return decade if decade else self.db.decades.create(raw_year)
def get_genre(self, raw_genre: str) -> genres.Genre:
"""Convert the raw genre names into Genre objects."""
genre = self.db.genres.lookup(raw_genre)
return genre if genre else self.db.genres.create(raw_genre)
def get_medium(self, raw_medium: audio.tagger._Medium) \
-> media.Medium | None:
"""Convert the raw medium into a Medium object."""
if self.album is None:
return None
medium = self.db.media.lookup(self.album, number=raw_medium.number,
type=raw_medium.type)
if medium is not None:
medium.rename(raw_medium.name)
return medium
return self.db.media.create(self.album, raw_medium.name,
number=raw_medium.number,
type=raw_medium.type)
def get_track(self, library: playlist.Playlist, filepath: pathlib.Path,
raw_track: audio.tagger._Track) -> tracks.Track | None:
"""Convert the raw track into a Track object."""
if self.medium is None or self.year is None:
return None
track = self.db.tracks.lookup(library, path=filepath)
if track is not None:
self.__update_track(track, raw_track)
return track
track = self.db.tracks.create(library, filepath, self.medium,
self.year, title=raw_track.title,
number=raw_track.number,
length=raw_track.length,
artist=raw_track.artist,
mbid=raw_track.mbid,
mtime=raw_track.mtime)
for plist in [self.db.playlists.collection,
self.db.playlists.new_tracks,
self.db.playlists.unplayed,
self.album, *self.artists, self.medium,
*self.genres, self.decade, self.year, library]:
plist.add_track(track, idle=True)
return track
def get_year(self, raw_year: int | None) -> years.Year | None:
"""Convert the raw year into a Year object."""
if raw_year:
year = self.db.years.lookup(raw_year)
return year if year else self.db.years.create(raw_year)
class Thread(thread.Thread):
"""A thread for tagging files without blocking the UI."""
def __init__(self):
"""Initialize the Tagger Thread."""
super().__init__()
self._connection = None
def __get_connection(self) -> connection.Connection:
if not self._connection:
self._connection = connection.Connection()
return self._connection
def __check_artist(self, artist: audio.tagger._Artist) -> None:
if artist.name is None and len(artist.mbid) > 0:
sql = self.__get_connection()
cur = sql("SELECT name FROM artists WHERE mbid=?", artist.mbid)
if row := cur.fetchone():
artist.name = row["name"]
else:
mb_res = musicbrainzngs.get_artist_by_id(artist.mbid)
artist.name = mb_res["artist"]["name"]
def do_get_result(self, result: thread.Data, db: GObject.TYPE_PYOBJECT,
library: playlist.Playlist) -> tuple:
"""Return the resulting Tags structure."""
tags = None if result.tags is None else Tags(db, result.tags, library)
return (result.path, tags)
def do_run_task(self, task: thread.Data) -> None:
"""Tag a file."""
tags = emmental.audio.tagger.tag_file(task.path, task.mtime)
if tags is not None:
for artist in tags.artists:
self.__check_artist(artist)
self.set_result(path=task.path, tags=tags)
def do_stop(self) -> None:
"""Close the connection before stopping."""
if self._connection:
self._connection.close()
self._connection = None
def tag_file(self, path: pathlib.Path,
*, mtime: float | None = None) -> None:
"""Tag a file."""
self.set_task(path=path, mtime=mtime)
def untag_track(db: GObject.TYPE_PYOBJECT, track: tracks.Track) -> None:
"""Untag a Track."""
medium = track.get_medium()
year = track.get_year()
playlists = [plist for plist in db.playlists.store]
playlists.extend([medium, medium.get_album()])
playlists.extend(track.get_artists())
playlists.extend([year, year.parent, track.get_library()])
for plist in playlists:
plist.remove_track(track)