90 lines
3.7 KiB
Python
90 lines
3.7 KiB
Python
# Copyright 2022 (c) Anna Schumaker.
|
|
"""Our generic dbus object."""
|
|
import pathlib
|
|
from gi.repository import GObject
|
|
from gi.repository import GLib
|
|
from gi.repository import Gio
|
|
|
|
OBJECT_PATH = "/org/mpris/MediaPlayer2"
|
|
|
|
|
|
class Object(GObject.GObject):
|
|
"""A generic dbus object."""
|
|
|
|
dbus = GObject.Property(type=Gio.DBusConnection)
|
|
nodeinfo = GObject.Property(type=Gio.DBusNodeInfo)
|
|
interface = GObject.Property(type=Gio.DBusInterfaceInfo)
|
|
registration = GObject.Property(type=int)
|
|
|
|
def __init__(self, xml: pathlib.Path = None):
|
|
"""Initialize a dbus Object."""
|
|
super().__init__()
|
|
if xml and xml.is_file():
|
|
with open(xml, 'r') as f:
|
|
self.nodeinfo = Gio.DBusNodeInfo.new_for_xml(f.read())
|
|
self.interface = self.nodeinfo.interfaces[0]
|
|
self.connect("notify", self.__on_notify)
|
|
|
|
def __on_notify(self, object: GObject.GObject, param) -> None:
|
|
self.do_notify(param.name)
|
|
|
|
def do_notify(self, property: str) -> None:
|
|
"""Handle a property value changing."""
|
|
|
|
def link_property(self, property: str, object: GObject.GObject,
|
|
object_property: str) -> None:
|
|
"""Link a dbus property to the object."""
|
|
self.bind_property(property, object, object_property,
|
|
GObject.BindingFlags.BIDIRECTIONAL)
|
|
|
|
def properties_changed(self, changed: dict) -> None:
|
|
"""Emit the org.freedesktop.DBus.PropertiesChanged signal."""
|
|
if self.dbus is None:
|
|
return
|
|
|
|
args = GLib.Variant.new_tuple(GLib.Variant("s", self.interface.name),
|
|
GLib.Variant("a{sv}", changed),
|
|
GLib.Variant("as", []))
|
|
self.dbus.emit_signal(None, OBJECT_PATH,
|
|
"org.freedesktop.DBus.Properties",
|
|
"PropertiesChanged", args)
|
|
|
|
def register(self, dbus: Gio.DBusConnection):
|
|
"""Register this dbus Object."""
|
|
self.registration = dbus.register_object(OBJECT_PATH, self.interface,
|
|
self.__method_call,
|
|
self.__get_property,
|
|
self.__set_property)
|
|
|
|
def unregister(self, dbus: Gio.DBusConnection):
|
|
"""Unregister this Object from the bus."""
|
|
dbus.unregister_object(self.registration)
|
|
self.registration = 0
|
|
|
|
def __method_call(self, dbus: Gio.DBusConnection, sender: str, object: str,
|
|
interface: str, method: str, parameters: GLib.Variant,
|
|
invocation: Gio.DBusMethodInvocation) -> None:
|
|
if object != OBJECT_PATH or interface != self.interface.name:
|
|
return None
|
|
|
|
self.emit(method, *parameters.unpack())
|
|
invocation.return_value(GLib.Variant.new_tuple())
|
|
|
|
def __get_property(self, dbus: Gio.DBusConnection, sender: str,
|
|
object: str, interface: str,
|
|
property: str) -> GLib.Variant | None:
|
|
if object != OBJECT_PATH or interface != self.interface.name:
|
|
return None
|
|
|
|
if property_info := self.interface.lookup_property(property):
|
|
return GLib.Variant(property_info.signature,
|
|
self.get_property(property))
|
|
|
|
def __set_property(self, dbus: Gio.DBusConnection, sender: str,
|
|
object: str, interface: str, property: str,
|
|
value: GLib.Variant) -> bool:
|
|
if object != OBJECT_PATH or interface != self.interface.name:
|
|
return None
|
|
self.set_property(property, value.unpack())
|
|
return True
|