python类service()的实例源码

gatt_server.py 文件源码 项目:python-gatt-server 作者: Jumperr-labs 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def hr_msrmt_cb(self):
        value = []
        value.append(dbus.Byte(0x06))

        value.append(dbus.Byte(randint(90, 130)))

        if self.hr_ee_count % 10 == 0:
            value[0] = dbus.Byte(value[0] | 0x08)
            value.append(dbus.Byte(self.service.energy_expended & 0xff))
            value.append(dbus.Byte((self.service.energy_expended >> 8) & 0xff))

        self.service.energy_expended = \
                min(0xffff, self.service.energy_expended + 1)
        self.hr_ee_count += 1

        print('Updating value: ' + repr(value))

        self.PropertiesChanged(GATT_CHRC_IFACE, { 'Value': value }, [])

        return self.notifying
recovery_backend.py 文件源码 项目:dell-recovery 作者: dell 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __init__(self):
        dbus.service.Object.__init__(self)

        #initialize variables that will be used during create and run
        self.bus = None
        self.package_dir = None
        self.main_loop = None
        self._timeout = False
        self.dbus_name = None
        self.xml_obj = BTOxml()

        # cached D-BUS interfaces for _check_polkit_privilege()
        self.dbus_info = None
        self.polkit = None
        self.progress_thread = None
        self.enforce_polkit = True

        #Enable translation for strings used
        bindtextdomain(DOMAIN, LOCALEDIR)
        textdomain(DOMAIN)
recovery_backend.py 文件源码 项目:dell-recovery 作者: dell 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def create_dbus_server(cls, session_bus=False):
        '''Return a D-BUS server backend instance.

        Normally this connects to the system bus. Set session_bus to True to
        connect to the session bus (for testing).

        '''
        backend = Backend()
        dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
        if session_bus:
            backend.bus = dbus.SessionBus()
            backend.enforce_polkit = False
        else:
            backend.bus = dbus.SystemBus()
        try:
            backend.dbus_name = dbus.service.BusName(DBUS_BUS_NAME, backend.bus)
        except dbus.exceptions.DBusException as msg:
            logging.error("Exception when spawning dbus service")
            logging.error(msg)
            return None
        return backend

    #
    # Internal methods
    #
localGATT.py 文件源码 项目:python-bluezero 作者: ukBaz 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def __init__(self, device_id=None):
        """Default initialiser.

        1. Initialises the program loop using ``GObject``.
        2. Registers the Application on the D-Bus.
        3. Initialises the list of services offered by the application.

        """
        # Initialise the D-Bus path and register it
        self.bus = dbus.SystemBus()
        self.path = '/ukBaz/bluezero'
        self.bus_name = dbus.service.BusName('ukBaz.bluezero', self.bus)
        dbus.service.Object.__init__(self, self.bus_name, self.path)

        # Objects to be associated with this service
        self.managed_objs = []
        self.eventloop = async_tools.EventLoop()
localGATT.py 文件源码 项目:python-bluezero 作者: ukBaz 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __init__(self, service_id, uuid, primary):
        """Default initialiser.

        1. Registers the service on the D-Bus.
        2. Sets up the service UUID and primary flags.

        :param service_id:
        :param uuid: service BLE UUID
        :param primary: whether or not the service is a primary service
        """
        # Setup D-Bus object paths and register service
        self.path = self.PATH_BASE + str('{0:04d}'.format(service_id))
        self.bus = dbus.SystemBus()
        self.interface = constants.GATT_SERVICE_IFACE
        dbus.service.Object.__init__(self, self.bus, self.path)
        self.props = {
            constants.GATT_SERVICE_IFACE: {
                'UUID': uuid,
                'Primary': primary}
        }
localGATT.py 文件源码 项目:python-bluezero 作者: ukBaz 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def GetAll(self, interface_name):
        """Return the service properties.

        This method is registered with the D-Bus at
        ``org.freedesktop.DBus.Properties``

        :param interface: interface to get the properties of.

        The interface must be ``org.bluez.GattService1`` otherwise an
        exception is raised.
        """
        if interface_name != constants.GATT_SERVICE_IFACE:
            raise InvalidArgsException()

        try:
            return self.props[interface_name]
        except KeyError:
            raise dbus.exceptions.DBusException(
                'no such interface ' + interface_name,
                name=interface_name + '.UnknownInterface')
