python类MainLoop()的实例源码

peripheral.py 文件源码 项目:python-bluezero 作者: ukBaz 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __init__(self, device_id=None):
    """Set up the main loop, the D-Bus registration and the service list.

    Threading support is enabled for GObject and for the dbus GLib
    integration, the GLib main loop is installed as the default D-Bus
    main loop, and a ``GObject.MainLoop`` is created.  The object is then
    exported on the system bus under the ``ukBaz.bluezero`` bus name at a
    path built from ``id(self)``.

    :param device_id: passed unchanged to ``adapter.Adapter``.
    """
    # Event-loop plumbing goes first, before the bus is opened.
    GObject.threads_init()
    dbus.mainloop.glib.threads_init()
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    self.mainloop = GObject.MainLoop()

    # Claim the bus name and export this object at a per-instance path.
    self.bus = dbus.SystemBus()
    self.path = '/ukBaz/bluezero/application{}'.format(id(self))
    self.bus_name = dbus.service.BusName('ukBaz.bluezero', self.bus)
    dbus.service.Object.__init__(self, self.bus_name, self.path)

    # Start with an empty list of services.
    self.services = []

    self.dongle = adapter.Adapter(device_id)
dbus_listener.py 文件源码 项目:spoppy 作者: sindrig 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def run(self):
    """Export the service on the session bus and keep its main loop running.

    Claims the ``com.spoppy`` bus name, initialises the parent class at
    ``/com/spoppy`` and then runs a ``GObject.MainLoop`` for as long as
    ``self.running`` is true.  A ``KeyboardInterrupt`` raised out of the
    loop is logged and the loop is started again.
    """
    GObject.threads_init()
    dbus.mainloop.glib.threads_init()
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

    session_bus = dbus.SessionBus()
    well_known_name = dbus.service.BusName("com.spoppy", session_bus)
    super(SpoppyDBusService, self).__init__(well_known_name, "/com/spoppy")

    self._loop = GObject.MainLoop()
    while self.running:
        try:
            logger.debug('Starting dbus loop')
            self._loop.run()
        except KeyboardInterrupt:
            logger.debug('Loop interrupted, will restart')
agent.py 文件源码 项目:BMW-RPi-iBUS 作者: KLUSEK 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def main():
    """Start the Bluetooth and IBUS services, then block in the GObject main loop.

    The created services are published through the module globals
    ``bluetooth`` and ``ibus``.  ``ibus.start`` runs on a daemon thread.
    Once the main loop ends (normally, on Ctrl-C, or because of an error)
    ``shutdown()`` is called; the process then exits with status 0.
    """
    global bluetooth
    global ibus

    bluetooth = bt_.BluetoothService(onBluetoothConnected, onPlayerChanged)

    ibus = ibus_.IBUSService(onIBUSready, onIBUSpacket)
    ibus.cmd = ibus_.IBUSCommands(ibus)

    ibus.main_thread = threading.Thread(target=ibus.start)
    ibus.main_thread.daemon = True
    ibus.main_thread.start()

    try:
        mainloop = GObject.MainLoop()
        mainloop.run()
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop; continue to shutdown.
        pass
    except Exception as exc:
        # This used to be a bare ``except:``, which also swallowed
        # SystemExit and discarded the reason for the failure.
        print("Unable to run the gobject main loop: {}".format(exc))
    finally:
        # Cleanup runs on every exit path, including SystemExit.
        print("")
        shutdown()

    sys.exit(0)
