def setup_signals(self, signals, handler):
    """
    Route unix signals to ``handler`` through GLib.

    This is a workaround for signal.signal(signal, handler), which does
    not work with a GLib.MainLoop() for some reason.
    Thanks to: http://stackoverflow.com/a/26457317/5433146

    args:
        signals (list of signal.SIG... signals): the signals to connect to
        handler (function): function to be executed on these signals; the
            signal number is passed to it as its argument
    """
    def register_with_glib(signum):
        # hand ``signum`` to GLib both as the signal to watch and as the
        # user-data argument passed on to ``handler``
        GLib.unix_signal_add(GLib.PRIORITY_HIGH, signum, handler, signum)

    for signum in signals:
        # do not install the handler right away: queue the registration
        # as a high-priority idle callback instead
        GLib.idle_add(register_with_glib, signum,
                      priority=GLib.PRIORITY_HIGH)
# set the config
# Example source code for the Python class MainLoop()
def setup_signals(self, signals, handler):
    """
    Connect every signal in ``signals`` to ``handler`` by way of GLib.

    Workaround for signal.signal(signal, handler), which does not work
    with a GLib.MainLoop() for some reason.
    Thanks to: http://stackoverflow.com/a/26457317/5433146

    args:
        signals (list of signal.SIG... signals): the signals to connect to
        handler (function): function to be executed on these signals; it
            is called with the signal number as its argument
    """
    def attach(signal_number):
        # watch ``signal_number`` and forward it to ``handler`` as argument
        GLib.unix_signal_add(
            GLib.PRIORITY_HIGH, signal_number, handler, signal_number)

    for signal_number in signals:
        # the actual registration is queued as a high-priority idle callback
        GLib.idle_add(attach, signal_number, priority=GLib.PRIORITY_HIGH)
# build the gui
def __init__(self):
    """Class constructor.

    Installs an empty configuration, hooks the quit signals up to
    ``self.quit`` and creates the GLib main loop.
    """
    # start out with an empty configuration
    empty_config = configparser.ConfigParser()
    self.set_config(empty_config)

    # signals that should make the application quit
    quit_signals = [signal.SIGINT, signal.SIGTERM, signal.SIGHUP]
    self.setup_signals(signals=quit_signals, handler=self.quit)

    # Gtk.main() can't be used because of a bug that prevents proper
    # SIGINT handling, so a GLib.MainLoop() is used directly instead.
    self.mainloop = GLib.MainLoop()
##################
### Properties ###
##################
def setup_signals(self, signals, handler):
    """
    Install ``handler`` for a list of unix signals using GLib.

    This works around signal.signal(signal, handler), which does not work
    with a GLib.MainLoop() for some reason.
    Thanks to: http://stackoverflow.com/a/26457317/5433146

    args:
        signals (list of signal.SIG... signals): the signals to connect to
        handler (function): function to be executed on these signals; it
            gets the signal number as its argument
    """
    def add_unix_handler(which):
        # for signal ``which``: run ``handler`` with ``which`` as argument
        GLib.unix_signal_add(GLib.PRIORITY_HIGH, which, handler, which)

    for which in signals:
        # queue the installation as a high-priority idle callback
        GLib.idle_add(add_unix_handler, which, priority=GLib.PRIORITY_HIGH)