peripheral.py 文件源码 项目:python-bluezero 作者: ukBaz 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(self, device_id=None):
        """Default initialiser.

        1. Initialises the program loop using ``GObject``.
        2. Registers the Application on the D-Bus.
        3. Initialises the list of services offered by the application.

        """
        # Initialise the loop that the application runs in
        GObject.threads_init()
        dbus.mainloop.glib.threads_init()
        dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
        self.mainloop = GObject.MainLoop()

        # Initialise the D-Bus path and register it
        self.bus = dbus.SystemBus()
        self.path = '/ukBaz/bluezero/application{}'.format(id(self))
        self.bus_name = dbus.service.BusName('ukBaz.bluezero', self.bus)
        dbus.service.Object.__init__(self, self.bus_name, self.path)

        # Initialise services within the application
        self.services = []

        self.dongle = adapter.Adapter(device_id)
peripheral.py 文件源码 项目:python-bluezero 作者: ukBaz 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def GetManagedObjects(self):
        """Get all objects that are managed by the application.

        Return type is a dictionary whose keys are each registered object and
        values the properties of the given object.
        """
        response = {}
        print('GetManagedObjects')

        for service in self.services:
            response[service.get_path()] = service.get_properties()
            chrcs = service.get_characteristics()
            for chrc in chrcs:
                response[chrc.get_path()] = chrc.get_properties()
                descs = chrc.get_descriptors()
                for desc in descs:
                    response[desc.get_path()] = desc.get_properties()

        return response
peripheral.py 文件源码 项目:python-bluezero 作者: ukBaz 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_properties(self):
        """Return a dictionary of the service properties.

        The dictionary has the following keys:

        - UUID: the service UUID
        - Primary: whether the service is the primary service
        - Characteristics: D-Bus array of the characteristic object paths
          associated with the service.

        """
        return {
            constants.GATT_SERVICE_IFACE: {
                'UUID': self.uuid,
                'Primary': self.primary,
                'Characteristics': dbus.Array(
                    self.get_characteristic_paths(),
                    signature='o')
            }
        }
peripheral.py 文件源码 项目:python-bluezero 作者: ukBaz 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def GetManagedObjects(self):
        """Get all objects that are managed by the service.

        Return type is a dictionary whose keys are each registered object and
        values the properties of the given object.
        """
        response = {}
        # print('GetManagedObjects')

        response[self.get_path()] = self.get_properties()
        chrcs = self.get_characteristics()
        for chrc in chrcs:
            response[chrc.get_path()] = chrc.get_properties()
            descs = chrc.get_descriptors()
            for desc in descs:
                response[desc.get_path()] = desc.get_properties()

        return response
peripheral.py 文件源码 项目:python-bluezero 作者: ukBaz 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_properties(self):
        """Return a dictionary of the characteristic properties.

        The dictionary has the following keys:

        - Service: the characteristic's service
        - UUID: the characteristic UUID
        - Flags: any characteristic flags
        - Descriptors: D-Bus array of the descriptor object paths
          associated with the characteristic.

        """
        return {
            constants.GATT_CHRC_IFACE: {
                'Service': self.service.get_path(),
                'UUID': self.uuid,
                'Flags': self.flags,
                'Descriptors': dbus.Array(
                    self.get_descriptor_paths(),
                    signature='o')
            }
        }
peripheral.py 文件源码 项目:python-bluezero 作者: ukBaz 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def __init__(self, uuid, flags, characteristic):
        """"Default initialiser.

        1. Registers the descriptor on the characteristic path.
        2. Sets up initial values.

        :param uuid: descriptor BLE UUID.
        :param flags: descriptor flags.
        :param characteristic: characteristic that the descriptor is associated
                               with.
        """
        # Register the descriptor on the characteristic path
        self.index = id(self)
        self.path = characteristic.path + '/desc' + str(self.index)
        self.bus = characteristic.bus
        dbus.service.Object.__init__(self, self.bus, self.path)

        self.uuid = uuid
        self.flags = flags
        self.chrc = characteristic
advertisement.py 文件源码 项目:python-bluezero 作者: ukBaz 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __init__(self, advert_id, ad_type):
        """Default initialiser.

        Creates the interface to the specified advertising data.
        The DBus path must be specified.

        :param advert_id: Unique ID of advertisement.
        :param ad_type: Possible values: "broadcast" or "peripheral"
        """
        # Setup D-Bus object paths and register service
        self.path = '/ukBaz/bluezero/advertisement{0:04d}'.format(advert_id)
        self.bus = dbus.SystemBus()
        self.eventloop = async_tools.EventLoop()
        self.interface = constants.LE_ADVERTISEMENT_IFACE
        dbus.service.Object.__init__(self, self.bus, self.path)
        self.props = {
            constants.LE_ADVERTISEMENT_IFACE: {
                'Type': ad_type,
                'ServiceUUIDs': None,
                'ManufacturerData': None,
                'SolicitUUIDs': None,
                'ServiceData': None,
                'IncludeTxPower': False
            }
        }