ibus_test.py 文件源码 项目:BMW-RPi-iBUS 作者: KLUSEK 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def main():
    """Start the IBUS service, then block in the GObject main loop.

    The created service is published through the module global ``ibus``
    and ``ibus.start`` runs on a daemon thread.  Once the main loop ends
    (normally, on Ctrl-C, or because of an error) ``shutdown()`` is
    called; the process then exits with status 0.
    """
    global ibus

    ibus = ibus_.IBUSService(onIBUSready, onIBUSpacket)
    ibus.cmd = ibus_.IBUSCommands(ibus)

    ibus.main_thread = threading.Thread(target=ibus.start)
    ibus.main_thread.daemon = True
    ibus.main_thread.start()

    try:
        mainloop = GObject.MainLoop()
        mainloop.run()
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop; continue to shutdown.
        pass
    except Exception as exc:
        # This used to be a bare ``except:``, which also swallowed
        # SystemExit and discarded the reason for the failure.
        print("Unable to run the gobject main loop: {}".format(exc))
    finally:
        # Cleanup runs on every exit path, including SystemExit.
        print("")
        shutdown()

    sys.exit(0)
bluez_peripheral.py 文件源码 项目:senic-hub 作者: getsenic 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def run(self):
    """
    Register advertisement and services on D-Bus and run the main loop.

    Does nothing when ``self._main_loop`` is already set.  Otherwise a
    new ``GObject.MainLoop`` is stored, ``_disconnect_all()`` and
    ``_register()`` are called, and the loop runs until it stops.
    Whatever ends the loop, ``_unregister()`` is called afterwards and
    ``self._main_loop`` is reset to ``None``.
    """
    if self._main_loop:
        # Already running (or being set up); nothing to do.
        return

    self._main_loop = GObject.MainLoop()
    self._disconnect_all()
    self._register()
    logger.info("--- Mainloop started ---")
    try:
        self._main_loop.run()
    except KeyboardInterrupt:
        # Ctrl-C is a legitimate way to leave; cleanup happens below.
        pass
    except Exception as error:
        logger.error(error)
    finally:
        logger.info("--- Mainloop finished ---")
        self._unregister()
        self._main_loop.quit()
        self._main_loop = None
gatt_server_example.py 文件源码 项目:python-gatt-server 作者: Jumperr-labs 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def main():
    """Parse the adapter name, then run advertising and the GATT server.

    The optional ``-a/--adapter-name`` argument (default ``''``) is passed,
    together with the system bus and the main loop, to
    ``advertising.advertising_main`` and ``gatt_server.gatt_server_main``
    before the main loop is started.
    """
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument(
        '-a', '--adapter-name',
        type=str,
        help='Adapter name',
        default='',
    )
    adapter_name = arg_parser.parse_args().adapter_name

    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    system_bus = dbus.SystemBus()
    loop = GObject.MainLoop()

    advertising.advertising_main(loop, system_bus, adapter_name)
    gatt_server.gatt_server_main(loop, system_bus, adapter_name)
    loop.run()
adblockradio.py 文件源码 项目:adblockradio 作者: quasoft 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def __init__(self):
    """Enable GObject threading, then create the main loop and the player."""
    GObject.threads_init()

    loop = GObject.MainLoop()
    self._loop = loop
    self._player = Player()
cli.py 文件源码 项目:bjarkan 作者: GetWellNetwork 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def pair(args):
    """
    Pair to the specified device.

    Pairing is started through ``DeviceManager.pair_device`` and a main
    loop runs until one of the two callbacks stops it.  The outcome is
    handed to ``format_results`` as a dict with ``result`` and ``code``
    keys.

    Args:
        args: args parsed on the command line; only ``args.device`` is
            read here.

    Returns:
        None. The result is reported through ``format_results``.
    """
    auth_errors = (
        'org.bluez.Error.AuthenticationCanceled',
        'org.bluez.Error.AuthenticationFailed',
        'org.bluez.Error.AuthenticationRejected',
        'org.bluez.Error.AuthenticationTimeout',
    )

    def success():
        # Pairing worked: trust and connect, then report success.
        try:
            device_manager.trust_device()
            device_manager.connect_device()
            format_results({'result': 'Success', 'code': ''})
        finally:
            mainloop.quit()

    def error(err):
        # Map the D-Bus error name onto a result code.
        try:
            # The original test also read ``self.dev``, but there is no
            # ``self`` in this function, so the branch raised NameError.
            if err == 'org.freedesktop.DBus.Error.NoReply':
                code = 'Timeout'
                device_manager.cancel_device()
            # ``elif`` keeps 'Timeout' from being overwritten by the
            # fallback code below, as happened with two separate ``if``s.
            elif err in auth_errors:
                code = 'AuthenticationError'
            else:
                code = 'CreatingDeviceFailed'

            format_results({'result': 'Error', 'code': code})
        finally:
            mainloop.quit()

    mainloop = MainLoop()
    device_manager = DeviceManager(args.device)
    device_manager.pair_device(success, error)
    mainloop.run()
