Python 类 MainLoop() 的实例源码

utils.py 文件源码 项目:x-mario-center 作者: fossasia 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def clear_token_from_ubuntu_sso_sync(appname):
    """Synchronously ask the com.ubuntu.sso service to clear credentials.

    Calls ``clear_credentials`` for the given ``appname``
    (e.g. _("Ubuntu Software Center")) over the session bus and then
    blocks in a local main loop until the service emits one of its
    completion signals or two seconds have passed, whichever comes first.
    """
    from ubuntu_sso import (
        DBUS_BUS_NAME,
        DBUS_CREDENTIALS_IFACE,
        DBUS_CREDENTIALS_PATH,
    )

    wait_loop = GObject.MainLoop()
    session_bus = dbus.SessionBus()
    remote = session_bus.get_object(
        bus_name=DBUS_BUS_NAME,
        object_path=DBUS_CREDENTIALS_PATH,
        follow_name_owner_changes=True,
    )
    credentials = dbus.Interface(
        object=remote,
        dbus_interface=DBUS_CREDENTIALS_IFACE,
    )

    # Any of these signals ends the wait.
    finishing_signals = (
        "CredentialsCleared",
        "CredentialsNotFound",
        "CredentialsError",
    )
    for signal_name in finishing_signals:
        credentials.connect_to_signal(signal_name, wait_loop.quit)

    credentials.clear_credentials(appname, {})

    # Safety net: never block longer than two seconds.
    GObject.timeout_add_seconds(2, wait_loop.quit)
    wait_loop.run()
config-server.py 文件源码 项目:qOverview 作者: bharadwaj-raju 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def run(self):
    """Publish this object on the session bus and block in a main loop.

    Claims the bus name ``org.qoverview.config``, exports ``self`` at
    ``/org/qoverview/config`` and then runs a ``GObject.MainLoop`` stored
    on ``self._loop``.
    """
    # Install the GLib main loop integration as dbus-python's default.
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

    session_bus = dbus.SessionBus()
    claimed_name = dbus.service.BusName("org.qoverview.config", session_bus)
    dbus.service.Object.__init__(self, claimed_name, "/org/qoverview/config")

    self._loop = GObject.MainLoop()
    self._loop.run()
ingest.py 文件源码 项目:voctomix-outcasts 作者: CarlFK 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def run_pipeline(pipeline, clock, audio_delay=0, video_delay=0):
    """Launch a GStreamer pipeline and block until it ends or is interrupted.

    Args:
        pipeline: Pipeline description string passed to ``Gst.parse_launch``.
        clock: Clock object handed to the pipeline's ``use_clock``.
        audio_delay: Delay in milliseconds applied as an offset to the
            ``src`` pad of the element named ``audiosrc``; ignored unless
            greater than zero.
        video_delay: Same as ``audio_delay`` but for the element named
            ``videosrc``.

    The process exits with status 1 when an end-of-stream message arrives
    on the pipeline bus and with status 2 on an error message. Ctrl-C stops
    the main loop, after which the pipeline is set to the NULL state.
    """
    # Pad offsets are expressed in nanoseconds; the delays are given in
    # milliseconds. (The previous factor of 100000 was off by ten.)
    MS_TO_NS = 1000000

    def on_eos(bus, message):
        print('Received EOS-Signal')
        sys.exit(1)

    def on_error(bus, message):
        print('Received Error-Signal')
        (error, debug) = message.parse_error()
        print('Error-Details: #%u: %s' % (error.code, debug))
        sys.exit(2)

    def apply_delay(label, element_name, delay_ms):
        # Shift the named source element's output by delay_ms milliseconds.
        if delay_ms > 0:
            print('Adjusting {} sync: [{} milliseconds]'.format(label,
                                                                delay_ms))
            source = senderPipeline.get_by_name(element_name)
            source.get_static_pad('src').set_offset(delay_ms * MS_TO_NS)

    print('starting pipeline...')
    senderPipeline = Gst.parse_launch(pipeline)
    senderPipeline.use_clock(clock)

    # Delay video/audio if required
    apply_delay('video', 'videosrc', video_delay)
    apply_delay('audio', 'audiosrc', audio_delay)

    # Binding End-of-Stream-Signal on Source-Pipeline
    senderPipeline.bus.add_signal_watch()
    senderPipeline.bus.connect("message::eos", on_eos)
    senderPipeline.bus.connect("message::error", on_error)

    print("playing...")
    senderPipeline.set_state(Gst.State.PLAYING)

    mainloop = GObject.MainLoop()
    try:
        mainloop.run()
    except KeyboardInterrupt:
        print('Terminated via Ctrl-C')

    print('Shutting down...')
    senderPipeline.set_state(Gst.State.NULL)
    print('Done.')
