def __init__(self, device_id=None):
    """Set up the main loop, the D-Bus registration and the service list.

    The GLib main loop is installed as the dbus-python default before the
    system bus is opened. The object is then exported on that bus under the
    ``ukBaz.bluezero`` name, and the (initially empty) list of services and
    the adapter wrapper are created.

    :param device_id: Passed unchanged to ``adapter.Adapter``.
    """
    # Main loop plumbing: thread support first, then the default loop.
    GObject.threads_init()
    glib_integration = dbus.mainloop.glib
    glib_integration.threads_init()
    glib_integration.DBusGMainLoop(set_as_default=True)
    self.mainloop = GObject.MainLoop()

    # Export this object on the system bus; the object path embeds
    # id(self).
    self.bus = system_bus = dbus.SystemBus()
    self.path = object_path = '/ukBaz/bluezero/application{}'.format(
        id(self))
    self.bus_name = owned_name = dbus.service.BusName(
        'ukBaz.bluezero', system_bus)
    dbus.service.Object.__init__(self, owned_name, object_path)

    # Services offered by the application, and the adapter it runs on.
    self.services = []
    self.dongle = adapter.Adapter(device_id)
# Example source code using the Python MainLoop() class
def __init__(self, adapter_addr, device_addr):
    """Create a proxy for a remote Bluetooth device on a given adapter.

    Looks up the D-Bus object for the device and stores interfaces for
    its device methods and its properties.

    :param adapter_addr: Address of the local Bluetooth adapter.
    :param device_addr: Address of the remote Bluetooth device.
    """
    # The GLib main loop has to be installed as the dbus-python default
    # *before* the bus connection is created; a connection opened first is
    # not attached to the loop, so signals and asynchronous replies would
    # not be dispatched on it.
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    self.bus = dbus.SystemBus()
    self.mainloop = GObject.MainLoop()

    device_path = dbus_tools.get_dbus_path(adapter_addr, device_addr)
    self.remote_device_path = device_path
    self.remote_device_obj = self.bus.get_object(
        constants.BLUEZ_SERVICE_NAME,
        self.remote_device_path)
    # One interface for the device's own methods, one for its properties.
    self.remote_device_methods = dbus.Interface(
        self.remote_device_obj,
        constants.DEVICE_INTERFACE)
    self.remote_device_props = dbus.Interface(self.remote_device_obj,
                                              dbus.PROPERTIES_IFACE)
def main():
    """Start the Bluetooth and IBUS services and block in the GLib loop.

    The IBUS service runs on a daemon thread while the calling thread runs
    a ``GObject.MainLoop``. When the loop ends (normally, on Ctrl-C, or
    because of an error) ``shutdown()`` is called and the process exits
    with status 0.
    """
    global bluetooth
    global ibus
    bluetooth = bt_.BluetoothService(onBluetoothConnected, onPlayerChanged)
    ibus = ibus_.IBUSService(onIBUSready, onIBUSpacket)
    ibus.cmd = ibus_.IBUSCommands(ibus)
    # Daemon thread: it must not keep the process alive after main() exits.
    ibus.main_thread = threading.Thread(target=ibus.start)
    ibus.main_thread.daemon = True
    ibus.main_thread.start()
    try:
        mainloop = GObject.MainLoop()
        mainloop.run()
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the program.
        pass
    except Exception:
        # Only real errors are reported here; a bare ``except:`` would also
        # swallow SystemExit and GeneratorExit.
        print("Unable to run the gobject main loop")
    finally:
        # Always clean up, even if a non-Exception exit is propagating.
        print("")
        shutdown()
    sys.exit(0)