list_devices.py 文件源码 项目:bjarkan 作者: GetWellNetwork 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def scan_devices(duration=10):
    """
    Start Bluetooth discovery and keep a main loop running for ``duration``.

    The adapter returned by ``DeviceManager().find_adapter()`` is asked to
    ``StartDiscovery()``; a main loop then runs with a timeout callback
    scheduled after ``duration`` seconds.

    Args:
        duration (int): the amount of time that the scan should run for in seconds.
    """
    bt_adapter = DeviceManager().find_adapter()
    bt_adapter.StartDiscovery()

    mainloop = GObject.MainLoop()
    timeout_ms = duration * 1000
    # NOTE(review): ``quit`` is not defined inside this function; it is
    # either a module-level helper or the interpreter's builtin ``quit``.
    # Confirm which, since the builtin would raise SystemExit.
    GObject.timeout_add(timeout_ms, quit, mainloop)
    mainloop.run()
gatt_linux.py 文件源码 项目:gatt-python 作者: getsenic 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def run(self):
    """
    Starts the main loop that is necessary to receive Bluetooth events from the Bluetooth adapter.

    This call blocks until you call `stop()` to stop the main loop.
    It returns immediately when ``self._main_loop`` is already set.

    Two signal receivers (``InterfacesAdded`` and ``PropertiesChanged``
    for ``org.bluez.Device1``) are installed before the loop starts.
    They are removed again, and every known device is invalidated,
    however the loop ends.
    """
    if self._main_loop:
        return

    self._interface_added_signal = self._bus.add_signal_receiver(
        self._interfaces_added,
        dbus_interface='org.freedesktop.DBus.ObjectManager',
        signal_name='InterfacesAdded')

    # TODO: Also listen to 'interfaces removed' events?

    self._properties_changed_signal = self._bus.add_signal_receiver(
        self._properties_changed,
        dbus_interface=dbus.PROPERTIES_IFACE,
        signal_name='PropertiesChanged',
        arg0='org.bluez.Device1',
        path_keyword='path')

    def disconnect_signals():
        # Invalidate all devices and drop both signal receivers.
        for device in self._devices.values():
            device.invalidate()
        self._properties_changed_signal.remove()
        self._interface_added_signal.remove()

    self._main_loop = GObject.MainLoop()
    try:
        self._main_loop.run()
    finally:
        # ``finally`` runs the cleanup exactly once on every exit path.
        # The previous try/except skipped it on KeyboardInterrupt and ran
        # it a second time if the cleanup itself raised.
        disconnect_signals()
wifimonitor.py 文件源码 项目:pywificontrol 作者: emlid 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __init__(self):
    """Connect to the system bus and set the monitor's initial state.

    The GLib main loop is installed as the default D-Bus main loop before
    the system bus is opened.  The monitor starts in ``OFF_STATE`` with no
    current SSID, no callbacks, a fresh ``WiFiControl`` and a
    ``DaemonTreeObj`` created from ``WORKER_NAME``.
    """
    # D-Bus / event-loop plumbing.
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    self.bus = dbus.SystemBus()
    self._mainloop = GObject.MainLoop()

    self.wifi_manager = WiFiControl()

    # Bookkeeping; nothing is registered or connected yet.
    self.callbacks = {}
    self.current_state = self.OFF_STATE
    self.current_ssid = None

    self.reconnect_worker = DaemonTreeObj(WORKER_NAME)