dbus_listener.py 文件源码 项目:spoppy 作者: sindrig 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def run(self):
            GObject.threads_init()
            dbus.mainloop.glib.threads_init()
            dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
            bus_name = dbus.service.BusName(
                "com.spoppy",
                dbus.SessionBus()
            )
            super(SpoppyDBusService, self).__init__(
                bus_name, "/com/spoppy"
            )

            self._loop = GObject.MainLoop()
            while self.running:
                try:
                    logger.debug('Starting dbus loop')
                    self._loop.run()
                except KeyboardInterrupt:
                    logger.debug('Loop interrupted, will restart')
dbus_listener.py 文件源码 项目:spoppy 作者: sindrig 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def run(self):
        self.service = SpoppyDBusService(self.lifecycle)
        self.service_thread = threading.Thread(
            target=self.service.run
        )
        self.service_thread.start()

        logger.debug('Service started, waiting for kill signal')
        self.stop_event.wait()
        logger.debug('Kill signal received, stopping service')
        self.service.stop()
        while True:
            logger.debug('Joining service thread with timeout 10s')
            self.service_thread.join(10)
            if not self.service_thread.is_alive():
                break
btk_server.py 文件源码 项目:BlogCode 作者: yaptb 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def read_sdp_service_record(self):

        print("Reading service record")

        try:
            fh = open(BTKbDevice.SDP_RECORD_PATH, "r")
        except:
            sys.exit("Could not open the sdp record. Exiting...")

        return fh.read()   



    #listen for incoming client connections
    #ideally this would be handled by the Bluez 5 profile 
    #but that didn't seem to work
bluez4.py 文件源码 项目:phony 作者: littlecraft 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def __init__(self, adapter, path):
    ClassLogger.__init__(self)
    dbus.service.Object.__init__(self, adapter.bus(), path)

    self.__path = path
    self.__capability = 'DisplayYesNo'

    #
    # These profile modes appear to cause the agent to ignore
    # pin and passcode requests...
    #
    #self.__capability = 'NoInputNoOutput'
    #self.__capability = 'DisplayOnly'
    #self.__capability = 'KeyboardOnly'

    adapter.adapter().RegisterAgent(path, self.__capability)
service.py 文件源码 项目:co2monitor 作者: nobodyinperson 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(self):
        # initially set the standard logger
        self.set_logger(logging.getLogger(__name__))
        # initially set an empty configuration
        self.set_config(configparser.ConfigParser())
        # set up the quit signals
        self.setup_signals(
            signals = [signal.SIGINT, signal.SIGTERM, signal.SIGHUP],
            handler = self.please_stop_now
        )

        # use glib as default mailoop for dbus
        dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
        dbus.mainloop.glib.threads_init()
        GLib.threads_init()

        self.loop = GLib.MainLoop() # create mainloop

        self.systembus = dbus.SystemBus() # the system bus
        self.systembus.request_name(CO2MONITOR_BUSNAME) # request the bus name
        bus_name = dbus.service.BusName(CO2MONITOR_BUSNAME, self.systembus) # create bus name
        # register the object on the bus name
        dbus.service.Object.__init__(self, bus_name, CO2MONITOR_OBJECTPATH)
service.py 文件源码 项目:co2monitor 作者: nobodyinperson 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def start_device_logging(self, devicefile):
        self.logger.info(_("received request to start logging on device {}").format(
            devicefile))
        # create the object path name on the bus name
        objectpath = "/".join([CO2MONITOR_OBJECTPATH,
            utils.devicefile2objectname(devicefile)])
        # check if device is already monitored
        monitored_devices = self.get_monitored_devices_objects()
        if objectpath in monitored_devices:
            self.logger.warning(" ".join([
            _("There is already a logging thread for device {}."),
            _("This is odd... I better do nothing.")
            ]).format(devicefile))
            return False
        else:
            try:
                thread = LogThread(devicefile = devicefile) # logger object
                self.logger.info(_("starting logging thread for device {}").format(
                    devicefile))
                # same logger as service
                thread.set_logger(self.logger)
                # same config as service
                thread.set_config(self.config)
                thread.daemon = True # let this thread be a daemon thread
                thread.start()
            except OSError:
                self.logger.critical(_("Could not access the device file '{}'."
                    ).format(devicefile))
                return False
            except:
                self.logger.critical(_(
                    "Something went wrong with device file '{}'."
                    ).format(devicefile))
                return False
            return True