# get an object from the builder
def initialize(self, environment):
    """Set up the GStreamer elements and start a GLib main loop thread.

    Stores ``environment`` on ``self.env``. When the module-level flag
    ``_gstreamerAvailable`` is false, the reason (``_availableError``) is
    written to the debug log and the method returns with
    ``self._initialized`` left false.

    Otherwise it creates a ``playbin`` player, a pipeline made of an
    ``audiotestsrc`` linked to an ``autoaudiosink``, connects the bus
    message callbacks and runs a ``GLib.MainLoop`` in its own thread.

    :param environment: mapping that is indexed as
        ``environment['runtime']['debug']`` for logging.
    """
    self.env = environment
    self._initialized = _gstreamerAvailable
    if not self._initialized:
        # Fixed: the original read ``self.environment``, an attribute this
        # method never sets; the environment is stored on ``self.env``.
        self.env['runtime']['debug'].writeDebugOut(
            'Gstreamer not available ' + _availableError,
            debug.debugLevel.ERROR)
        return

    # player element; its bus messages go to self._onPlayerMessage
    self._player = Gst.ElementFactory.make('playbin', 'player')
    player_bus = self._player.get_bus()
    player_bus.add_signal_watch()
    player_bus.connect("message", self._onPlayerMessage)

    # separate pipeline; its bus messages go to self._onPipelineMessage
    self._pipeline = Gst.Pipeline(name='fenrir-pipeline')
    pipeline_bus = self._pipeline.get_bus()
    pipeline_bus.add_signal_watch()
    pipeline_bus.connect("message", self._onPipelineMessage)

    # audiotestsrc -> autoaudiosink inside the pipeline
    self._source = Gst.ElementFactory.make('audiotestsrc', 'src')
    self._sink = Gst.ElementFactory.make('autoaudiosink', 'output')
    self._pipeline.add(self._source)
    self._pipeline.add(self._sink)
    self._source.link(self._sink)

    # run the GLib main loop in a separate thread
    self.mainloop = GLib.MainLoop()
    self.thread = threading.Thread(target=self.mainloop.run)
    self.thread.start()
def __init__(self, adapter_addr, device_addr):
    """Default initialiser.

    Creates object for the specified remote Bluetooth device on the
    specified local adapter.

    :param adapter_addr: Address of the local Bluetooth adapter.
    :param device_addr: Address of the remote Bluetooth device.
    """
    # Fixed ordering: the GLib main loop integration has to be made the
    # default *before* the bus connection is opened, otherwise a newly
    # created connection is not attached to the main loop.
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    self.bus = dbus.SystemBus()
    self.mainloop = GObject.MainLoop()

    # D-Bus object path of the remote device below the given adapter
    device_path = dbus_tools.get_dbus_path(adapter_addr, device_addr)
    self.remote_device_path = device_path
    self.remote_device_obj = self.bus.get_object(
        constants.BLUEZ_SERVICE_NAME,
        self.remote_device_path)

    # interface proxies: device methods and generic properties access
    self.remote_device_methods = dbus.Interface(
        self.remote_device_obj,
        constants.DEVICE_INTERFACE)
    self.remote_device_props = dbus.Interface(
        self.remote_device_obj,
        dbus.PROPERTIES_IFACE)
def run_producers_test(producers, names, args):
    """Start several producers and run a GLib main loop.

    When ``args.output`` is set, one ``FileConsumer`` per entry of
    ``names`` is created (writing to ``"<output>-<index>"``) and the main
    loop is quit once every consumer reports ``_emitted_complete``.

    :param producers: iterable of objects with a ``start()`` method.
    :param names: names to fetch, one consumer is created per name.
    :param args: parsed arguments; only ``args.output`` is read here.
    """
    loop = GLib.MainLoop()

    # Fixed: the original followed this with a stray ``producer.start()``,
    # which raises NameError on Python 3 (comprehension variables do not
    # leak) and started the last producer twice on Python 2.
    for producer in producers:
        producer.start()

    if args.output:
        from ..file import FileConsumer
        consumers = [
            FileConsumer(name, filename="%s-%s" % (args.output, index))
            for index, name in enumerate(names)
        ]

        def check_complete(*a):
            # quit only after every consumer has finished
            if all(c._emitted_complete for c in consumers):
                print("ALL RETRIEVED")
                loop.quit()

        # connect all callbacks first, then start the consumers
        for consumer in consumers:
            consumer.connect('complete', check_complete)
        for consumer in consumers:
            consumer.start()

    loop.run()