agent.py 文件源码 项目:bluetool 作者: emlid 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __init__(
        self, client_class, timeout=180, capability="KeyboardDisplay",
        path="/org/bluez/my_bluetooth_agent"):
    """Store the agent settings and prepare the bus and the main loop.

    The four arguments are stored on the instance under the same names.
    The GLib main loop is installed as the default D-Bus main loop, the
    system bus is opened, a ``GObject.MainLoop`` is created and finally
    ``_bluetooth.make_discoverable(False)`` is called.
    """
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

    # Plain configuration, kept as given.
    self.client_class = client_class
    self.timeout = timeout
    self.capability = capability
    self.path = path

    self._bus = dbus.SystemBus()
    self._mainloop = GObject.MainLoop()

    _bluetooth.make_discoverable(False)
blueserver.py 文件源码 项目:bluetool 作者: emlid 项目源码 文件源码 阅读 70 收藏 0 点赞 0 评论 0
def __init__(self, tcp_port_in=8043, tcp_port_out=None, channel=1):
    """Create the serial port, export this object and prepare the main loop.

    A ``SerialPort`` is created for ``channel`` and the object is exported
    on the system bus at that port's ``profile_path``.  Both TCP port
    numbers are stored as given.
    """
    self._spp = SerialPort(channel)

    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    system_bus = dbus.SystemBus()
    dbus.service.Object.__init__(self, system_bus, self._spp.profile_path)

    self.tcp_port_in = tcp_port_in
    self.tcp_port_out = tcp_port_out
    self._mainloop = GObject.MainLoop()
led_matrix.py 文件源码 项目:Bluetooth-Low-Energy-LED-Matrix 作者: WIStudent 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def main():
    """Register the LED GATT application and advertisement, then run the loop.

    ``mainloop`` and ``display`` are published as module globals.  Both
    registrations are made with reply/error handlers, so their results
    arrive through those callbacks.  On ``KeyboardInterrupt`` the display
    is cleared and written out.
    """
    global mainloop
    global display

    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    system_bus = dbus.SystemBus()

    # Manager objects used for the two registrations below.
    gatt_manager = get_service_manager(system_bus)
    advertising_manager = get_ad_manager(system_bus)

    # The display and the application built on top of it.
    display = setup_display()
    led_app = LedApplication(system_bus, display)

    advertisement = LedAdvertisement(system_bus, 0)

    mainloop = GObject.MainLoop()

    gatt_manager.RegisterApplication(
        led_app.get_path(), {},
        reply_handler=register_app_cb,
        error_handler=register_app_error_cb)

    advertising_manager.RegisterAdvertisement(
        advertisement.get_path(), {},
        reply_handler=register_ad_cb,
        error_handler=register_ad_error_cb)

    try:
        mainloop.run()
    except KeyboardInterrupt:
        # Leave the display blank when stopped with Ctrl-C.
        display.clear()
        display.write_display()
indicator.py 文件源码 项目:sony-av-indicator 作者: aschaeffer 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def __init__(self, sony_av_indicator, device_service, state_service, command_service):
    """Initialise the thread, store the services and export the object.

    The four collaborators are stored under the same names.  The property
    tables for ``ROOT_INTERFACE`` and ``PLAYER_INTERFACE`` are built from
    the two ``_get_*_iface_properties`` helpers.  The object is then
    exported at ``OBJECT_PATH`` using the bus name returned by
    ``_connect_to_dbus()``.
    """
    threading.Thread.__init__(self)

    self.sony_av_indicator = sony_av_indicator
    self.device_service = device_service
    self.state_service = state_service
    self.command_service = command_service

    root_properties = self._get_root_iface_properties()
    player_properties = self._get_player_iface_properties()
    self.properties = {
        ROOT_INTERFACE: root_properties,
        PLAYER_INTERFACE: player_properties,
    }

    # ``main_loop`` holds the dbus GLib integration object, not a
    # ``GObject.MainLoop`` instance.
    self.main_loop = dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    self.bus = dbus.SessionBus(mainloop=self.main_loop)
    self.bus_name = self._connect_to_dbus()
    dbus.service.Object.__init__(self, self.bus_name, OBJECT_PATH)