update.py 文件源码 项目:x-mario-center 作者: fossasia 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def update_from_software_center_agent(db, cache, ignore_cache=False,
                                      include_sca_qa=False):
    """Update the index based on the software-center-agent data.

    Starts a query on a ``SoftwareCenterAgent`` (the QA variant when
    ``include_sca_qa`` is true), runs a main loop until the agent emits
    either "available" or "error", and then feeds every returned entry
    through ``SCAApplicationParser`` into ``index_app_info_from_parser``.
    A failure on one entry is logged and does not stop the others.

    Returns True when the agent delivered data (this may be an empty
    list) and False when the agent reported an error.
    """
    def _available_cb(sca, available):
        LOG.debug("available: '%s'", available)
        sca.available = available
        sca.good_data = True
        loop.quit()

    def _error_cb(sca, error):
        # Logger.warn is a deprecated alias; use warning() like the rest
        # of this function does.
        LOG.warning("error: %s", error)
        sca.available = []
        sca.good_data = False
        loop.quit()
    # use the anonymous interface to s-c-agent, scales much better and is
    # much cache friendlier
    from softwarecenter.backend.scagent import SoftwareCenterAgent
    # FIXME: honor ignore_etag here somehow with the new piston based API
    sca = SoftwareCenterAgent(ignore_cache)
    sca.connect("available", _available_cb)
    sca.connect("error", _error_cb)
    sca.available = None
    if include_sca_qa:
        sca.query_available_qa()
    else:
        sca.query_available()
    # create event loop and run it until data is available
    # (the _available_cb and _error_cb will quit it)
    context = GObject.main_context_default()
    loop = GObject.MainLoop(context)
    loop.run()
    # process data
    for entry in sca.available:
        # drain pending main-context events between entries
        while context.pending():
            context.iteration()
        try:
            # now the normal parser
            parser = SCAApplicationParser(entry)
            index_app_info_from_parser(parser, db, cache)
        except Exception as e:
            LOG.warning("error processing: %s ", e)
    # return true if we have updated entries (this can also be an empty list)
    # but only if we did not get an error from the agent
    return sca.good_data
update.py 文件源码 项目:x-mario-center 作者: fossasia 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def update_from_software_center_agent(db, cache, ignore_cache=False,
                                      include_sca_qa=False):
    """Refresh the index from what the software-center-agent reports.

    A ``SoftwareCenterAgent`` is queried (``query_available_qa`` when
    ``include_sca_qa`` is set, ``query_available`` otherwise) and a main
    loop is run until one of its "available" / "error" signals fires.
    Each entry the agent returned is then parsed with
    ``SCAApplicationParser`` and indexed via
    ``index_app_info_from_parser``; per-entry exceptions are logged and
    skipped.

    Returns the agent's ``good_data`` flag: True after "available" (even
    with an empty list), False after "error".
    """
    def _available_cb(sca, available):
        LOG.debug("available: '%s'", available)
        sca.available = available
        sca.good_data = True
        loop.quit()

    def _error_cb(sca, error):
        # warning() replaces the deprecated warn() alias and matches the
        # call used further down in this function.
        LOG.warning("error: %s", error)
        sca.available = []
        sca.good_data = False
        loop.quit()

    # use the anonymous interface to s-c-agent, scales much better and is
    # much cache friendlier
    from softwarecenter.backend.scagent import SoftwareCenterAgent
    # FIXME: honor ignore_etag here somehow with the new piston based API
    sca = SoftwareCenterAgent(ignore_cache)
    sca.connect("available", _available_cb)
    sca.connect("error", _error_cb)
    sca.available = None
    if include_sca_qa:
        sca.query_available_qa()
    else:
        sca.query_available()

    # create event loop and run it until data is available
    # (the _available_cb and _error_cb will quit it)
    context = GObject.main_context_default()
    loop = GObject.MainLoop(context)
    loop.run()

    # process data
    for entry in sca.available:
        # let queued main-context events run before handling this entry
        while context.pending():
            context.iteration()
        try:
            parser = SCAApplicationParser(entry)
            index_app_info_from_parser(parser, db, cache)
        except Exception as e:
            LOG.warning("error processing: %s ", e)

    # True if we have updated entries (this can also be an empty list),
    # but only if we did not get an error from the agent
    return sca.good_data