def main():
    """Fetch ``args.name`` into ``args.filename`` with a FileConsumer.

    The main loop is quit when the consumer emits ``complete``, or as soon
    as its ``_callbackCount`` exceeds ``--limit`` (when a non-zero limit
    was given).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("filename")
    parser.add_argument("-l", "--limit", type=int, default=0)
    args = utils.parse_args(parser=parser)

    consumer = FileConsumer(args.name, args.filename)

    def on_complete(*_ignored):
        # ``loop`` is bound further down, before the loop starts running
        loop.quit()

    def on_progress(source, pct):
        if not args.limit:
            return
        if source._callbackCount > args.limit:
            on_complete()

    consumer.connect('progress', on_progress)
    consumer.connect('complete', on_complete)
    consumer.start()

    loop = GLib.MainLoop()
    loop.run()
def main():
    """Serve content from mounted volumes until the main loop ends.

    Registers ``signal_cb`` for SIGINT and SIGTERM, starts a
    ``simple_store.Producer``, feeds it the mounts that already exist and
    then follows mount-added / mount-removed events.
    """
    utils.parse_args(include_name=False)
    loop = GLib.MainLoop()

    for signum in (signal.SIGINT, signal.SIGTERM):
        GLib.unix_signal_add(GLib.PRIORITY_DEFAULT, signum, signal_cb)

    monitor = Gio.VolumeMonitor.get()
    store = simple_store.Producer(
        prefix=SUBSCRIPTIONS_SOMA,
        base=defaults.ENDLESS_NDN_CACHE_PATH,
        cost=defaults.RouteCost.USB,
    )
    store.start()

    # handle volumes that were mounted before this process started
    for existing_mount in monitor.get_mounts():
        mount_added_cb(monitor, existing_mount, store)

    monitor.connect("mount-added", mount_added_cb, store)
    monitor.connect("mount-removed", mount_removed_cb, store)

    maybe_time_out()
    loop.run()
def _update(self, keys, raw):
"""
>>> w = _refl("widget")
>>> w.selectable_rval = True
>>> w.mouse_event_rval = True
>>> scr = _refl("screen")
>>> scr.get_cols_rows_rval = (15, 5)
>>> evl = _refl("event_loop")
>>> ml = MainLoop(w, [], scr, event_loop=evl)
>>> ml._input_timeout = "old timeout"
>>> ml._update(['y'], [121]) # doctest:+ELLIPSIS
screen.get_cols_rows()
widget.selectable()
widget.keypress((15, 5), 'y')
>>> ml._update([("mouse press", 1, 5, 4)], [])
widget.mouse_event((15, 5), 'mouse press', 1, 5, 4, focus=True)
>>> ml._update([], [])
"""
keys = self.input_filter(keys, raw)
if keys:
self.process_input(keys)
if 'window resize' in keys:
self.screen_size = None
def _test_run_screen_event_loop(self):
"""
>>> w = _refl("widget")
>>> scr = _refl("screen")
>>> scr.get_cols_rows_rval = (10, 5)
>>> scr.get_input_rval = [], []
>>> ml = MainLoop(w, screen=scr)
>>> def stop_now(loop, data):
... raise ExitMainLoop()
>>> handle = ml.set_alarm_in(0, stop_now)
>>> try:
... ml._run_screen_event_loop()
... except ExitMainLoop:
... pass
screen.get_cols_rows()
widget.render((10, 5), focus=True)
screen.draw_screen((10, 5), None)
screen.set_input_timeouts(0)
screen.get_input(True)
"""
def run(self):
    """Create a GLib main loop, call ``self.show()`` and run the loop."""
    main_loop = GLib.MainLoop()
    self.mainloop = main_loop
    self.show()
    # go through the attribute again, exactly as the loop was stored
    self.mainloop.run()
def __init__(self):
    """Create the GLib main loop and set every attribute to its default."""
    self.exit_code = -1
    self.mainloop = GLib.MainLoop()
    self.config = None

    # _hash_of_colors tells whether the css has to be reapplied after a
    # configuration change
    self._hash_of_colors = -1

    self._window = None
    self._registered = False
    self._last_profile_change = 0
    self._recent_profiles_undo = None
def __init__(self):
    """Prepare logger, configuration, quit signals and the D-Bus object.

    Makes GLib the default main loop for dbus, creates the main loop,
    requests ``CO2MONITOR_BUSNAME`` on the system bus and registers this
    object at ``CO2MONITOR_OBJECTPATH``.
    """
    # start with the standard logger and an empty configuration
    self.set_logger(logging.getLogger(__name__))
    self.set_config(configparser.ConfigParser())

    # these signals ask the service to stop
    quit_signals = [signal.SIGINT, signal.SIGTERM, signal.SIGHUP]
    self.setup_signals(signals=quit_signals, handler=self.please_stop_now)

    # use glib as default mainloop for dbus
    glib_integration = dbus.mainloop.glib
    glib_integration.DBusGMainLoop(set_as_default=True)
    glib_integration.threads_init()
    GLib.threads_init()

    self.loop = GLib.MainLoop()

    # claim the bus name on the system bus ...
    self.systembus = dbus.SystemBus()
    self.systembus.request_name(CO2MONITOR_BUSNAME)
    exported_name = dbus.service.BusName(CO2MONITOR_BUSNAME, self.systembus)

    # ... and register this object under it
    dbus.service.Object.__init__(self, exported_name, CO2MONITOR_OBJECTPATH)
def run(self):
    """Run a fresh GLib main loop, logging before it starts and after it ends."""
    # Gtk.main() can't be used because of a bug that prevents proper
    # SIGINT handling, so a GLib.MainLoop() is used directly instead.
    loop = GLib.MainLoop()
    self.mainloop = loop

    log = self.logger
    log.debug(_("Starting GLib main loop..."))
    loop.run()
    log.debug(_("GLib main loop ended."))
# quit the gui
def run(self):
    """Run the D-Bus side of the process inside a GLib main loop.

    Schedules ``self._idleQueueSync`` as an idle callback, makes the GLib
    integration the default D-Bus main loop, creates the ``SessionDBus``
    service from the task/result queues and runs the loop until it ends
    or a KeyboardInterrupt arrives.
    """
    logging.info('D-Bus process started')

    GLib.threads_init()  # allow threads in GLib
    GLib.idle_add(self._idleQueueSync)

    DBusGMainLoop(set_as_default=True)
    # the local name holds a reference to the service while the loop runs
    dbusService = SessionDBus(self.taskQueue, self.resultQueue)

    # Fixed: the original called ``GLib.MainLoop().quit()`` in the except
    # branch, which quits a brand-new loop rather than the running one.
    # One loop object is now created and used for both run() and quit().
    loop = GLib.MainLoop()
    try:
        loop.run()
    except KeyboardInterrupt:
        logging.debug("\nThe MainLoop will close...")
        loop.quit()
def start(self):
    """Create a GLib main loop, publish it as ``shared.main_loop`` and run it."""
    loop = GLib.MainLoop()
    shared.main_loop = loop
    loop.run()
def start(self):
    """Store a new GLib main loop in ``shared.main_loop`` and run it."""
    main_loop = GLib.MainLoop()
    shared.main_loop = main_loop
    main_loop.run()
def run(self):
    """
    Run the Sniffer main loop.

    Removes every known device from the adapter, subscribes to the
    InterfacesAdded, InterfacesRemoved and PropertiesChanged signals,
    installs the optional backup / connection timers and then runs a GLib
    main loop.

    Raises ValueError when ``self.adapter`` is None.
    """
    if self.adapter is None:
        raise ValueError("Sniffer.run can only be called in a context "
                         "(e.g. `with Sniffer(...) as s: s.run()`)")

    self._log.debug("Clearing the BlueZ device registry.")
    for device_path, _ in get_known_devices():
        self.adapter.RemoveDevice(device_path)

    self._log.debug("Registering the signals InterfacesAdded and PropertiesChanged.")
    bus = pydbus.SystemBus()
    subscriptions = (
        dict(iface=OBJECT_MANAGER_INTERFACE,
             signal="InterfacesAdded",
             signal_fired=self._cb_interfaces_added),
        dict(iface=OBJECT_MANAGER_INTERFACE,
             signal="InterfacesRemoved",
             signal_fired=self._cb_interfaces_removed),
        dict(iface=PROPERTIES_INTERFACE,
             signal="PropertiesChanged",
             arg0=DEVICE_INTERFACE,
             signal_fired=self._cb_properties_changed),
    )
    for subscription in subscriptions:
        bus.subscribe(sender=SERVICE_NAME, **subscription)

    self._log.debug("Running the main loop.")
    wants_backup = self.output_path is not None and self.backup_interval > 0
    if wants_backup:
        GLib.timeout_add_seconds(self.backup_interval, self._cb_backup_registry)
    if self.attempt_connection:
        GLib.timeout_add_seconds(self.queueing_interval, self._cb_connect_check)

    main_loop = GLib.MainLoop()
    main_loop.run()
def run_dbus_service(self, timeout=None, send_usr1=False):
    '''Run D-BUS server.

    Registers this object at '/RecoveryMedia' and runs a GLib main loop.
    The loop is re-entered every time it exits for as long as
    self._timeout is false. Without a timeout nothing in this method sets
    that flag. With a timeout, a timer quits the loop every ``timeout``
    seconds and the flag is set before each run, so the method returns
    after the loop exits unless the flag was cleared again meanwhile.

    If send_usr1 is True, this will send a SIGUSR1 to the parent process
    once the server is ready to take requests.
    '''
    dbus.service.Object.__init__(self, self.bus, '/RecoveryMedia')
    self.main_loop = GLib.MainLoop()
    self._timeout = False

    if timeout:
        def _stop_loop():
            """Timer callback: quit the loop, keep the timer installed."""
            self.main_loop.quit()
            return True

        GLib.timeout_add(timeout * 1000, _stop_loop)

    # tell the parent process that we are ready now
    if send_usr1:
        parent_pid = os.getppid()
        os.kill(parent_pid, signal.SIGUSR1)

    # run until we time out
    while True:
        if self._timeout:
            break
        if timeout:
            self._timeout = True
        self.main_loop.run()
def dbus_sync_call_signal_wrapper(dbus_iface, func, handler_map, *args, **kwargs):
    '''Run a D-BUS method call while receiving signals.

    This function is an Ugly Hack™, since a normal synchronous
    dbus_iface.fn() call does not cause signals to be received until the
    method returns. So func is called asynchronously and a temporary main
    loop is run to receive signals and call their handlers; these are
    given in handler_map (signal name -> signal handler).

    If dbus_iface has no connect_to_signal attribute it is not treated as
    a D-BUS object and func is simply called on it directly.

    Returns the tuple of reply arguments; raises the exception passed to
    the error handler, if any. The outcome is also left in the module
    globals _h_reply_result and _h_exception_exc.
    '''
    if not hasattr(dbus_iface, 'connect_to_signal'):
        # not a D-BUS object
        plain_method = getattr(dbus_iface, func)
        return plain_method(*args, **kwargs)

    global _h_reply_result, _h_exception_exc

    def _on_reply(*reply_args, **reply_kwargs):
        """store the reply and stop the temporary loop"""
        global _h_reply_result
        _h_reply_result = reply_args
        loop.quit()

    def _on_error(exception=None):
        """store the error and stop the temporary loop"""
        global _h_exception_exc
        _h_exception_exc = exception
        loop.quit()

    loop = GLib.MainLoop()
    _h_reply_result = None
    _h_exception_exc = None

    kwargs.update(
        reply_handler=_on_reply,
        error_handler=_on_error,
        timeout=86400,
    )

    for signal_name in handler_map:
        dbus_iface.connect_to_signal(signal_name, handler_map[signal_name])

    async_method = dbus_iface.get_dbus_method(func)
    async_method(*args, **kwargs)
    loop.run()

    if _h_exception_exc:
        raise _h_exception_exc
    return _h_reply_result
def __init__(self):
    """Create the GLib main loop and keep it in ``self.mainloop``."""
    main_loop = GLib.MainLoop()
    self.mainloop = main_loop
def run_producer_test(producer, name, args):
    """Start ``producer`` and run a GLib main loop.

    When ``args.output`` is set, a ``FileConsumer`` fetches ``name`` into
    that file and the loop is quit on its ``complete`` signal.
    """
    loop = GLib.MainLoop()
    producer.start()

    output_path = args.output
    if output_path:
        from ..file import FileConsumer

        def quit_loop(*_ignored):
            return loop.quit()

        consumer = FileConsumer(name, filename=output_path)
        consumer.connect('complete', quit_loop)
        consumer.start()

    loop.run()
def main():
    """Initialise notifications, create the D-Bus service and run a main loop."""
    utils.parse_args(include_name=False)
    Notify.init("Content Updates")

    # the local name holds a reference to the service while the loop runs
    service = DBusService()

    loop = GLib.MainLoop()
    loop.run()
def main():
    """Serve the file given on the command line through a FileProducer.

    Opens ``args.filename`` in binary mode, starts a ``FileProducer`` for
    ``args.name`` on it and runs a GLib main loop.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("filename")
    args = utils.parse_args(parser=parser)

    # Fixed: the original never closed the file. The ``with`` block spans
    # the whole main loop, so the file stays open while the producer runs
    # and is closed once the loop ends or an exception propagates.
    with open(args.filename, 'rb') as f:
        producer = FileProducer(args.name, f)
        producer.start()

        loop = GLib.MainLoop()
        loop.run()