client.py 文件源码 项目:matriz 作者: stressfm 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def start_loop(self):
    """Run a new ``GObject.MainLoop`` on a daemon thread kept in ``self.loop``."""
    worker = threading.Thread(target=GObject.MainLoop().run)
    self.loop = worker
    worker.daemon = True
    worker.start()
spp.py 文件源码 项目:CodeLabs 作者: TheIoTLearningInitiative 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def bluetoothConnection():
    """Register a profile with ``org.bluez.ProfileManager1`` and run the loop.

    A ``Profile`` object is created at ``/foo/bar/profile`` on the system
    bus and registered under UUID ``1101`` with ``AutoConnect`` disabled.
    The call then blocks in a ``GObject.MainLoop``.
    """
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    system_bus = dbus.SystemBus()

    bluez_object = system_bus.get_object(BUS_NAME, "/org/bluez")
    profile_manager = dbus.Interface(bluez_object, "org.bluez.ProfileManager1")

    profile_path = "/foo/bar/profile"
    options = {"AutoConnect": False}
    profile_uuid = "1101"

    # The local reference is kept for the whole (blocking) call.
    profile = Profile(system_bus, profile_path)
    profile_manager.RegisterProfile(profile_path, profile_uuid, options)

    GObject.MainLoop().run()
spp.py 文件源码 项目:CodeLabs 作者: TheIoTLearningInitiative 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def bluetoothConnection():
    """Register a profile with ``org.bluez.ProfileManager1`` and run the loop.

    A ``Profile`` object is created at ``/foo/bar/profile`` on the system
    bus and registered under UUID ``1101`` with ``AutoConnect`` disabled.
    The call then blocks in a ``GObject.MainLoop``; on
    ``KeyboardInterrupt`` or ``SystemExit`` ``closeThreads()`` is called.
    """
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    system_bus = dbus.SystemBus()

    bluez_object = system_bus.get_object(BUS_NAME, "/org/bluez")
    profile_manager = dbus.Interface(bluez_object, "org.bluez.ProfileManager1")

    profile_path = "/foo/bar/profile"
    options = {"AutoConnect": False}
    profile_uuid = "1101"

    # The local reference is kept for the whole (blocking) call.
    profile = Profile(system_bus, profile_path)
    profile_manager.RegisterProfile(profile_path, profile_uuid, options)

    loop = GObject.MainLoop()
    try:
        loop.run()
    except (KeyboardInterrupt, SystemExit):
        closeThreads()
spp.py 文件源码 项目:CodeLabs 作者: TheIoTLearningInitiative 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def bluetoothConnection():
    """Register a profile with ``org.bluez.ProfileManager1`` and run the loop.

    A ``Profile`` object is created at ``/foo/bar/profile`` on the system
    bus and registered under UUID ``1101`` with ``AutoConnect`` disabled.
    The call then blocks in a ``GObject.MainLoop``.
    """
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    system_bus = dbus.SystemBus()

    bluez_object = system_bus.get_object(BUS_NAME, "/org/bluez")
    profile_manager = dbus.Interface(bluez_object, "org.bluez.ProfileManager1")

    profile_path = "/foo/bar/profile"
    options = {"AutoConnect": False}
    profile_uuid = "1101"

    # The local reference is kept for the whole (blocking) call.
    profile = Profile(system_bus, profile_path)
    profile_manager.RegisterProfile(profile_path, profile_uuid, options)

    GObject.MainLoop().run()