pidgin.py 文件源码 项目:tfc 作者: maqp 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def im_incoming(queues: Dict[bytes, 'Queue']) -> None:
    """Loop that maintains signal receiver process.

    Registers `pidgin_to_rxm` for Pidgin's ``ReceivedImMsg`` signal on a
    private session bus and runs a main loop. When the loop ends with one
    of the ignored exceptions (DBusException, EOFError, KeyboardInterrupt),
    the receiver is set up again.

    :param queues: Dictionary of queues; accepted packets are put on
                   ``queues[RXM_OUTGOING_QUEUE]``
    :return:       None (loops forever)
    """

    def pidgin_to_rxm(account: str, sender: str, message: str, *_: Any) -> None:
        """Signal receiver process that receives packets from Pidgin.

        Messages that do not start with the TFC prefix, do not split into
        exactly three '|'-separated fields, or carry an unknown header are
        dropped. Accepted packets are queued as
        header + decoded payload + ORIGIN_CONTACT_HEADER + sender.
        """
        sender = sender.split('/')[0]
        ts     = datetime.now().strftime("%m-%d / %H:%M:%S")

        # Validate first, so dropped messages never cost a DBus
        # connection and a round trip to Pidgin.
        if not message.startswith(TFC):
            return None

        try:
            __, header, payload = message.split('|')  # type: Tuple[str, str, str]
        except ValueError:
            return None

        header_bytes = header.encode()
        if header_bytes == PUBLIC_KEY_PACKET_HEADER:
            packet_kind = "pub key"
        elif header_bytes == MESSAGE_PACKET_HEADER:
            packet_kind = "message"
        else:
            print("Received invalid packet from {}".format(sender))
            return None

        # Look up the local username of the receiving account; it is only
        # used in the log line below.
        d_bus = dbus.SessionBus(private=True)
        try:
            obj    = d_bus.get_object("im.pidgin.purple.PurpleService", "/im/pidgin/purple/PurpleObject")
            purple = dbus.Interface(obj, "im.pidgin.purple.PurpleInterface")

            user = ''
            for a in purple.PurpleAccountsGetAllActive():
                if a == account:
                    # NOTE(review): drops the last character of the name
                    # Pidgin returns, presumably a trailing separator. Confirm.
                    user = purple.PurpleAccountGetUsername(a)[:-1]
        finally:
            # Private connections are not shared, so close this one
            # explicitly rather than leaving one open per message.
            d_bus.close()

        print("{} - {} {} > {} > RxM".format(ts, packet_kind, sender, user))

        decoded = base64.b64decode(payload)
        packet  = header_bytes + decoded + ORIGIN_CONTACT_HEADER + sender.encode()
        queues[RXM_OUTGOING_QUEUE].put(packet)

    while True:
        with ignored(dbus.exceptions.DBusException, EOFError, KeyboardInterrupt):
            bus = dbus.SessionBus(private=True, mainloop=DBusGMainLoop())
            bus.add_signal_receiver(pidgin_to_rxm, dbus_interface="im.pidgin.purple.PurpleInterface", signal_name="ReceivedImMsg")
            GObject.MainLoop().run()


问题


面经


文章

微信
公众号

扫码关注公众号