def main():
    """Start the IBUS service and block in the GLib main loop.

    The IBUS service runs on a daemon thread while the calling thread runs
    a ``GObject.MainLoop``. When the loop ends (normally, on Ctrl-C, or
    because of an error) ``shutdown()`` is called and the process exits
    with status 0.
    """
    global ibus
    ibus = ibus_.IBUSService(onIBUSready, onIBUSpacket)
    ibus.cmd = ibus_.IBUSCommands(ibus)
    # Daemon thread: it must not keep the process alive after main() exits.
    ibus.main_thread = threading.Thread(target=ibus.start)
    ibus.main_thread.daemon = True
    ibus.main_thread.start()
    try:
        mainloop = GObject.MainLoop()
        mainloop.run()
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the program.
        pass
    except Exception:
        # Only real errors are reported here; a bare ``except:`` would also
        # swallow SystemExit and GeneratorExit.
        print("Unable to run the gobject main loop")
    finally:
        # Always clean up, even if a non-Exception exit is propagating.
        print("")
        shutdown()
    sys.exit(0)
def run(self):
    """
    Registers advertisement and services to D-Bus and starts the main loop.

    Returns immediately if a main loop is already set on this object.
    Blocks until the loop ends; afterwards everything is unregistered and
    the loop reference is cleared so that ``run()`` can be called again.
    """
    if self._main_loop:
        return
    self._main_loop = GObject.MainLoop()
    try:
        self._disconnect_all()
        self._register()
    except Exception:
        # Setup failed before the loop started: clear the reference,
        # otherwise every later run() would hit the guard above and
        # silently do nothing.
        self._main_loop = None
        raise
    logger.info("--- Mainloop started ---")
    try:
        self._main_loop.run()
    except KeyboardInterrupt:
        # ignore exception as it is a valid way to exit the program
        # and skip to finally clause
        pass
    except Exception as e:
        # logger.exception logs at ERROR level and keeps the traceback.
        logger.exception(e)
    finally:
        logger.info("--- Mainloop finished ---")
        self._unregister()
        self._main_loop.quit()
        self._main_loop = None
def __init__(self, useGtk=True):
    """Create the reactor and pick the event-loop primitives it drives.

    :param useGtk: When true, the ``gtk`` main-loop functions are always
        used. When false, the glib context/loop is used instead, provided
        ``gobject.pygtk_version`` exists and is at least 2.3.91.
    """
    self.context = gobject.main_context_default()
    self.loop = gobject.MainLoop()
    posixbase.PosixReactorBase.__init__(self)
    # Before 2.3.91 the glib iteration and mainloop functions did not
    # release the global interpreter lock, which broke thread and signal
    # support, so older versions always go through gtk.
    glib_releases_gil = (hasattr(gobject, "pygtk_version")
                         and gobject.pygtk_version >= (2, 3, 91))
    if glib_releases_gil and not useGtk:
        pending, iteration = self.context.pending, self.context.iteration
        crash, run = self.loop.quit, self.loop.run
    else:
        import gtk
        pending, iteration = gtk.events_pending, gtk.main_iteration
        crash, run = _our_mainquit, gtk.main
    self.__pending = pending
    self.__iteration = iteration
    self.__crash = crash
    self.__run = run
# The input_add function in pygtk1 checks for objects with a
# 'fileno' method and, if present, uses the result of that method
# as the input source. The pygtk2 input_add does not do this. The
# function below replicates the pygtk1 functionality.
# In addition, pygtk maps gtk.input_add to _gobject.io_add_watch, and
# g_io_add_watch() takes different condition bitfields than
# gtk_input_add(). We use g_io_add_watch() here in case pygtk fixes this
# bug.
def main():
    """Parse the adapter name, start advertising and GATT, run the loop.

    The optional ``-a`` / ``--adapter-name`` argument (default: empty
    string) is passed to both the advertising and the GATT server setup
    together with the system bus and the main loop, which is then run.
    """
    cli = argparse.ArgumentParser()
    cli.add_argument('-a', '--adapter-name', type=str, help='Adapter name', default='')
    adapter_name = cli.parse_args().adapter_name

    # Install the GLib loop as the D-Bus default before opening the bus.
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    system_bus = dbus.SystemBus()
    loop = GObject.MainLoop()

    advertising.advertising_main(loop, system_bus, adapter_name)
    gatt_server.gatt_server_main(loop, system_bus, adapter_name)
    loop.run()
