emmental/emmental/__init__.py
Anna Schumaker a485a3806b tracklist: Scroll to the requested Track
And wire this up to not only the Now Playing "jump" signal, but also the
next track pickers so we scroll when tracks are changed.

Signed-off-by: Anna Schumaker <Anna@NoWheyCreamery.com>
2023-05-10 17:42:27 -04:00

302 lines
13 KiB
Python

# Copyright 2022 (c) Anna Schumaker.
"""Set up our Application."""
import musicbrainzngs
import pathlib
from . import gsetup
from . import audio
from . import db
from . import header
from . import mpris2
from . import nowplaying
from . import options
from . import playlist
from . import sidebar
from . import tracklist
from . import window
from gi.repository import GObject
from gi.repository import GLib
from gi.repository import Gio
from gi.repository import Adw
# Version components. Besides building VERSION_STRING, they are joined as
# "MAJOR.MINOR" for the musicbrainzngs user agent set during startup.
MAJOR_VERSION = 3
MINOR_VERSION = 0
# Human-readable application name and version. Passed to the header as its
# title and to the window constructor, and printed for the "version" option.
VERSION_STRING = f"Emmental {MAJOR_VERSION}.{MINOR_VERSION}{gsetup.DEBUG_STR}"
class Application(Adw.Application):
    """Our custom Adw.Application.

    Creates the database connection, playlist factory, MPRIS2 connection,
    audio player and main window during startup, and wires their signals
    and properties together.
    """

    db = GObject.Property(type=db.Connection)
    factory = GObject.Property(type=playlist.Factory)
    mpris = GObject.Property(type=mpris2.Connection)
    player = GObject.Property(type=audio.Player)
    win = GObject.Property(type=window.Window)
    # Bound bidirectionally to the Now Playing card. When it is 0 the next
    # system-picked track is loaded paused; values >= 0 are decremented each
    # time the system picks a track, and -1 is left untouched.
    autopause = GObject.Property(type=int, default=-1, minimum=-1, maximum=99)

    def __init__(self):
        """Initialize an Application."""
        super().__init__(application_id=gsetup.APPLICATION_ID,
                         resource_base_path=gsetup.RESOURCE_PATH,
                         flags=Gio.ApplicationFlags.HANDLES_OPEN)
        self.add_main_option_entries([options.Version])

    def __load_file(self, file: pathlib.Path,
                    *, gapless: bool = False) -> None:
        """Stop the current track and point the player at a new file.

        For a gapless load only the player's file is replaced. Otherwise
        the player is stopped, given the new file, and told to play.
        """
        self.__stop_current_track()
        if gapless:
            self.player.file = file
        else:
            self.player.stop()
            self.player.file = file
            self.player.play()

    def __load_path(self, src: GObject.GObject, path: pathlib.Path) -> None:
        """Load a path, as a database track when the database knows it."""
        if (track := self.db.tracks.lookup(path=path)) is not None:
            self.__load_track(track)
        else:
            # __load_track() already loads the track's file, so the bare
            # file load must only happen for paths without a database track.
            # Loading it unconditionally would repeat the whole
            # stop / set file / play sequence for the same track.
            self.__load_file(path)

    def __load_track(self, track: GObject.GObject, *, gapless: bool = False,
                     rg_auto: str = "track", restart: bool = False) -> None:
        """Load a database track into the player and update its state.

        The track is restarted or started depending on `restart`, the
        replaygain mode is refreshed using `rg_auto`, and the tracklist
        is scrolled to the current track.
        """
        self.__load_file(track.path, gapless=gapless)
        if restart:
            track.restart()
        else:
            track.start()
        self.__set_replaygain(rg_auto=rg_auto)
        self.__on_jump()

    def __pick_next_track(self, *, user: bool, gapless: bool = False) -> None:
        """Ask the playlist factory for the next track and load it."""
        (track, rg_auto, restart) = self.factory.next_track(user=user)
        self.__load_track(track, gapless=gapless,
                          rg_auto=rg_auto, restart=restart)

    def __on_jump(self, nowplay: nowplaying.Card | None = None) -> None:
        """Handle a jump event."""
        self.win.tracklist.scroll_to_track(self.db.tracks.current_track)

    def __on_seek(self, nowplay: nowplaying.Card, newpos: float) -> None:
        """Handle a seek event."""
        self.player.seek(newpos)
        self.mpris.player.seeked(newpos)

    def __seek(self, player: mpris2.player.Player, offset: float) -> None:
        """Handle an MPRIS2 Seek request, relative to the current position."""
        self.player.seek(self.player.position + offset)

    def __set_position(self, player: mpris2.player.Player,
                       trackid: str, position: float) -> None:
        """Handle an MPRIS2 SetPosition request (the trackid is unused)."""
        self.player.seek(position)

    def __set_replaygain(self, *args, rg_auto="track") -> None:
        """Apply the replaygain settings from the database to the player.

        A configured mode of "auto" is replaced by `rg_auto`. Extra
        positional arguments are accepted and ignored so this can be used
        directly as a signal handler.
        """
        enabled = self.db.settings["audio.replaygain.enabled"]
        mode = self.db.settings["audio.replaygain.mode"]
        mode = rg_auto if mode == "auto" else mode
        self.player.set_replaygain(enabled, mode)

    def __stop_current_track(self) -> None:
        """Stop the current database track, if any, with the playtime."""
        if self.db.tracks.current_track is not None:
            self.db.tracks.current_track.stop(self.player.playtime)

    def __system_next(self, player: audio.Player, gapless: bool) -> None:
        """Pick the next track when the player runs out of audio.

        Connected to both "about-to-finish" (gapless) and "eos".
        """
        # Pause on load exactly when the autopause counter has reached 0,
        # then count down. The -1 value is never decremented.
        self.player.pause_on_load = self.autopause == 0
        if self.autopause >= 0:
            self.autopause -= 1
        self.__pick_next_track(user=False, gapless=gapless)

    def __user_next(self, *args) -> None:
        """Handle a user request for the next track."""
        self.__pick_next_track(user=True)

    def __user_previous(self, *args) -> None:
        """Handle a user request for the previous track."""
        self.__load_track(self.factory.previous_track(),
                          rg_auto="track", restart=True)

    def __track_requested(self, factory: playlist.Factory, track,
                          rg_auto: str, restarted: bool) -> None:
        """Handle the playlist factory's "track-requested" signal."""
        self.__load_track(track, rg_auto=rg_auto, restart=restarted)

    def __tracks_table_loaded(self, track_table, param) -> None:
        """Restore the current track, paused, on "notify::loaded".

        NOTE(review): do_startup() connects this handler and calls
        db.load() before the window is built, and __on_jump() needs
        self.win. This presumably relies on the notification arriving
        after startup has finished -- confirm.
        """
        if track_table.current_track is not None:
            self.player.file = track_table.current_track.path
            self.player.pause()
            track_table.current_track.start()
            self.__on_jump()

    def build_header(self) -> header.Header:
        """Build a new header instance."""
        hdr = header.Header(sql=self.db, title=VERSION_STRING)
        hdr.bind_property("volume", self.player, "volume")

        # `prop` rather than `property`, to avoid shadowing the builtin.
        for (setting, prop) in [("audio.volume", "volume"),
                                ("audio.replaygain.enabled", "rg-enabled"),
                                ("audio.replaygain.mode", "rg-mode")]:
            self.db.settings.bind_setting(setting, hdr, prop)

        hdr.connect("notify::rg-enabled", self.__set_replaygain)
        hdr.connect("notify::rg-mode", self.__set_replaygain)
        hdr.connect("track-requested", self.__load_path)

        # Push the current replaygain settings to the player right away.
        self.__set_replaygain()
        return hdr

    def build_now_playing(self) -> nowplaying.Card:
        """Build a new now playing card."""
        playing = nowplaying.Card()
        playing.bind_property("autopause", self, "autopause",
                              GObject.BindingFlags.BIDIRECTIONAL)

        # Properties mirrored one-way from the player, under the same name.
        for prop in ["title", "album", "artist", "album-artist", "playing",
                     "position", "duration", "artwork", "have-track"]:
            self.player.bind_property(prop, playing, prop)

        self.db.tracks.bind_property("have-current-track",
                                     playing, "have-db-track")
        self.db.tracks.bind_property("current-favorite", playing, "favorite",
                                     GObject.BindingFlags.BIDIRECTIONAL)
        self.factory.bind_property("can-go-next", playing, "have-next")
        self.factory.bind_property("can-go-previous", playing, "have-previous")
        self.db.settings.bind_setting("now-playing.prefer-artist",
                                      playing, "prefer-artist")

        playing.connect("jump", self.__on_jump)
        playing.connect("play", self.player.play)
        playing.connect("pause", self.player.pause)
        playing.connect("seek", self.__on_seek)
        playing.connect("next", self.__user_next)
        playing.connect("previous", self.__user_previous)
        return playing

    def build_sidebar(self) -> sidebar.Card:
        """Build a new sidebar card."""
        side_bar = sidebar.Card(sql=self.db)
        self.db.settings.bind_setting("sidebar.artists.show-all", side_bar,
                                      "show-all-artists")
        return side_bar

    def build_tracklist(self) -> tracklist.Card:
        """Build a new tracklist card."""
        track_list = tracklist.Card(sql=self.db)
        for column in track_list.columns:
            # Setting names use the lower-cased, dash-separated column title.
            name = column.get_title().lower().replace(" ", "-")
            self.db.settings.bind_setting(f"tracklist.{name}.size",
                                          column, "fixed-width")
            self.db.settings.bind_setting(f"tracklist.{name}.visible",
                                          column, "visible")

        self.factory.bind_property("visible-playlist", track_list, "playlist")
        return track_list

    def build_window(self) -> window.Window:
        """Build a new window instance."""
        win = window.Window(VERSION_STRING,
                            header=self.build_header(),
                            now_playing=self.build_now_playing(),
                            sidebar=self.build_sidebar(),
                            tracklist=self.build_tracklist())

        # `prop` rather than `property`, to avoid shadowing the builtin.
        for (setting, prop) in [("window.width", "default-width"),
                                ("window.height", "default-height"),
                                ("now-playing.size", "now-playing-size"),
                                ("sidebar.size", "sidebar-size")]:
            self.db.settings.bind_setting(setting, win, prop)
        return win

    def connect_mpris2(self) -> None:
        """Connect the mpris2 properties and functions."""
        self.mpris.app.link_property("Fullscreen", self.win, "fullscreened")
        self.mpris.app.connect("Raise", self.win.present)
        self.mpris.app.connect("Quit", self.win.close)

        # Player properties mirrored to MPRIS2 under the same name.
        for tag in ["artist", "album", "album-artist", "album-disc-number",
                    "title", "track-number", "duration", "file", "artwork"]:
            self.player.bind_property(tag, self.mpris.player, tag)

        for (prop, mpris_prop) in [("have-track", "CanPlay"),
                                   ("have-track", "CanPause"),
                                   ("have-track", "CanSeek"),
                                   ("status", "PlaybackStatus"),
                                   ("position", "Position")]:
            self.player.bind_property(prop, self.mpris.player, mpris_prop)

        # Shuffle and loop status can be changed from either side.
        for (prop, mpris_prop) in [("active-shuffle", "Shuffle"),
                                   ("active-loop", "LoopStatus")]:
            self.factory.bind_property(prop, self.mpris.player, mpris_prop,
                                       GObject.BindingFlags.BIDIRECTIONAL)

        for (prop, mpris_prop) in [("can-go-next", "CanGoNext"),
                                   ("can-go-previous", "CanGoPrevious")]:
            self.factory.bind_property(prop, self.mpris.player, mpris_prop)

        self.mpris.player.link_property("Volume", self.win.header, "volume")

        self.mpris.player.connect("OpenPath", self.__load_path)
        self.mpris.player.connect("Next", self.__user_next)
        self.mpris.player.connect("Previous", self.__user_previous)
        self.mpris.player.connect("Pause", self.player.pause)
        self.mpris.player.connect("Play", self.player.play)
        self.mpris.player.connect("PlayPause", self.player.play_pause)
        self.mpris.player.connect("Seek", self.__seek)
        self.mpris.player.connect("SetPosition", self.__set_position)
        self.mpris.player.connect("Stop", self.player.stop)

    def connect_playlist_factory(self) -> None:
        """Connect the playlist factory properties."""
        self.db.playlists.bind_property("previous",
                                        self.factory, "db-previous")
        self.win.sidebar.bind_property("selected-playlist",
                                       self.factory, "db-visible")
        self.factory.connect("track-requested", self.__track_requested)

    def connect_player(self) -> None:
        """Connect the audio.Player."""
        # The trailing argument is passed to __system_next() as `gapless`.
        self.player.connect("about-to-finish", self.__system_next, True)
        self.player.connect("eos", self.__system_next, False)

    def do_handle_local_options(self, opts: GLib.VariantDict) -> int:
        """Handle any command line options.

        Returns 0 after printing version information, and -1 otherwise.
        """
        if opts.contains("version"):
            print(VERSION_STRING)
            gsetup.print_versions()
            return 0
        return -1

    def do_startup(self) -> None:
        """Handle the Adw.Application::startup signal."""
        Adw.Application.do_startup(self)

        self.db = db.Connection()
        self.mpris = mpris2.Connection()
        self.factory = playlist.Factory(self.db)
        self.player = audio.Player()

        gsetup.add_style()
        musicbrainzngs.set_useragent(f"emmental{gsetup.DEBUG_STR}",
                                     f"{MAJOR_VERSION}.{MINOR_VERSION}")

        self.db.tracks.connect("notify::loaded", self.__tracks_table_loaded)
        self.db.load()

        self.win = self.build_window()
        self.add_window(self.win)

        # These all need the window and its children to exist.
        self.connect_mpris2()
        self.connect_playlist_factory()
        self.connect_player()

    def do_activate(self) -> None:
        """Handle the Adw.Application::activate signal."""
        Adw.Application.do_activate(self)
        self.win.present()

    def do_open(self, files: list, n_files: int, hint: str) -> None:
        """Play an audio file passed from the command line.

        Only the first file is used. The application is activated
        afterwards so the window gets presented.
        """
        if n_files > 0:
            # Gio.File.get_path() returns None when the file has no local
            # path (for example a remote URI), and pathlib.Path(None) would
            # raise a TypeError. Skip loading in that case.
            if (filename := files[0].get_path()) is not None:
                path = pathlib.Path(filename)
                self.db.tracks.mark_path_active(path)
                self.__load_path(None, path)
        self.activate()

    def do_shutdown(self) -> None:
        """Handle the Adw.Application::shutdown signal.

        Each component is released only if it was created, and its
        property is then reset to None.
        """
        Adw.Application.do_shutdown(self)

        if self.player is not None:
            self.player.shutdown()
            self.player = None
        if self.win is not None:
            self.win.close()
            self.win = None
        if self.mpris is not None:
            self.mpris.disconnect()
            self.mpris = None
        if self.db is not None:
            self.db.close()
            self.db = None