provider.py 文件源码 项目:Specdrums_Python_BLE 作者: specdrums 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def run_mainloop_with(self, target):
        """Start the OS's main loop to process asynchronous BLE events and then
        run the specified target function in a background thread.  Target
        function should be a function that takes no parameters and optionally
        return an integer response code.  When the target function stops
        executing or returns with value then the main loop will be stopped and
        the program will exit with the returned code.

        Note that an OS main loop is required to process asynchronous BLE events
        and this function is provided as a convenience for writing simple tools
        and scripts that don't need to be full-blown GUI applications.  If you
        are writing a GUI application that has a main loop (a GTK glib main loop
        on Linux, or a Cocoa main loop on OSX) then you don't need to call this
        function.

        This method does not return normally: it ends in ``sys.exit`` or by
        re-raising the exception stored in ``self._exception``.
        """
        # Spin up a background thread to run the target code.
        # NOTE(review): self._exception and self._return_code are only read
        # here; presumably _user_thread_main sets them - confirm.
        self._user_thread = threading.Thread(target=self._user_thread_main, args=(target,))
        self._user_thread.daemon = True  # Don't let the user thread block exit.
        self._user_thread.start()
        # Spin up a GLib main loop in the main thread to process async BLE events.
        self._gobject_mainloop = GObject.MainLoop()
        try:
            self._gobject_mainloop.run()  # Doesn't return until the mainloop ends.
        except KeyboardInterrupt:
            # Ctrl-C: stop the loop and exit with status 0.
            self._gobject_mainloop.quit()
            sys.exit(0)
        # Main loop finished.  Check if an exception occurred and throw it,
        # otherwise return the status code from the user code.
        if self._exception is not None:
            # Rethrow exception with its original stack trace following advice from:
            # http://nedbatchelder.com/blog/200711/rethrowing_exceptions_in_python.html
            # NOTE(review): the three-expression raise below is Python 2
            # syntax and is a SyntaxError on Python 3.
            raise self._exception[1], None, self._exception[2]
        else:
            sys.exit(self._return_code)
provider.py 文件源码 项目:Specdrums_Python_BLE 作者: specdrums 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def run_mainloop_with(self, target):
        """Start the OS's main loop to process asynchronous BLE events and then
        run the specified target function in a background thread.  Target
        function should be a function that takes no parameters and optionally
        return an integer response code.  When the target function stops
        executing or returns with value then the main loop will be stopped and
        the program will exit with the returned code.

        Note that an OS main loop is required to process asynchronous BLE events
        and this function is provided as a convenience for writing simple tools
        and scripts that don't need to be full-blown GUI applications.  If you
        are writing a GUI application that has a main loop (a GTK glib main loop
        on Linux, or a Cocoa main loop on OSX) then you don't need to call this
        function.

        This method does not return normally: it ends in ``sys.exit`` or by
        re-raising the exception stored in ``self._exception``.
        """
        # Spin up a background thread to run the target code.
        # NOTE(review): self._exception and self._return_code are only read
        # here; presumably _user_thread_main sets them - confirm.
        self._user_thread = threading.Thread(target=self._user_thread_main, args=(target,))
        self._user_thread.daemon = True  # Don't let the user thread block exit.
        self._user_thread.start()
        # Spin up a GLib main loop in the main thread to process async BLE events.
        self._gobject_mainloop = GObject.MainLoop()
        try:
            self._gobject_mainloop.run()  # Doesn't return until the mainloop ends.
        except KeyboardInterrupt:
            # Ctrl-C: stop the loop and exit with status 0.
            self._gobject_mainloop.quit()
            sys.exit(0)
        # Main loop finished.  Check if an exception occurred and throw it,
        # otherwise return the status code from the user code.
        if self._exception is not None:
            # Rethrow exception with its original stack trace following advice from:
            # http://nedbatchelder.com/blog/200711/rethrowing_exceptions_in_python.html
            # NOTE(review): the three-expression raise below is Python 2
            # syntax and is a SyntaxError on Python 3.
            raise self._exception[1], None, self._exception[2]
        else:
            sys.exit(self._return_code)