def __init__(self):
    """Create the main loop and the interfaces of the default adapter.

    The default adapter path is obtained from ``org.bluez.Manager`` on the
    system bus; ``org.bluez.Adapter`` and ``org.bluez.OutOfBand``
    interfaces for that path are stored on the instance.
    """
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    self.mainloop = gobject.MainLoop()

    system_bus = dbus.SystemBus()
    root_object = system_bus.get_object("org.bluez", "/")
    default_path = dbus.Interface(
        root_object, "org.bluez.Manager").DefaultAdapter()

    # Both interfaces are views onto the same adapter object.
    adapter_object = system_bus.get_object("org.bluez", default_path)
    self.adapter = dbus.Interface(adapter_object, "org.bluez.Adapter")
    self.oob_adapter = dbus.Interface(adapter_object, "org.bluez.OutOfBand")
def __init__(self):
    """Create the main loop and the interfaces of the default adapter.

    The default adapter path is obtained from ``org.bluez.Manager`` on the
    system bus; ``org.bluez.Adapter`` and ``org.bluez.OutOfBand``
    interfaces for that path are stored on the instance.
    """
    self.mainloop = gobject.MainLoop()

    system_bus = dbus.SystemBus()
    root_object = system_bus.get_object("org.bluez", "/")
    default_path = dbus.Interface(
        root_object, "org.bluez.Manager").DefaultAdapter()

    # Both interfaces are views onto the same adapter object.
    adapter_object = system_bus.get_object("org.bluez", default_path)
    self.adapter = dbus.Interface(adapter_object, "org.bluez.Adapter")
    self.oob_adapter = dbus.Interface(adapter_object, "org.bluez.OutOfBand")
def run(self):
    """Export the service on the session bus and run the main loop.

    Claims the ``com.prozacville.steam_monitor`` bus name, exports this
    object at ``/com/prozacville/steam_monitor`` and blocks in a gobject
    main loop until it is stopped.
    """
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    bus_name = dbus.service.BusName("com.prozacville.steam_monitor", dbus.SessionBus())
    dbus.service.Object.__init__(self, bus_name, "/com/prozacville/steam_monitor")
    self._loop = gobject.MainLoop()
    # print(...) with a single string prints the same text on Python 2 and
    # Python 3; the bare print statement is a syntax error on Python 3.
    print("Service running...")
    self._loop.run()
    print("Service stopped")
def __init__(self):
    """Create the bus connection, main loop, WiFi controller and state.

    The state starts at ``OFF_STATE`` with no SSID and an empty callback
    registry.
    """
    # The GLib loop must be the D-Bus default before the bus is opened.
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    self.bus = dbus.SystemBus()
    self._mainloop = GObject.MainLoop()

    self.wifi_manager = WiFiControl()

    # Plain bookkeeping fields.
    self.callbacks = dict()
    self.current_state, self.current_ssid = self.OFF_STATE, None

    self.reconnect_worker = DaemonTreeObj(WORKER_NAME)
def __init__(self, useGtk=True):
    """Create the reactor and pick the event-loop primitives it drives.

    :param useGtk: When true, the ``gtk`` main-loop functions are always
        used. When false, the glib context/loop is used instead, provided
        ``gobject.pygtk_version`` exists and is at least 2.3.91.
    """
    self.context = gobject.main_context_default()
    self.loop = gobject.MainLoop()
    posixbase.PosixReactorBase.__init__(self)
    # Before 2.3.91 the glib iteration and mainloop functions did not
    # release the global interpreter lock, which broke thread and signal
    # support, so older versions always go through gtk.
    glib_releases_gil = (hasattr(gobject, "pygtk_version")
                         and gobject.pygtk_version >= (2, 3, 91))
    if glib_releases_gil and not useGtk:
        pending, iteration = self.context.pending, self.context.iteration
        crash, run = self.loop.quit, self.loop.run
    else:
        import gtk
        pending, iteration = gtk.events_pending, gtk.main_iteration
        crash, run = _our_mainquit, gtk.main
    self.__pending = pending
    self.__iteration = iteration
    self.__crash = crash
    self.__run = run