def main():
    """Create an AvahiMonitor and run a GLib main loop."""
    from gi.repository import GLib

    utils.parse_args(include_name=False)

    # the local name holds a reference to the monitor while the loop runs
    monitor = AvahiMonitor()

    main_loop = GLib.MainLoop()
    main_loop.run()
def main():
    """Start the subscription and store producers for ``--store-dir``.

    Both producers use the required ``-t/--store-dir`` directory; a GLib
    main loop is run afterwards.
    """
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("-t", "--store-dir", required=True)
    args = utils.parse_args(parser=parser, include_name=False)
    store_dir = args.store_dir

    subscription_producer = subscription.Producer(store_dir)
    subscription_producer.start()

    store = simple_store.Producer(base=store_dir,
                                  prefix=names.SUBSCRIPTIONS_SOMA)
    store.start()

    loop = GLib.MainLoop()
    loop.run()
def main():
    """Create a Fetcher and run a GLib main loop."""
    utils.parse_args(include_name=False)

    # the local name holds a reference to the fetcher while the loop runs
    fetcher = Fetcher()

    main_loop = GLib.MainLoop()
    main_loop.run()
def __init__(self):
    """Create the event object and GLib main loop; start out not expired."""
    # self.event = event
    self.ev = Event()
    self.loop = GLib.MainLoop()
    self.expired = False
def _test_run(self):
"""
>>> w = _refl("widget") # _refl prints out function calls
>>> w.render_rval = "fake canvas" # *_rval is used for return values
>>> scr = _refl("screen")
>>> scr.get_input_descriptors_rval = [42]
>>> scr.get_cols_rows_rval = (20, 10)
>>> scr.started = True
>>> scr._urwid_signals = {}
>>> evl = _refl("event_loop")
>>> evl.enter_idle_rval = 1
>>> evl.watch_file_rval = 2
>>> ml = MainLoop(w, [], scr, event_loop=evl)
>>> ml.run() # doctest:+ELLIPSIS
screen.start()
screen.set_mouse_tracking()
screen.unhook_event_loop(...)
screen.hook_event_loop(...)
event_loop.enter_idle(<bound method MainLoop.entering_idle...>)
event_loop.run()
event_loop.remove_enter_idle(1)
screen.unhook_event_loop(...)
screen.stop()
>>> ml.draw_screen() # doctest:+ELLIPSIS
screen.get_cols_rows()
widget.render((20, 10), focus=True)
screen.draw_screen((20, 10), 'fake canvas')
"""