pyplaybin.py 文件源码 项目:pyplaybin 作者: fraca7 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def start_glib_loop(cls):
    """
    Initialise threading and GStreamer, then run a GLib loop on its own thread.

    Call this first if your program does not already run a GLib loop.
    The loop and the thread running it are stored on the class as
    ``glib_loop`` and ``glib_thread``.
    """
    GObject.threads_init()
    Gst.init(None)

    loop = GObject.MainLoop()
    cls.glib_loop = loop

    runner = threading.Thread(target=loop.run)
    cls.glib_thread = runner
    runner.start()
bluenet.py 文件源码 项目:senic-hub 作者: getsenic 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _listen_for_wifi_state_changes(self):
    """Track NetworkManager state changes as a ``WifiConnectionState``.

    The current NetworkManager state is evaluated once straight away and
    a handler is then registered for later changes.  Each time the mapped
    status differs from ``self._wifi_status`` it is logged, stored and
    ``self._on_wifi_status_changed()`` is called.  No main loop is started
    here.
    """
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

    def print_status(status, nm_state):
        # The SSID is only part of the message while connecting/connected.
        with_ssid = (WifiConnectionState.CONNECTING, WifiConnectionState.CONNECTED)
        if status not in with_ssid:
            logger.info("Wifi status changed: %s (%d)" % (status, nm_state))
            return
        logger.info("Wifi status changed: %s (%d) to %s" % (status, nm_state, self._current_ssid))

    def on_state_changed(nm_instance, nm_state, **kwargs):
        # Map the numeric NetworkManager state onto the three statuses.
        if nm_state >= NetworkManager.NM_STATE_CONNECTED_GLOBAL:
            new_status = WifiConnectionState.CONNECTED
        elif nm_state > NetworkManager.NM_STATE_DISCONNECTING:
            new_status = WifiConnectionState.CONNECTING
        else:
            new_status = WifiConnectionState.DISCONNECTED

        # The SSID is refreshed on every event, changed status or not.
        self._update_current_ssid()

        if new_status != self._wifi_status:
            print_status(new_status, nm_state)
            self._wifi_status = new_status
            self._on_wifi_status_changed()

    network_manager = NetworkManager.NetworkManager

    # Evaluate the state as it is right now ...
    on_state_changed(None, network_manager.State)

    # ... and subscribe to later changes.
    network_manager.OnStateChanged(on_state_changed)
    logger.debug("Start listening to network status changes")
    # Attention: a GObject.MainLoop() is required for this to work
    # in this case it is started by the BLE Peripheral object
netwatch.py 文件源码 项目:senic-hub 作者: getsenic 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def run(self):
    """Report the current network state, then listen for changes forever.

    ``self._on_state_changed`` is called once with the current
    NetworkManager state and then registered for later changes.  The call
    blocks in a ``GObject.MainLoop``.
    """
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

    network_manager = NetworkManager.NetworkManager

    # Evaluate the state as it is right now ...
    self._on_state_changed(None, network_manager.State)

    # ... and subscribe to later changes.
    network_manager.OnStateChanged(self._on_state_changed)
    logger.debug("Start listening to network status changes")

    GObject.MainLoop().run()
gstreamer.py 文件源码 项目:python-snapcast 作者: happyleavesaoc 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __init__(self):
    """Build the appsrc playback pipeline.

    Five elements are created and added to the pipeline: appsrc,
    decodebin, queue, audioconvert and alsasink.  appsrc is linked to
    decodebin, and queue to audioconvert to alsasink.  decodebin is not
    linked to the queue here; ``self._decode_src_created`` is connected
    to its ``pad-added`` signal instead (presumably it does that linking
    - confirm).
    """
    self._mainloop = GObject.MainLoop()
    self._pipeline = Gst.Pipeline()

    # Create the elements.
    make_element = Gst.ElementFactory.make
    self._src = make_element('appsrc', 'appsrc')
    decoder = make_element("decodebin", "decode")
    self._queueaudio = make_element('queue', 'queueaudio')
    converter = make_element('audioconvert', 'audioconvert')
    audio_sink = make_element('alsasink', 'sink')

    self._src.set_property('stream-type', 'stream')

    # Put every element into the pipeline, in creation order.
    for element in (self._src, decoder, self._queueaudio, converter, audio_sink):
        self._pipeline.add(element)

    # Link the static parts of the graph.
    self._src.link(decoder)
    self._queueaudio.link(converter)
    converter.link(audio_sink)
    decoder.connect('pad-added', self._decode_src_created)