# The input_add function in pygtk1 checks for objects with a
# 'fileno' method and, if present, uses the result of that method
# as the input source. The pygtk2 input_add does not do this. The
# function below replicates the pygtk1 functionality.
# In addition, pygtk maps gtk.input_add to _gobject.io_add_watch, and
# g_io_add_watch() takes different condition bitfields than
# gtk_input_add(). We use g_io_add_watch() here in case pygtk fixes this
# bug.
def __init__(
        self, client_class, timeout=180, capability="KeyboardDisplay",
        path="/org/bluez/my_bluetooth_agent"):
    """Store the agent settings and create the bus and main loop.

    All four arguments are stored unchanged on the instance under the same
    names. Finally ``_bluetooth.make_discoverable(False)`` is called.

    :param client_class: Stored as ``self.client_class``.
    :param timeout: Stored as ``self.timeout``.
    :param capability: Stored as ``self.capability``.
    :param path: Stored as ``self.path``.
    """
    # The GLib loop must be the D-Bus default before the bus is opened.
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

    self.client_class, self.timeout = client_class, timeout
    self.capability, self.path = capability, path

    self._bus = dbus.SystemBus()
    self._mainloop = GObject.MainLoop()
    _bluetooth.make_discoverable(False)
def __init__(self, tcp_port_in=8043, tcp_port_out=None, channel=1):
    """Create the serial port profile, export this object, store ports.

    :param tcp_port_in: Stored as ``self.tcp_port_in``.
    :param tcp_port_out: Stored as ``self.tcp_port_out``.
    :param channel: Passed to ``SerialPort``.
    """
    serial_port = SerialPort(channel)
    self._spp = serial_port

    # The GLib loop must be the D-Bus default before the bus is opened.
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    # This object is exported at the serial port's profile path.
    dbus.service.Object.__init__(
        self, dbus.SystemBus(), serial_port.profile_path)

    self.tcp_port_in, self.tcp_port_out = tcp_port_in, tcp_port_out
    self._mainloop = GObject.MainLoop()
def main():
    """Register the LED GATT application and advertisement, then run.

    Both registrations are asynchronous; their outcome is reported through
    the ``register_*_cb`` / ``register_*_error_cb`` handlers. On Ctrl-C the
    display is cleared and written before returning.
    """
    global mainloop
    global display

    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    system_bus = dbus.SystemBus()

    # BlueZ managers used for the two registrations below.
    gatt_manager = get_service_manager(system_bus)
    advertising_manager = get_ad_manager(system_bus)

    # GATT application backed by the display.
    display = setup_display()
    led_app = LedApplication(system_bus, display)

    # Advertisement with index 0.
    led_advert = LedAdvertisement(system_bus, 0)

    mainloop = GObject.MainLoop()

    gatt_manager.RegisterApplication(
        led_app.get_path(), {},
        reply_handler=register_app_cb,
        error_handler=register_app_error_cb)
    advertising_manager.RegisterAdvertisement(
        led_advert.get_path(), {},
        reply_handler=register_ad_cb,
        error_handler=register_ad_error_cb)

    try:
        mainloop.run()
    except KeyboardInterrupt:
        # Blank the display when the user stops the program.
        display.clear()
        display.write_display()
def bluetoothConnection():
    """Register a Bluetooth profile with BlueZ and run the GLib main loop.

    A ``Profile`` object is created at ``/foo/bar/profile`` and registered
    for UUID ``1101`` with ``AutoConnect`` disabled. The call then blocks
    in the main loop.
    """
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    system_bus = dbus.SystemBus()
    manager = dbus.Interface(
        system_bus.get_object(BUS_NAME, "/org/bluez"),
        "org.bluez.ProfileManager1")

    path = "/foo/bar/profile"
    uuid = "1101"
    options = {"AutoConnect": False}

    # The Profile object is created before registration and stays bound to
    # a local name for as long as the loop runs.
    profile = Profile(system_bus, path)
    manager.RegisterProfile(path, uuid, options)

    GObject.MainLoop().run()
