def clear_token_from_ubuntu_sso_sync(appname):
""" send a dbus signal to the com.ubuntu.sso service to clear
the credentials for the given appname, e.g. _("Ubuntu Software Center")
and wait for it to finish (or 2s)
"""
from ubuntu_sso import (
DBUS_BUS_NAME,
DBUS_CREDENTIALS_IFACE,
DBUS_CREDENTIALS_PATH,
)
# clean
loop = GObject.MainLoop()
bus = dbus.SessionBus()
obj = bus.get_object(bus_name=DBUS_BUS_NAME,
object_path=DBUS_CREDENTIALS_PATH,
follow_name_owner_changes=True)
proxy = dbus.Interface(object=obj,
dbus_interface=DBUS_CREDENTIALS_IFACE)
proxy.connect_to_signal("CredentialsCleared", loop.quit)
proxy.connect_to_signal("CredentialsNotFound", loop.quit)
proxy.connect_to_signal("CredentialsError", loop.quit)
proxy.clear_credentials(appname, {})
# ensure we don't hang forever here
GObject.timeout_add_seconds(2, loop.quit)
# run the mainloop until the credentials are clear
loop.run()
python类MainLoop()的实例源码
def run(self):
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
bus_name = dbus.service.BusName("org.qoverview.config", dbus.SessionBus())
dbus.service.Object.__init__(self, bus_name, "/org/qoverview/config")
self._loop = GObject.MainLoop()
self._loop.run()
def run_pipeline(pipeline, clock, audio_delay=0, video_delay=0):
def on_eos(bus, message):
print('Received EOS-Signal')
sys.exit(1)
def on_error(bus, message):
print('Received Error-Signal')
(error, debug) = message.parse_error()
print('Error-Details: #%u: %s' % (error.code, debug))
sys.exit(2)
print('starting pipeline...')
senderPipeline = Gst.parse_launch(pipeline)
senderPipeline.use_clock(clock)
# Delay video/audio if required
NS_TO_MS = 100000
if video_delay > 0:
print('Adjusting video sync: [{} milliseconds]'.format(video_delay))
video_delay = video_delay * NS_TO_MS
videosrc = senderPipeline.get_by_name('videosrc')
videosrc.get_static_pad('src').set_offset(video_delay)
if audio_delay > 0:
print('Adjusting audio sync: [{} milliseconds]'.format(audio_delay))
audio_delay = audio_delay * NS_TO_MS
audiosrc = senderPipeline.get_by_name('audiosrc')
audiosrc.get_static_pad('src').set_offset(audio_delay)
# Binding End-of-Stream-Signal on Source-Pipeline
senderPipeline.bus.add_signal_watch()
senderPipeline.bus.connect("message::eos", on_eos)
senderPipeline.bus.connect("message::error", on_error)
print("playing...")
senderPipeline.set_state(Gst.State.PLAYING)
mainloop = GObject.MainLoop()
try:
mainloop.run()
except KeyboardInterrupt:
print('Terminated via Ctrl-C')
print('Shutting down...')
senderPipeline.set_state(Gst.State.NULL)
print('Done.')
return
def update_from_software_center_agent(db, cache, ignore_cache=False,
include_sca_qa=False):
""" update index based on the software-center-agent data """
def _available_cb(sca, available):
# print "available: ", available
LOG.debug("available: '%s'" % available)
sca.available = available
sca.good_data = True
loop.quit()
def _error_cb(sca, error):
LOG.warn("error: %s" % error)
sca.available = []
sca.good_data = False
loop.quit()
# use the anonymous interface to s-c-agent, scales much better and is
# much cache friendlier
from softwarecenter.backend.scagent import SoftwareCenterAgent
# FIXME: honor ignore_etag here somehow with the new piston based API
sca = SoftwareCenterAgent(ignore_cache)
sca.connect("available", _available_cb)
sca.connect("error", _error_cb)
sca.available = None
if include_sca_qa:
sca.query_available_qa()
else:
sca.query_available()
# create event loop and run it until data is available
# (the _available_cb and _error_cb will quit it)
context = GObject.main_context_default()
loop = GObject.MainLoop(context)
loop.run()
# process data
for entry in sca.available:
# process events
while context.pending():
context.iteration()
try:
# now the normal parser
parser = SCAApplicationParser(entry)
index_app_info_from_parser(parser, db, cache)
except Exception as e:
LOG.warning("error processing: %s " % e)
# return true if we have updated entries (this can also be an empty list)
# but only if we did not got a error from the agent
return sca.good_data
def update_from_software_center_agent(db, cache, ignore_cache=False,
include_sca_qa=False):
""" update index based on the software-center-agent data """
def _available_cb(sca, available):
# print "available: ", available
LOG.debug("available: '%s'" % available)
sca.available = available
sca.good_data = True
loop.quit()
def _error_cb(sca, error):
LOG.warn("error: %s" % error)
sca.available = []
sca.good_data = False
loop.quit()
# use the anonymous interface to s-c-agent, scales much better and is
# much cache friendlier
from softwarecenter.backend.scagent import SoftwareCenterAgent
# FIXME: honor ignore_etag here somehow with the new piston based API
sca = SoftwareCenterAgent(ignore_cache)
sca.connect("available", _available_cb)
sca.connect("error", _error_cb)
sca.available = None
if include_sca_qa:
sca.query_available_qa()
else:
sca.query_available()
# create event loop and run it until data is available
# (the _available_cb and _error_cb will quit it)
context = GObject.main_context_default()
loop = GObject.MainLoop(context)
loop.run()
# process data
for entry in sca.available:
# process events
while context.pending():
context.iteration()
try:
# now the normal parser
parser = SCAApplicationParser(entry)
index_app_info_from_parser(parser, db, cache)
except Exception as e:
LOG.warning("error processing: %s " % e)
# return true if we have updated entries (this can also be an empty list)
# but only if we did not got a error from the agent
return sca.good_data
def im_incoming(queues: Dict[bytes, 'Queue']) -> None:
"""Loop that maintains signal receiver process."""
def pidgin_to_rxm(account: str, sender: str, message: str, *_: Any) -> None:
"""Signal receiver process that receives packets from Pidgin."""
sender = sender.split('/')[0]
ts = datetime.now().strftime("%m-%d / %H:%M:%S")
d_bus = dbus.SessionBus(private=True)
obj = d_bus.get_object("im.pidgin.purple.PurpleService", "/im/pidgin/purple/PurpleObject")
purple = dbus.Interface(obj, "im.pidgin.purple.PurpleInterface")
user = ''
for a in purple.PurpleAccountsGetAllActive():
if a == account:
user = purple.PurpleAccountGetUsername(a)[:-1]
if not message.startswith(TFC):
return None
try:
__, header, payload = message.split('|') # type: Tuple[str, str, str]
except ValueError:
return None
if header.encode() == PUBLIC_KEY_PACKET_HEADER:
print("{} - pub key {} > {} > RxM".format(ts, sender, user))
elif header.encode() == MESSAGE_PACKET_HEADER:
print("{} - message {} > {} > RxM".format(ts, sender, user))
else:
print("Received invalid packet from {}".format(sender))
return None
decoded = base64.b64decode(payload)
packet = header.encode() + decoded + ORIGIN_CONTACT_HEADER + sender.encode()
queues[RXM_OUTGOING_QUEUE].put(packet)
while True:
with ignored(dbus.exceptions.DBusException, EOFError, KeyboardInterrupt):
bus = dbus.SessionBus(private=True, mainloop=DBusGMainLoop())
bus.add_signal_receiver(pidgin_to_rxm, dbus_interface="im.pidgin.purple.PurpleInterface", signal_name="ReceivedImMsg")
GObject.MainLoop().run()