def __init__(self, device_id=None):
    """Create the main loop, export the application on D-Bus and prepare
    the (initially empty) list of services.

    Steps, in order:

    1. Thread support is initialised for ``GObject`` and the dbus GLib
       bindings, and a ``DBusGMainLoop`` is installed as the default.
    2. The object is exported on the system bus under the
       ``ukBaz.bluezero`` bus name, at a path made unique per instance
       with ``id(self)``.
    3. ``self.services`` is created empty and an adapter is opened.

    :param device_id: passed unchanged to ``adapter.Adapter``.
    """
    # Main loop set-up
    GObject.threads_init()
    dbus.mainloop.glib.threads_init()
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    self.mainloop = GObject.MainLoop()

    # D-Bus registration under a per-instance object path
    self.bus = dbus.SystemBus()
    self.path = '/ukBaz/bluezero/application' + str(id(self))
    self.bus_name = dbus.service.BusName('ukBaz.bluezero', self.bus)
    dbus.service.Object.__init__(self, self.bus_name, self.path)

    # Services offered by the application, plus the adapter in use
    self.services = []
    self.dongle = adapter.Adapter(device_id)
python类MainLoop()的实例源码
def run(self):
    """Export the service on the session bus and keep a GLib loop running.

    The object is registered under the ``com.spoppy`` bus name at
    ``/com/spoppy``.  The main loop is then run for as long as
    ``self.running`` is truthy; a ``KeyboardInterrupt`` raised out of the
    loop is logged and the ``while`` condition is checked again.
    """
    GObject.threads_init()
    dbus.mainloop.glib.threads_init()
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

    session_bus = dbus.SessionBus()
    bus_name = dbus.service.BusName("com.spoppy", session_bus)
    super(SpoppyDBusService, self).__init__(bus_name, "/com/spoppy")

    self._loop = GObject.MainLoop()
    while self.running:
        try:
            logger.debug('Starting dbus loop')
            self._loop.run()
        except KeyboardInterrupt:
            logger.debug('Loop interrupted, will restart')
def main():
    """Start the Bluetooth and IBUS services and block in a GLib main loop.

    The IBUS service is started on a daemon thread; the calling thread
    runs the main loop until it ends or is interrupted.  ``shutdown()`` is
    called afterwards in every case, and on the normal paths the process
    then exits with status 0.
    """
    global bluetooth
    global ibus

    bluetooth = bt_.BluetoothService(onBluetoothConnected, onPlayerChanged)

    ibus = ibus_.IBUSService(onIBUSready, onIBUSpacket)
    ibus.cmd = ibus_.IBUSCommands(ibus)
    ibus.main_thread = threading.Thread(target=ibus.start)
    ibus.main_thread.daemon = True
    ibus.main_thread.start()

    try:
        mainloop = GObject.MainLoop()
        mainloop.run()
    except KeyboardInterrupt:
        # Ctrl-C is treated as a normal stop; fall through to shutdown.
        pass
    except Exception:
        # Narrowed from a bare ``except:`` so that SystemExit and other
        # non-error BaseExceptions are not reported as a loop failure.
        print("Unable to run the gobject main loop")
        print("")
    finally:
        # Runs on every path, including exceptions that propagate.
        shutdown()
    sys.exit(0)
def main():
    """Start the IBUS service and block in a GLib main loop.

    The IBUS service is started on a daemon thread; the calling thread
    runs the main loop until it ends or is interrupted.  ``shutdown()`` is
    called afterwards in every case, and on the normal paths the process
    then exits with status 0.
    """
    global ibus

    ibus = ibus_.IBUSService(onIBUSready, onIBUSpacket)
    ibus.cmd = ibus_.IBUSCommands(ibus)
    ibus.main_thread = threading.Thread(target=ibus.start)
    ibus.main_thread.daemon = True
    ibus.main_thread.start()

    try:
        mainloop = GObject.MainLoop()
        mainloop.run()
    except KeyboardInterrupt:
        # Ctrl-C is treated as a normal stop; fall through to shutdown.
        pass
    except Exception:
        # Narrowed from a bare ``except:`` so that SystemExit and other
        # non-error BaseExceptions are not reported as a loop failure.
        print("Unable to run the gobject main loop")
        print("")
    finally:
        # Runs on every path, including exceptions that propagate.
        shutdown()
    sys.exit(0)