def bluetoothConnection():
    """Register a Bluetooth profile with BlueZ and run the GLib main loop.

    A ``Profile`` object is created at ``/foo/bar/profile`` and registered
    for UUID ``1101`` with ``AutoConnect`` disabled. The call then blocks
    in the main loop; on ``KeyboardInterrupt`` or ``SystemExit``
    ``closeThreads()`` is called.
    """
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    system_bus = dbus.SystemBus()
    manager = dbus.Interface(
        system_bus.get_object(BUS_NAME, "/org/bluez"),
        "org.bluez.ProfileManager1")

    path = "/foo/bar/profile"
    uuid = "1101"
    options = {"AutoConnect": False}

    # The Profile object is created before registration and stays bound to
    # a local name for as long as the loop runs.
    profile = Profile(system_bus, path)
    manager.RegisterProfile(path, uuid, options)

    loop = GObject.MainLoop()
    try:
        loop.run()
    except (KeyboardInterrupt, SystemExit):
        closeThreads()
def bluetoothConnection():
    """Register a Bluetooth profile with BlueZ and run the GLib main loop.

    A ``Profile`` object is created at ``/foo/bar/profile`` and registered
    for UUID ``1101`` with ``AutoConnect`` disabled. The call then blocks
    in the main loop.
    """
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    system_bus = dbus.SystemBus()
    manager = dbus.Interface(
        system_bus.get_object(BUS_NAME, "/org/bluez"),
        "org.bluez.ProfileManager1")

    path = "/foo/bar/profile"
    uuid = "1101"
    options = {"AutoConnect": False}

    # The Profile object is created before registration and stays bound to
    # a local name for as long as the loop runs.
    profile = Profile(system_bus, path)
    manager.RegisterProfile(path, uuid, options)

    GObject.MainLoop().run()
def _listen_for_wifi_state_changes(self):
    """Track NetworkManager state and mirror it into ``self._wifi_status``.

    The current state is evaluated once straight away, and the same handler
    is then subscribed to NetworkManager state changes. Every call to the
    handler refreshes the current SSID; when the derived status differs
    from the stored one it is logged, stored, and
    ``self._on_wifi_status_changed()`` is called.
    """
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

    def log_transition(status, nm_state):
        # The SSID is only part of the message while (being) connected.
        with_ssid = (WifiConnectionState.CONNECTING,
                     WifiConnectionState.CONNECTED)
        if status in with_ssid:
            message = "Wifi status changed: %s (%d) to %s" % (status, nm_state, self._current_ssid)
        else:
            message = "Wifi status changed: %s (%d)" % (status, nm_state)
        logger.info(message)

    def on_state_changed(nm_instance, nm_state, **kwargs):
        # Map the numeric NetworkManager state onto three coarse states.
        status = (
            WifiConnectionState.CONNECTED
            if nm_state >= NetworkManager.NM_STATE_CONNECTED_GLOBAL
            else WifiConnectionState.CONNECTING
            if nm_state > NetworkManager.NM_STATE_DISCONNECTING
            else WifiConnectionState.DISCONNECTED
        )
        self._update_current_ssid()
        if status == self._wifi_status:
            # Nothing changed: no log line, no notification.
            return
        log_transition(status, nm_state)
        self._wifi_status = status
        self._on_wifi_status_changed()

    # Evaluate the state as it is right now ...
    on_state_changed(None, NetworkManager.NetworkManager.State)
    # ... and then follow every later change.
    NetworkManager.NetworkManager.OnStateChanged(on_state_changed)
    logger.debug("Start listening to network status changes")
    # Attention: a GObject.MainLoop() is required for this to work;
    # in this case it is started by the BLE Peripheral object.
