def __init__(self):
    """Prepare D-Bus event handling and connect to the system bus.

    Creates the event queue, schedules ``handle_dbus_event`` as a task on
    the asyncio event loop, initialises the subscription and callback
    registries, and finally calls ``new_signal_subscription`` for the
    ``PropertiesChanged`` signal of ``org.freedesktop.DBus.Properties``
    with ``property_changed_callback`` as its callback.
    """
    self.logger = logging.getLogger('rauc_hawkbit')

    # D-Bus events are handled asynchronously by a task on the event loop.
    self.dbus_events = asyncio.Queue()
    event_loop = asyncio.get_event_loop()
    self.dbus_event_task = event_loop.create_task(self.handle_dbus_event())

    # Active subscriptions.
    self.signal_subscriptions = []
    # Maps ({interface}, {signal}) to {callback}.
    self.signal_callbacks = {}
    # Maps ({interface}, {property}) to {callback}.
    self.property_callbacks = {}

    self.system_bus = Gio.bus_get_sync(Gio.BusType.SYSTEM, None)

    # Property changes are always subscribed to by default.
    properties_interface = 'org.freedesktop.DBus.Properties'
    self.new_signal_subscription(
        properties_interface,
        'PropertiesChanged',
        self.property_changed_callback,
    )
python类bus_get_sync()的实例源码
def __init__(self, name, dbus_name, skeleton):
    """Create an object-manager server on the bus and request its name.

    Args:
        name: Combined with ``dbus_name`` by ``build_dbus_name`` to form
            the bus name stored in ``self._dbus_name``.
        dbus_name: See ``name``.
        skeleton: Stored as ``self._interface_skeleton_object``.
    """
    self._cb_registery = {}
    self._obj_registery = {}
    self._dbus_name = build_dbus_name(dbus_name, name)
    self._interface_skeleton_object = skeleton
    self.con = Gio.bus_get_sync(BUS_TYPE, None)

    # The shared base path is used as the object path; a per-name path
    # from build_dbus_path(name) is not used here.
    object_path = BASE_DBUS_PATH
    logger.debug(
        'Producer ObjectManagerServer dbus: %s, %s',
        object_path,
        self._dbus_name,
    )

    self._object_manager = Gio.DBusObjectManagerServer(
        object_path=object_path,
    )
    self._object_manager.set_connection(self.con)

    Gio.bus_own_name_on_connection(
        self.con,
        self._dbus_name,
        Gio.BusNameOwnerFlags.NONE,
        None,
        None,
    )
def __init__(self, app, ui):
    """Own the MPRIS name on the session bus and watch GStreamer messages.

    Args:
        app: Stored on the instance.
        ui: Stored on the instance.
    """
    self.__app = app
    self.__ui = ui
    self.__rating = None
    self.__cozy_id = 0

    # Initial metadata carries the MPRIS "NoTrack" object path as track id.
    no_track = GLib.Variant(
        "o",
        "/org/mpris/MediaPlayer2/TrackList/NoTrack",
    )
    self.__metadata = {"mpris:trackid": no_track}
    self.__track_id = self.__get_media_id(0)

    # Request the bus name first, then initialise the Server base with
    # the same connection.
    self.__bus = Gio.bus_get_sync(Gio.BusType.SESSION, None)
    Gio.bus_own_name_on_connection(
        self.__bus,
        self.__MPRIS_COZY,
        Gio.BusNameOwnerFlags.NONE,
        None,
        None,
    )
    Server.__init__(self, self.__bus, self.__MPRIS_PATH)

    gst_bus = get_gst_bus()
    gst_bus.connect("message", self.__on_gst_message)

    # The player signal hookups below are currently disabled.
    #Lp().player.connect("current-changed", self.__on_current_changed)
    #Lp().player.connect("seeked", self.__on_seeked)
    #Lp().player.connect("status-changed", self.__on_status_changed)
    #Lp().player.connect("volume-changed", self.__on_volume_changed)
def __init__(self):
    """Expose the downloader object on the session bus and set up helpers.

    Requests the ``com.endlessm.EknSubscriptionsDownloader`` name,
    registers the object with ``_on_method_call`` as its method-call
    closure, connects ``_on_data`` to the consumer's ``data`` signal and
    creates an empty notification.
    """
    bus_name = 'com.endlessm.EknSubscriptionsDownloader'
    bus_object_path = '/com/endlessm/EknSubscriptionsDownloader'

    self.con = Gio.bus_get_sync(Gio.BusType.SESSION, None)
    Gio.bus_own_name_on_connection(
        self.con,
        bus_name,
        Gio.BusNameOwnerFlags.NONE,
        None,
        None,
    )
    self.con.register_object(
        object_path=bus_object_path,
        interface_info=IFACE_INFO,
        method_call_closure=self._on_method_call,
    )

    self._consumer = Consumer(name=SUBSCRIPTIONS_INSTALLED)
    self._consumer.connect('data', self._on_data)

    # FIXME: Port to GNotification instead
    self._notification = Notify.Notification.new("", "")
def __init__(self):
    """Create a proxy for Mutter's DisplayConfig and call ``get_resources``.

    Connects to the session bus, builds a synchronous ``Gio.DBusProxy``
    for ``org.gnome.Mutter.DisplayConfig`` and then invokes
    ``self.get_resources()``.
    """
    # The same string is passed as both the bus name and the interface name.
    display_config = 'org.gnome.Mutter.DisplayConfig'
    display_config_path = '/org/gnome/Mutter/DisplayConfig'

    self.bus = Gio.bus_get_sync(Gio.BusType.SESSION, None)
    self.proxy = Gio.DBusProxy.new_sync(
        self.bus,
        Gio.DBusProxyFlags.NONE,
        None,
        display_config,
        display_config_path,
        display_config,
        None,
    )
    self.get_resources()