def run(self):
    """
    Register advertisement and services on D-Bus, then run the main loop.

    Does nothing when ``self._main_loop`` is already set.  Once the loop
    has been entered, leaving it (normally, via ``KeyboardInterrupt`` or
    via another exception, which is logged) always unregisters, quits the
    loop and resets ``self._main_loop`` to ``None``.
    """
    if self._main_loop:
        return

    self._main_loop = GObject.MainLoop()
    self._disconnect_all()
    self._register()

    logger.info("--- Mainloop started ---")
    try:
        self._main_loop.run()
    except KeyboardInterrupt:
        # Ctrl-C is an accepted way to stop; cleanup happens in ``finally``.
        pass
    except Exception as e:
        logger.error(e)
    finally:
        logger.info("--- Mainloop finished ---")
        self._unregister()
        self._main_loop.quit()
        self._main_loop = None
def main():
    """Read the adapter name from the command line, hand a shared main loop
    and system bus to the advertising and GATT server set-up functions,
    then run the loop."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '-a', '--adapter-name',
        type=str,
        default='',
        help='Adapter name')
    adapter_name = parser.parse_args().adapter_name

    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    system_bus = dbus.SystemBus()
    loop = GObject.MainLoop()

    advertising.advertising_main(loop, system_bus, adapter_name)
    gatt_server.gatt_server_main(loop, system_bus, adapter_name)

    loop.run()
def __init__(self):
    """Initialise GObject threading, then create the main loop and the
    player used by this object."""
    GObject.threads_init()

    self._loop = GObject.MainLoop()
    self._player = Player()
def pair(args):
    """
    Pair to the specified device.

    Pairing is started with a success and an error callback, and a main
    loop is run until one of them fires; both callbacks quit the loop.

    Args:
        args: arguments parsed on the command line; ``args.device`` is
            handed to ``DeviceManager``.

    Returns:
        None.  The result message and code are reported through
        ``format_results``.
    """
    def success():
        # Pairing worked: trust and connect, then report.
        try:
            device_manager.trust_device()
            device_manager.connect_device()
            format_results({'result': 'Success', 'code': ''})
        finally:
            mainloop.quit()

    def error(err):
        # Map the D-Bus error name onto a result code and report it.
        try:
            # NOTE(review): the original tested ``self.dev``, which is
            # undefined in this plain function and raised NameError on a
            # NoReply error.  ``args.device`` is used as the closest
            # available equivalent -- confirm against DeviceManager.
            if err == 'org.freedesktop.DBus.Error.NoReply' and args.device:
                code = 'Timeout'
                device_manager.cancel_device()
            # ``elif`` (was ``if``): otherwise the 'Timeout' code above was
            # always overwritten by the ``else`` branch below.
            elif err in ('org.bluez.Error.AuthenticationCanceled',
                         'org.bluez.Error.AuthenticationFailed',
                         'org.bluez.Error.AuthenticationRejected',
                         'org.bluez.Error.AuthenticationTimeout'):
                code = 'AuthenticationError'
            else:
                code = 'CreatingDeviceFailed'
            format_results({'result': 'Error', 'code': code})
        finally:
            mainloop.quit()

    mainloop = MainLoop()
    device_manager = DeviceManager(args.device)
    device_manager.pair_device(success, error)
    mainloop.run()
def scan_devices(duration=10):
    """
    Start Bluetooth discovery on the adapter and run a main loop while the
    scan is in progress.

    A timeout is scheduled to call ``quit(mainloop)`` after ``duration``
    seconds.

    Args:
        duration (int): the amount of time that the scan should run for in seconds.
    """
    bt_adapter = DeviceManager().find_adapter()
    bt_adapter.StartDiscovery()

    mainloop = GObject.MainLoop()
    timeout_ms = duration * 1000
    # NOTE(review): ``quit`` is whatever that name resolves to in this
    # module -- presumably a helper that stops the loop; confirm it is not
    # the interpreter's built-in ``quit``.
    GObject.timeout_add(timeout_ms, quit, mainloop)
    mainloop.run()
def run(self):
    """
    Starts the main loop that is necessary to receive Bluetooth events from the Bluetooth adapter.

    This call blocks until you call `stop()` to stop the main loop.
    Does nothing when ``self._main_loop`` is already set.

    While the loop runs, receivers are installed for the ObjectManager
    ``InterfacesAdded`` signal and for ``PropertiesChanged`` on
    ``org.bluez.Device1``.  When the loop is left -- normally or through
    any exception, including ``KeyboardInterrupt`` -- all known devices
    are invalidated and both receivers are removed.
    """
    if self._main_loop:
        return

    self._interface_added_signal = self._bus.add_signal_receiver(
        self._interfaces_added,
        dbus_interface='org.freedesktop.DBus.ObjectManager',
        signal_name='InterfacesAdded')

    # TODO: Also listen to 'interfaces removed' events?

    self._properties_changed_signal = self._bus.add_signal_receiver(
        self._properties_changed,
        dbus_interface=dbus.PROPERTIES_IFACE,
        signal_name='PropertiesChanged',
        arg0='org.bluez.Device1',
        path_keyword='path')

    def disconnect_signals():
        # Invalidate every known device, then drop both signal receivers.
        for device in self._devices.values():
            device.invalidate()
        self._properties_changed_signal.remove()
        self._interface_added_signal.remove()

    self._main_loop = GObject.MainLoop()
    try:
        self._main_loop.run()
    finally:
        # ``finally`` replaces the duplicated call in the try body and in
        # ``except Exception``; it also covers KeyboardInterrupt, which
        # previously skipped the cleanup.
        disconnect_signals()
def __init__(self):
    """Install the GLib D-Bus main loop as default, connect to the system
    bus and set the initial state of the object."""
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

    # Bus connection and the loop owned by this object
    self.bus = dbus.SystemBus()
    self._mainloop = GObject.MainLoop()

    # WiFi control plus bookkeeping; the object starts in OFF_STATE with
    # no SSID and no callbacks
    self.wifi_manager = WiFiControl()
    self.callbacks = {}
    self.current_state = self.OFF_STATE
    self.current_ssid = None

    self.reconnect_worker = DaemonTreeObj(WORKER_NAME)
def __init__(self, client_class, timeout=180,
             capability="KeyboardDisplay",
             path="/org/bluez/my_bluetooth_agent"):
    """Store the agent configuration, connect to the system bus and turn
    discoverability off.

    :param client_class: stored as ``self.client_class``.
    :param timeout: stored as ``self.timeout``.
    :param capability: stored as ``self.capability``.
    :param path: stored as ``self.path``.
    """
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

    # Configuration supplied by the caller
    self.client_class = client_class
    self.timeout = timeout
    self.capability = capability
    self.path = path

    # Bus connection and main loop owned by this object
    self._bus = dbus.SystemBus()
    self._mainloop = GObject.MainLoop()

    _bluetooth.make_discoverable(False)
def __init__(self, tcp_port_in=8043, tcp_port_out=None, channel=1):
    """Create the serial port for ``channel``, export this object on the
    system bus at the port's profile path and remember the TCP ports.

    :param tcp_port_in: stored as ``self.tcp_port_in``.
    :param tcp_port_out: stored as ``self.tcp_port_out``.
    :param channel: passed to ``SerialPort``.
    """
    self._spp = SerialPort(channel)

    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    system_bus = dbus.SystemBus()
    dbus.service.Object.__init__(self, system_bus, self._spp.profile_path)

    self.tcp_port_in = tcp_port_in
    self.tcp_port_out = tcp_port_out
    self._mainloop = GObject.MainLoop()
def main():
    """Register the LED GATT application and its advertisement, then run
    the main loop.

    ``mainloop`` and ``display`` are published as module globals.  On
    ``KeyboardInterrupt`` the display is cleared and written out.
    """
    global mainloop
    global display

    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    bus = dbus.SystemBus()

    # Managers used for the two registrations below
    service_manager = get_service_manager(bus)
    ad_manager = get_ad_manager(bus)

    # GATT application backed by the display
    display = setup_display()
    app = LedApplication(bus, display)

    # Advertisement
    test_advertisement = LedAdvertisement(bus, 0)

    mainloop = GObject.MainLoop()

    # Both registrations are asynchronous; results arrive in the callbacks.
    service_manager.RegisterApplication(
        app.get_path(), {},
        reply_handler=register_app_cb,
        error_handler=register_app_error_cb)
    ad_manager.RegisterAdvertisement(
        test_advertisement.get_path(), {},
        reply_handler=register_ad_cb,
        error_handler=register_ad_error_cb)

    try:
        mainloop.run()
    except KeyboardInterrupt:
        display.clear()
        display.write_display()
def __init__(self, sony_av_indicator, device_service, state_service, command_service):
    """Initialise the thread, keep references to the collaborating
    services and export this object on the session bus.

    The four arguments are stored on the instance under the same names.
    """
    threading.Thread.__init__(self)

    # Collaborators
    self.sony_av_indicator = sony_av_indicator
    self.device_service = device_service
    self.state_service = state_service
    self.command_service = command_service

    # Property tables, keyed by interface
    self.properties = {
        ROOT_INTERFACE: self._get_root_iface_properties(),
        PLAYER_INTERFACE: self._get_player_iface_properties(),
    }

    # The dbus GLib main loop wrapper is stored and handed to the bus.
    self.main_loop = dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    self.bus = dbus.SessionBus(mainloop=self.main_loop)
    self.bus_name = self._connect_to_dbus()

    dbus.service.Object.__init__(self, self.bus_name, OBJECT_PATH)
def start_loop(self):
    """Run a new GLib main loop on a daemon thread kept in ``self.loop``."""
    glib_loop = GObject.MainLoop()
    self.loop = threading.Thread(target=glib_loop.run)
    self.loop.daemon = True
    self.loop.start()
def bluetoothConnection():
    """Register a profile with UUID ``1101`` at ``/foo/bar/profile`` via
    the BlueZ profile manager and block in a GLib main loop."""
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    bus = dbus.SystemBus()

    bluez_object = bus.get_object(BUS_NAME, "/org/bluez")
    profile_manager = dbus.Interface(bluez_object, "org.bluez.ProfileManager1")

    profile_path = "/foo/bar/profile"
    profile_uuid = "1101"
    auto_connect = {"AutoConnect": False}

    # The profile object is bound to a local for the duration of the loop.
    profile = Profile(bus, profile_path)
    profile_manager.RegisterProfile(profile_path, profile_uuid, auto_connect)

    mainloop = GObject.MainLoop()
    mainloop.run()
def bluetoothConnection():
    """Register a profile with UUID ``1101`` at ``/foo/bar/profile`` via
    the BlueZ profile manager and block in a GLib main loop.

    ``closeThreads()`` is called when the loop is left through
    ``KeyboardInterrupt`` or ``SystemExit``.
    """
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    bus = dbus.SystemBus()

    bluez_object = bus.get_object(BUS_NAME, "/org/bluez")
    profile_manager = dbus.Interface(bluez_object, "org.bluez.ProfileManager1")

    profile_path = "/foo/bar/profile"
    profile_uuid = "1101"
    auto_connect = {"AutoConnect": False}

    # The profile object is bound to a local for the duration of the loop.
    profile = Profile(bus, profile_path)
    profile_manager.RegisterProfile(profile_path, profile_uuid, auto_connect)

    mainloop = GObject.MainLoop()
    try:
        mainloop.run()
    except (KeyboardInterrupt, SystemExit):
        closeThreads()
def bluetoothConnection():
    """Register a profile with UUID ``1101`` at ``/foo/bar/profile`` via
    the BlueZ profile manager and block in a GLib main loop."""
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    bus = dbus.SystemBus()

    bluez_object = bus.get_object(BUS_NAME, "/org/bluez")
    profile_manager = dbus.Interface(bluez_object, "org.bluez.ProfileManager1")

    profile_path = "/foo/bar/profile"
    profile_uuid = "1101"
    auto_connect = {"AutoConnect": False}

    # The profile object is bound to a local for the duration of the loop.
    profile = Profile(bus, profile_path)
    profile_manager.RegisterProfile(profile_path, profile_uuid, auto_connect)

    mainloop = GObject.MainLoop()
    mainloop.run()
def run_mainloop_with(self, target):
    """Start the OS's main loop to process asynchronous BLE events and then
    run the specified target function in a background thread.  Target
    function should be a function that takes no parameters and optionally
    return an integer response code.  When the target function stops
    executing or returns with value then the main loop will be stopped and
    the program will exit with the returned code.

    Note that an OS main loop is required to process asynchronous BLE events
    and this function is provided as a convenience for writing simple tools
    and scripts that don't need to be full-blown GUI applications.  If you
    are writing a GUI application that has a main loop (a GTK glib main loop
    on Linux, or a Cocoa main loop on OSX) then you don't need to call this
    function.

    This function does not return normally: it either re-raises the
    exception stored in ``self._exception`` or calls ``sys.exit``.
    """
    # Spin up a background thread to run the target code.
    self._user_thread = threading.Thread(target=self._user_thread_main, args=(target,))
    self._user_thread.daemon = True  # Don't let the user thread block exit.
    self._user_thread.start()
    # Spin up a GLib main loop in the main thread to process async BLE events.
    self._gobject_mainloop = GObject.MainLoop()
    try:
        self._gobject_mainloop.run()  # Doesn't return until the mainloop ends.
    except KeyboardInterrupt:
        self._gobject_mainloop.quit()
        sys.exit(0)
    # Main loop finished.  Check if an exception occurred and throw it,
    # otherwise return the status code from the user code.
    if self._exception is not None:
        # ``self._exception`` is indexed like a ``sys.exc_info()`` triple:
        # [1] is the exception instance and [2] its traceback.
        exc_value = self._exception[1]
        exc_traceback = self._exception[2]
        # The former ``raise value, None, traceback`` statement is a
        # SyntaxError on Python 3.  Attach the traceback where the
        # interpreter supports it; otherwise re-raise the bare instance.
        if hasattr(exc_value, 'with_traceback'):
            raise exc_value.with_traceback(exc_traceback)
        raise exc_value
    else:
        sys.exit(self._return_code)
def run_mainloop_with(self, target):
    """Start the OS's main loop to process asynchronous BLE events and then
    run the specified target function in a background thread.  Target
    function should be a function that takes no parameters and optionally
    return an integer response code.  When the target function stops
    executing or returns with value then the main loop will be stopped and
    the program will exit with the returned code.

    Note that an OS main loop is required to process asynchronous BLE events
    and this function is provided as a convenience for writing simple tools
    and scripts that don't need to be full-blown GUI applications.  If you
    are writing a GUI application that has a main loop (a GTK glib main loop
    on Linux, or a Cocoa main loop on OSX) then you don't need to call this
    function.

    This function does not return normally: it either re-raises the
    exception stored in ``self._exception`` or calls ``sys.exit``.
    """
    # Spin up a background thread to run the target code.
    self._user_thread = threading.Thread(target=self._user_thread_main, args=(target,))
    self._user_thread.daemon = True  # Don't let the user thread block exit.
    self._user_thread.start()
    # Spin up a GLib main loop in the main thread to process async BLE events.
    self._gobject_mainloop = GObject.MainLoop()
    try:
        self._gobject_mainloop.run()  # Doesn't return until the mainloop ends.
    except KeyboardInterrupt:
        self._gobject_mainloop.quit()
        sys.exit(0)
    # Main loop finished.  Check if an exception occurred and throw it,
    # otherwise return the status code from the user code.
    if self._exception is not None:
        # ``self._exception`` is indexed like a ``sys.exc_info()`` triple:
        # [1] is the exception instance and [2] its traceback.
        exc_value = self._exception[1]
        exc_traceback = self._exception[2]
        # The former ``raise value, None, traceback`` statement is a
        # SyntaxError on Python 3.  Attach the traceback where the
        # interpreter supports it; otherwise re-raise the bare instance.
        if hasattr(exc_value, 'with_traceback'):
            raise exc_value.with_traceback(exc_traceback)
        raise exc_value
    else:
        sys.exit(self._return_code)
def start_glib_loop(cls):
    """
    Initialise threads and GStreamer, then start a GLib main loop in a
    separate thread.

    Call this first if your program does not use the GLib loop.  The loop
    and its thread are stored on the class as ``glib_loop`` and
    ``glib_thread``.
    """
    GObject.threads_init()
    Gst.init(None)

    loop = GObject.MainLoop()
    cls.glib_loop = loop
    cls.glib_thread = threading.Thread(target=loop.run)
    cls.glib_thread.start()
def _listen_for_wifi_state_changes(self):
    """Evaluate the current NetworkManager state once and subscribe to
    further state changes.

    Each state is mapped onto a ``WifiConnectionState``.  When the mapped
    status differs from ``self._wifi_status`` it is logged, stored, and
    ``self._on_wifi_status_changed()`` is called.
    """
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

    def print_status(status, nm_state):
        # The SSID is only included while connecting or connected.
        with_ssid = (WifiConnectionState.CONNECTING, WifiConnectionState.CONNECTED)
        if status not in with_ssid:
            logger.info("Wifi status changed: %s (%d)" % (status, nm_state))
            return
        logger.info("Wifi status changed: %s (%d) to %s" % (status, nm_state, self._current_ssid))

    def on_state_changed(nm_instance, nm_state, **kwargs):
        # Translate the NetworkManager state into the coarse status.
        if nm_state >= NetworkManager.NM_STATE_CONNECTED_GLOBAL:
            new_status = WifiConnectionState.CONNECTED
        elif nm_state > NetworkManager.NM_STATE_DISCONNECTING:
            new_status = WifiConnectionState.CONNECTING
        else:
            new_status = WifiConnectionState.DISCONNECTED

        self._update_current_ssid()

        if new_status != self._wifi_status:
            print_status(new_status, nm_state)
            self._wifi_status = new_status
            self._on_wifi_status_changed()

    # Handle the state as it is right now ...
    initial_state = NetworkManager.NetworkManager.State
    on_state_changed(None, initial_state)

    # ... and every change from here on.
    NetworkManager.NetworkManager.OnStateChanged(on_state_changed)
    logger.debug("Start listening to network status changes")

    # Attention: a GObject.MainLoop() is required for this to work
    # in this case it is started by the BLE Peripheral object
def run(self):
    """Handle the current NetworkManager state, subscribe to state changes
    and block in a GLib main loop."""
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

    # Handle the state as it is right now ...
    current_state = NetworkManager.NetworkManager.State
    self._on_state_changed(None, current_state)

    # ... and every change from here on.
    NetworkManager.NetworkManager.OnStateChanged(self._on_state_changed)
    logger.debug("Start listening to network status changes")

    GObject.MainLoop().run()
def __init__(self):
    """Build the GStreamer pipeline fed from an ``appsrc`` element.

    Elements created: appsrc, decodebin, queue, audioconvert, alsasink.
    appsrc is linked to decodebin, and queue -> audioconvert -> alsasink
    are linked; ``self._decode_src_created`` is connected to decodebin's
    ``pad-added`` signal.
    """
    self._mainloop = GObject.MainLoop()
    self._pipeline = Gst.Pipeline()

    # Create the elements.
    make = Gst.ElementFactory.make
    self._src = make('appsrc', 'appsrc')
    decoder = make("decodebin", "decode")
    self._queueaudio = make('queue', 'queueaudio')
    converter = make('audioconvert', 'audioconvert')
    audio_sink = make('alsasink', 'sink')

    self._src.set_property('stream-type', 'stream')

    # Put every element into the pipeline, in creation order.
    for element in (self._src, decoder, self._queueaudio, converter, audio_sink):
        self._pipeline.add(element)

    # Link the elements.
    self._src.link(decoder)
    self._queueaudio.link(converter)
    converter.link(audio_sink)

    decoder.connect('pad-added', self._decode_src_created)
def setUp(self):
    """Create a main loop on the default context and clear the error flag."""
    default_context = GObject.main_context_default()
    self.loop = GObject.MainLoop(default_context)
    self.error = False
def setUp(self):
    """Create a main loop on the default context, clear the error flag and
    make sure a recommender host is configured.

    The previous value of ``SOFTWARE_CENTER_RECOMMENDER_HOST`` (or
    ``None``) is kept in ``self.orig_host``; the staging server is used
    when the variable is not set.
    """
    default_context = GObject.main_context_default()
    self.loop = GObject.MainLoop(default_context)
    self.error = False

    env_key = "SOFTWARE_CENTER_RECOMMENDER_HOST"
    self.orig_host = os.environ.get(env_key)
    if env_key not in os.environ:
        # Alternative host: "https://rec.ubuntu.com"
        os.environ[env_key] = "https://rec.staging.ubuntu.com"
def __init__(self, xid=0):
    """Store ``xid``, start without an OAuth value and create a main loop
    on the default context.

    :param xid: stored as ``self.xid``.
    """
    self.oauth = None
    self.xid = xid

    default_context = GObject.main_context_default()
    self.loop = GObject.MainLoop(default_context)
def clear_token_from_ubuntu_sso_sync(appname):
    """ Ask the com.ubuntu.sso service over D-Bus to clear the
        credentials for the given appname, e.g. _("Ubuntu Software Center"),
        and wait for it to finish (or 2s)
    """
    from ubuntu_sso import (
        DBUS_BUS_NAME,
        DBUS_CREDENTIALS_IFACE,
        DBUS_CREDENTIALS_PATH,
    )

    loop = GObject.MainLoop()
    session_bus = dbus.SessionBus()
    sso_object = session_bus.get_object(
        bus_name=DBUS_BUS_NAME,
        object_path=DBUS_CREDENTIALS_PATH,
        follow_name_owner_changes=True)
    proxy = dbus.Interface(
        object=sso_object,
        dbus_interface=DBUS_CREDENTIALS_IFACE)

    # Any of these three signals stops the loop.
    for signal_name in ("CredentialsCleared",
                        "CredentialsNotFound",
                        "CredentialsError"):
        proxy.connect_to_signal(signal_name, loop.quit)

    proxy.clear_credentials(appname, {})

    # Safety net so we never wait forever.
    GObject.timeout_add_seconds(2, loop.quit)

    # Block until a signal arrives or the timeout fires.
    loop.run()
def __init__(self, xid=0):
    """Store ``xid``, start without an OAuth value and create a main loop
    on the default context.

    :param xid: stored as ``self.xid``.
    """
    self.oauth = None
    self.xid = xid

    default_context = GObject.main_context_default()
    self.loop = GObject.MainLoop(default_context)