def run(self):
    """Report the current network state, subscribe to changes, and loop.

    ``self._on_state_changed`` is called once with the current
    NetworkManager state and then registered for later state changes. The
    call then blocks in a ``GObject.MainLoop``.
    """
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

    network_manager = NetworkManager.NetworkManager
    # Evaluate the state as it is right now ...
    self._on_state_changed(None, network_manager.State)
    # ... and then follow every later change.
    network_manager.OnStateChanged(self._on_state_changed)
    logger.debug("Start listening to network status changes")

    GObject.MainLoop().run()
def run(cls, store, master_options):
    """Export the bus service on the session bus and run it until stopped.

    On the first ``KeyboardInterrupt`` / ``SystemExit``, if clients are
    still registered, the scheduler is stopped, the agents are asked to
    exit, the store state is saved and the loop is run again to wait for
    them. A second interrupt ends that wait. The store state is saved once
    more before returning.

    :param store: Storage object; passed to the service constructor and
        asked to ``store_state()`` on the way out.
    :param master_options: Not used by this method.
    """
    gobject.threads_init()
    dbus.glib.init_threads()
    DBusGMainLoop(set_as_default=True)
    bus = dbus.SessionBus()
    # NOTE(review): `name` is never read; presumably the reference is held
    # so the bus name stays owned while the loop runs - confirm.
    name = dbus.service.BusName("com.airbus.rebus.bus", bus)
    svc = cls(bus, "/bus", store)
    svc.mainloop = gobject.MainLoop()
    log.info("Entering main loop.")
    try:
        svc.mainloop.run()
    except (KeyboardInterrupt, SystemExit):
        if len(svc.clients) > 0:
            log.info("Trying to stop all agents properly. Press Ctrl-C "
                     "again to stop.")
            # stop scheduler
            svc.sched.shutdown()
            # ask slave agents to shutdown nicely & save internal state.
            # next(iter(...)) yields the first key on Python 2 and 3 alike;
            # keys()[0] fails on Python 3, where keys() is a view.
            log.info("Expecting %u more agents to exit (ex. %s)",
                     len(svc.clients), next(iter(svc.clients)))
            svc.bus_exit(store.STORES_INTSTATE)
            store.store_state()
            try:
                # Keep serving until the agents are gone or the user
                # interrupts a second time.
                svc.mainloop.run()
            except (KeyboardInterrupt, SystemExit):
                if len(svc.clients) > 0:
                    log.info(
                        "Not all agents have stopped, exiting nonetheless")
    log.info("Stopping storage...")
    store.store_state()
def run_agents(self):
    """Run the agent, then serve its main loop and clean up afterwards.

    If the agent class overrides ``Agent.run`` the agent is unregistered
    right after its run and nothing else happens. Otherwise a gobject main
    loop is run; on ``KeyboardInterrupt`` / ``SystemExit`` all held locks
    are released and the loop is quit. Afterwards the signal receivers are
    removed, the agent is unregistered and its internal state is saved.
    """
    self.agent.run_and_catch_exc()

    runs_on_its_own = self.agent.__class__.run != Agent.run
    if runs_on_its_own:
        # run() has been overridden - the agent did its work by itself,
        # so all that is left is to unregister it.
        self.iface.unregister(self.agent_id)
        return

    log.info("Entering agent loop")
    self.loop = gobject.MainLoop()
    try:
        self.loop.run()
    except (KeyboardInterrupt, SystemExit):
        for lock_args in self.agent.held_locks:
            self.agent.unlock(*lock_args)
        self.loop.quit()

    # Clean up signals - useful for tests, where one process runs several
    # agents successively
    receivers = (
        (self.broadcast_wrapper, "new_descriptor"),
        (self.targeted_wrapper, "targeted_descriptor"),
        (self.bus_exit_handler, "bus_exit"),
    )
    for handler, signal in receivers:
        self.bus.remove_signal_receiver(handler,
                                        dbus_interface="com.airbus.rebus.bus",
                                        signal_name=signal)

    self.iface.unregister(self.agent_id)
    self.agent.save_internal_state()
