Python 函数 bus_get_sync() 的实例源码

dbus_client.py 文件源码 项目:rauc-hawkbit 作者: rauc 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(self):
        """Prepare the logger, event queue, callback tables and system bus.

        Schedules ``handle_dbus_event()`` as a task on the current asyncio
        event loop, obtains the system bus connection synchronously and
        finally calls ``new_signal_subscription`` for the
        ``org.freedesktop.DBus.Properties`` / ``PropertiesChanged`` signal
        with ``property_changed_callback`` as its callback.
        """
        self.logger = logging.getLogger('rauc_hawkbit')

        # Queue for incoming D-Bus events; presumably consumed by the
        # handle_dbus_event() task scheduled right below.
        self.dbus_events = asyncio.Queue()
        event_loop = asyncio.get_event_loop()
        self.dbus_event_task = event_loop.create_task(
            self.handle_dbus_event())

        # Active subscriptions.
        self.signal_subscriptions = list()
        # Callback tables: ({interface}, {signal}) -> {callback} and
        # ({interface}, {property}) -> {callback}.
        self.signal_callbacks = dict()
        self.property_callbacks = dict()

        self.system_bus = Gio.bus_get_sync(Gio.BusType.SYSTEM, None)

        # Property changes are subscribed to by default.
        properties_interface = 'org.freedesktop.DBus.Properties'
        changed_signal = 'PropertiesChanged'
        self.new_signal_subscription(properties_interface, changed_signal,
                                     self.property_changed_callback)
base.py 文件源码 项目:eos-data-distribution 作者: endlessm 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def __init__(self, name, dbus_name, skeleton):
        """Create the object manager server and request the D-Bus name.

        Args:
            name: Passed as second argument to ``build_dbus_name``.
            dbus_name: Passed as first argument to ``build_dbus_name``.
            skeleton: Stored as ``_interface_skeleton_object``.
        """
        self._cb_registery = {}
        self._obj_registery = {}
        self._dbus_name = build_dbus_name(dbus_name, name)
        self._interface_skeleton_object = skeleton

        self.con = Gio.bus_get_sync(BUS_TYPE, None)

        # The shared base path is used as-is; the per-name variant
        # build_dbus_path(name) is disabled.
        manager_path = BASE_DBUS_PATH

        logger.debug(
            'Producer ObjectManagerServer dbus: %s, %s',
            manager_path,
            self._dbus_name,
        )
        self._object_manager = Gio.DBusObjectManagerServer(
            object_path=manager_path)
        self._object_manager.set_connection(self.con)

        # No acquired/lost callbacks are supplied (both None).
        Gio.bus_own_name_on_connection(
            self.con,
            self._dbus_name,
            Gio.BusNameOwnerFlags.NONE,
            None,
            None,
        )
mpris.py 文件源码 项目:cozy 作者: geigi 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __init__(self, app, ui):
        """Set up MPRIS state, own the bus name and hook the GStreamer bus.

        Args:
            app: Stored on the instance as ``__app``.
            ui: Stored on the instance as ``__ui``.
        """
        self.__app = app
        self.__ui = ui
        self.__rating = None
        self.__cozy_id = 0

        # Initial metadata carries only the MPRIS "no track" object path.
        no_track = GLib.Variant(
            "o", "/org/mpris/MediaPlayer2/TrackList/NoTrack")
        self.__metadata = {"mpris:trackid": no_track}
        self.__track_id = self.__get_media_id(0)

        # Connect to the session bus and request the MPRIS name on it; no
        # acquired/lost callbacks are supplied (both None).
        self.__bus = Gio.bus_get_sync(Gio.BusType.SESSION, None)
        Gio.bus_own_name_on_connection(
            self.__bus,
            self.__MPRIS_COZY,
            Gio.BusNameOwnerFlags.NONE,
            None,
            None,
        )
        Server.__init__(self, self.__bus, self.__MPRIS_PATH)

        # Receive GStreamer bus messages in __on_gst_message.
        gst_bus = get_gst_bus()
        gst_bus.connect("message", self.__on_gst_message)

        # Disabled player-signal hookups, kept for reference:
        # Lp().player.connect("current-changed", self.__on_current_changed)
        # Lp().player.connect("seeked", self.__on_seeked)
        # Lp().player.connect("status-changed", self.__on_status_changed)
        # Lp().player.connect("volume-changed", self.__on_volume_changed)
dbus_service.py 文件源码 项目:eos-data-distribution 作者: endlessm 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __init__(self):
        """Own the downloader bus name, register its object and start a consumer.

        Connects to the session bus, requests the
        ``com.endlessm.EknSubscriptionsDownloader`` name, registers the
        object described by ``IFACE_INFO`` with ``_on_method_call`` as the
        method-call handler, connects ``_on_data`` to the consumer's
        ``data`` signal and creates an empty notification object.
        """
        service_name = 'com.endlessm.EknSubscriptionsDownloader'
        service_path = '/com/endlessm/EknSubscriptionsDownloader'

        self.con = Gio.bus_get_sync(Gio.BusType.SESSION, None)

        # No acquired/lost callbacks are supplied (both None).
        Gio.bus_own_name_on_connection(
            self.con,
            service_name,
            Gio.BusNameOwnerFlags.NONE,
            None,
            None,
        )

        self.con.register_object(
            object_path=service_path,
            interface_info=IFACE_INFO,
            method_call_closure=self._on_method_call,
        )

        self._consumer = Consumer(name=SUBSCRIPTIONS_INSTALLED)
        self._consumer.connect('data', self._on_data)

        # FIXME: Port to GNotification instead
        self._notification = Notify.Notification.new("", "")
display-config.py 文件源码 项目:display-config 作者: xytovl 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(self):
        """Open a proxy to Mutter's DisplayConfig and call ``get_resources()``.

        The proxy is created synchronously on the session bus; the same
        string is used for both the bus name and the interface name.
        """
        bus_name = 'org.gnome.Mutter.DisplayConfig'
        object_path = '/org/gnome/Mutter/DisplayConfig'
        interface_name = 'org.gnome.Mutter.DisplayConfig'

        self.bus = Gio.bus_get_sync(Gio.BusType.SESSION, None)
        self.proxy = Gio.DBusProxy.new_sync(
            self.bus,
            Gio.DBusProxyFlags.NONE,
            None,  # third positional argument left as None
            bus_name,
            object_path,
            interface_name,
            None,  # last positional argument left as None
        )
        self.get_resources()


问题


面经


文章

微信
公众号

扫码关注公众号