test_scagent.py 文件源码 项目:x-mario-center 作者: fossasia 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def setUp(self):
    """Create a main loop on the default context and clear the error flag."""
    default_context = GObject.main_context_default()
    self.loop = GObject.MainLoop(default_context)
    self.error = False
test_recagent.py 文件源码 项目:x-mario-center 作者: fossasia 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def setUp(self):
    """Create a main loop and make sure a recommender host is configured.

    The previous value of ``SOFTWARE_CENTER_RECOMMENDER_HOST`` (``None``
    when unset) is saved in ``self.orig_host``.  If the variable is not
    set, it is pointed at the staging server.
    """
    self.loop = GObject.MainLoop(GObject.main_context_default())
    self.error = False
    # NOTE(review): presumably restored from orig_host in tearDown - confirm.
    self.orig_host = os.environ.get("SOFTWARE_CENTER_RECOMMENDER_HOST")
    # The body of this ``if`` was mis-indented, which made the whole
    # method fail to compile.
    if "SOFTWARE_CENTER_RECOMMENDER_HOST" not in os.environ:
        server = "https://rec.staging.ubuntu.com"
        #server = "https://rec.ubuntu.com"
        os.environ["SOFTWARE_CENTER_RECOMMENDER_HOST"] = server
piston_generic_helper.py 文件源码 项目:x-mario-center 作者: fossasia 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __init__(self, xid=0):
    """Store ``xid``, start without OAuth data and create the main loop.

    The loop is created on the default GLib main context.
    """
    self.oauth = None
    self.xid = xid
    default_context = GObject.main_context_default()
    self.loop = GObject.MainLoop(default_context)
utils.py 文件源码 项目:x-mario-center 作者: fossasia 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def clear_token_from_ubuntu_sso_sync(appname):
    """ ask the ubuntu_sso credentials service over D-Bus to clear the
        credentials for the given appname, e.g. _("Ubuntu Software Center")
        and block until it signals a result (or 2s have passed)
    """
    from ubuntu_sso import (
        DBUS_BUS_NAME,
        DBUS_CREDENTIALS_IFACE,
        DBUS_CREDENTIALS_PATH,
    )

    loop = GObject.MainLoop()
    session_bus = dbus.SessionBus()
    credentials_object = session_bus.get_object(
        bus_name=DBUS_BUS_NAME,
        object_path=DBUS_CREDENTIALS_PATH,
        follow_name_owner_changes=True)
    credentials = dbus.Interface(
        object=credentials_object,
        dbus_interface=DBUS_CREDENTIALS_IFACE)

    # Any of the three outcomes ends the wait.
    for signal_name in ("CredentialsCleared",
                        "CredentialsNotFound",
                        "CredentialsError"):
        credentials.connect_to_signal(signal_name, loop.quit)

    credentials.clear_credentials(appname, {})

    # Safety net: never wait longer than two seconds.
    GObject.timeout_add_seconds(2, loop.quit)

    # Block until one of the signals or the timeout stops the loop.
    loop.run()
piston_generic_helper.py 文件源码 项目:x-mario-center 作者: fossasia 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def __init__(self, xid=0):
    """Store ``xid``, start without OAuth data and create the main loop.

    The loop is created on the default GLib main context.
    """
    self.oauth = None
    self.xid = xid
    default_context = GObject.main_context_default()
    self.loop = GObject.MainLoop(default_context)


问题


面经


文章

微信
公众号

扫码关注公众号