# DBus specific functions
def main_loop(self):
    """Return a newly created ``gobject.MainLoop``."""
    loop = gobject.MainLoop()
    return loop
def run(self):
    """
    Starts the phony service, which manages device pairing and the setup
    of hands-free profile services. This function never returns.
    """
    ipc_bus = phony.base.ipc.BusProvider()

    # NOTE(review): -1 presumably asks for the first audio card that
    # provides audio input and output mixers - confirm against Alsa.
    card_index = -1

    # The four resources are entered in this order; the headset is built
    # from the other three.
    with phony.bluetooth.adapters.Bluez5(ipc_bus) as adapter:
        with phony.bluetooth.profiles.handsfree.Ofono(ipc_bus) as hfp:
            with phony.audio.alsa.Alsa(card_index=card_index) as audio:
                with phony.headset.HandsFreeHeadset(
                        ipc_bus, adapter, hfp, audio) as headset:
                    # Register to receive some bluetooth events
                    headset.on_device_connected(self.device_connected)
                    headset.on_incoming_call(self.incoming_call)
                    headset.on_call_began(self.call_began)
                    headset.on_call_ended(self.call_ended)

                    headset.start('MyBluetoothHeadset', pincode='1234')
                    headset.enable_pairability(timeout=30)
                    self._hs = headset

                    # Wait forever
                    gobject.MainLoop().run()
#
# Call these from your event handlers
#
def __init__(self, service):
    """Initialise the base class and remember the service.

    :param service: Stored on the instance in a private attribute.
    """
    base = super(MainLoop, self)
    base.__init__()
    self.__service = service
def run(self):
    """Watch file descriptor 0 for input or hang-up, then run the loop.

    The private ``__stdin_cb`` method is installed as the watch callback
    before the base class ``run()`` is called.
    """
    watched_fd = 0
    watched_events = gobject.IO_IN | gobject.IO_HUP
    gobject.io_add_watch(watched_fd, watched_events, self.__stdin_cb)
    super(MainLoop, self).run()
def __init__(self):
    """Build the demo window, its notify icon and the icon's menu.

    The window holds a single horizontal scale (0-100, starting at 100)
    connected to ``self.set_window_alpha``. The notify icon gets a
    three-entry menu, and tray messages are routed to
    ``self.on_notifyicon_activity``.
    """
    self.main_loop = gobject.MainLoop()

    # Create a window with a horizontal scale.
    window = gtk.Window()
    self.wnd = window
    window.set_default_size(640, 480)
    window.set_title('Have fun with the transparency slider')

    slider = gtk.HScale()
    slider.set_digits(0)
    slider.set_increments(1, 10)
    slider.set_range(0, 100)
    slider.set_value(100)
    slider.connect('value_changed', self.set_window_alpha)
    window.add(slider)

    # Note: gtk window must be realized before installing extensions.
    window.realize()
    window.show_all()
    self.win32ext = GTKWin32Ext(window)
    self.win32ext.add_notify_icon()

    # GTK menus from the notify icon!
    menu = gtk.Menu()

    balloons_item = gtk.MenuItem('Baloons!')
    balloons_item.connect_object('activate', self.menu_cb, window)
    menu.append(balloons_item)

    fade_out_item = gtk.MenuItem('Fadeout Window')
    fade_out_item.connect('activate', self.fadeoutwindow)
    menu.append(fade_out_item)

    fade_in_item = gtk.MenuItem('Window Disappeared?')
    fade_in_item.connect('activate', self.fadeinwindow)
    menu.append(fade_in_item)

    menu.show_all()
    self.win32ext.notify_icon.menu = menu

    # Set up the callback messages
    self.win32ext.message_map({
        WM_TRAYMESSAGE: self.on_notifyicon_activity
    })