service.py 文件源码 项目:co2monitor 作者: nobodyinperson 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __init__(self, devicefile):
        # by default, log!
        self.do_data_logging = True
        # initially set the standard logger
        self.set_logger(logging.getLogger(__name__))
        # initially set an empty configuration
        self.set_config(configparser.ConfigParser())

        # use glib as default mailoop for dbus
        dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
        dbus.mainloop.glib.threads_init()
        GLib.threads_init()

        self.systembus = dbus.SystemBus() # the system bus
        self.systembus.request_name(CO2MONITOR_BUSNAME) # request the bus name
        bus_name = dbus.service.BusName(CO2MONITOR_BUSNAME, self.systembus) # create bus name

        self.devicefile = devicefile

        # set up the device
        self.device = device.co2device(self.devicefile)

        # register the object on the bus name
        objectpath = "/".join([CO2MONITOR_OBJECTPATH,
            utils.devicefile2objectname(self.devicefile)])
        dbus.service.Object.__init__(self, bus_name, objectpath)
        self.update_status(_("idle"))
        threading.Thread.__init__(self)


    # set the config
dbus_ctrl.py 文件源码 项目:simLAB 作者: kamwar 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __init__(self, taskQueue, resultQueue):
        self.taskQueue = taskQueue
        self.resultQueue = resultQueue
        bus_name = dbus.service.BusName('org.sim.simlab', bus=dbus.SessionBus())
        dbus.service.Object.__init__(self, bus_name, '/org/sim/simlab')
advertising.py 文件源码 项目:python-gatt-server 作者: Jumperr-labs 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __init__(self, bus, index, advertising_type):
        self.path = self.PATH_BASE + str(index)
        self.bus = bus
        self.ad_type = advertising_type
        self.service_uuids = None
        self.manufacturer_data = None
        self.solicit_uuids = None
        self.service_data = None
        self.include_tx_power = None
        dbus.service.Object.__init__(self, bus, self.path)
gatt_server.py 文件源码 项目:python-gatt-server 作者: Jumperr-labs 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(self, bus):
        self.path = '/'
        self.services = []
        dbus.service.Object.__init__(self, bus, self.path)
        self.add_service(HeartRateService(bus, 0))
        self.add_service(BatteryService(bus, 1))
        self.add_service(TestService(bus, 2))
gatt_server.py 文件源码 项目:python-gatt-server 作者: Jumperr-labs 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def add_service(self, service):
        self.services.append(service)
gatt_server.py 文件源码 项目:python-gatt-server 作者: Jumperr-labs 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def GetManagedObjects(self):
        response = {}
        print('GetManagedObjects')

        for service in self.services:
            response[service.get_path()] = service.get_properties()
            chrcs = service.get_characteristics()
            for chrc in chrcs:
                response[chrc.get_path()] = chrc.get_properties()
                descs = chrc.get_descriptors()
                for desc in descs:
                    response[desc.get_path()] = desc.get_properties()

        return response
gatt_server.py 文件源码 项目:python-gatt-server 作者: Jumperr-labs 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __init__(self, bus, index, uuid, primary):
        self.path = self.PATH_BASE + str(index)
        self.bus = bus
        self.uuid = uuid
        self.primary = primary
        self.characteristics = []
        dbus.service.Object.__init__(self, bus, self.path)
gatt_server.py 文件源码 项目:python-gatt-server 作者: Jumperr-labs 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def get_properties(self):
        return {
                GATT_CHRC_IFACE: {
                        'Service': self.service.get_path(),
                        'UUID': self.uuid,
                        'Flags': self.flags,
                        'Descriptors': dbus.Array(
                                self.get_descriptor_paths(),
                                signature='o')
                }
        }
gatt_server.py 文件源码 项目:python-gatt-server 作者: Jumperr-labs 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __init__(self, bus, index, uuid, flags, characteristic):
        self.path = characteristic.path + '/desc' + str(index)
        self.bus = bus
        self.uuid = uuid
        self.flags = flags
        self.chrc = characteristic
        dbus.service.Object.__init__(self, bus, self.path)
gatt_server.py 文件源码 项目:python-gatt-server 作者: Jumperr-labs 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(self, bus, index, service):
        Characteristic.__init__(
                self, bus, index,
                self.HR_MSRMT_UUID,
                ['notify'],
                service)
        self.notifying = False
        self.hr_ee_count = 0
gatt_server.py 文件源码 项目:python-gatt-server 作者: Jumperr-labs 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def __init__(self, bus, index, service):
        Characteristic.__init__(
                self, bus, index,
                self.BODY_SNSR_LOC_UUID,
                ['read'],
                service)


问题


面经


文章

微信
公众号

扫码关注公众号