def hr_msrmt_cb(self):
value = []
value.append(dbus.Byte(0x06))
value.append(dbus.Byte(randint(90, 130)))
if self.hr_ee_count % 10 == 0:
value[0] = dbus.Byte(value[0] | 0x08)
value.append(dbus.Byte(self.service.energy_expended & 0xff))
value.append(dbus.Byte((self.service.energy_expended >> 8) & 0xff))
self.service.energy_expended = \
min(0xffff, self.service.energy_expended + 1)
self.hr_ee_count += 1
print('Updating value: ' + repr(value))
self.PropertiesChanged(GATT_CHRC_IFACE, { 'Value': value }, [])
return self.notifying
python类service()的实例源码
def __init__(self):
dbus.service.Object.__init__(self)
#initialize variables that will be used during create and run
self.bus = None
self.package_dir = None
self.main_loop = None
self._timeout = False
self.dbus_name = None
self.xml_obj = BTOxml()
# cached D-BUS interfaces for _check_polkit_privilege()
self.dbus_info = None
self.polkit = None
self.progress_thread = None
self.enforce_polkit = True
#Enable translation for strings used
bindtextdomain(DOMAIN, LOCALEDIR)
textdomain(DOMAIN)
def create_dbus_server(cls, session_bus=False):
'''Return a D-BUS server backend instance.
Normally this connects to the system bus. Set session_bus to True to
connect to the session bus (for testing).
'''
backend = Backend()
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
if session_bus:
backend.bus = dbus.SessionBus()
backend.enforce_polkit = False
else:
backend.bus = dbus.SystemBus()
try:
backend.dbus_name = dbus.service.BusName(DBUS_BUS_NAME, backend.bus)
except dbus.exceptions.DBusException as msg:
logging.error("Exception when spawning dbus service")
logging.error(msg)
return None
return backend
#
# Internal methods
#
def __init__(self, device_id=None):
"""Default initialiser.
1. Initialises the program loop using ``GObject``.
2. Registers the Application on the D-Bus.
3. Initialises the list of services offered by the application.
"""
# Initialise the D-Bus path and register it
self.bus = dbus.SystemBus()
self.path = '/ukBaz/bluezero'
self.bus_name = dbus.service.BusName('ukBaz.bluezero', self.bus)
dbus.service.Object.__init__(self, self.bus_name, self.path)
# Objects to be associated with this service
self.managed_objs = []
self.eventloop = async_tools.EventLoop()
def __init__(self, service_id, uuid, primary):
"""Default initialiser.
1. Registers the service on the D-Bus.
2. Sets up the service UUID and primary flags.
:param service_id:
:param uuid: service BLE UUID
:param primary: whether or not the service is a primary service
"""
# Setup D-Bus object paths and register service
self.path = self.PATH_BASE + str('{0:04d}'.format(service_id))
self.bus = dbus.SystemBus()
self.interface = constants.GATT_SERVICE_IFACE
dbus.service.Object.__init__(self, self.bus, self.path)
self.props = {
constants.GATT_SERVICE_IFACE: {
'UUID': uuid,
'Primary': primary}
}
def GetAll(self, interface_name):
"""Return the service properties.
This method is registered with the D-Bus at
``org.freedesktop.DBus.Properties``
:param interface: interface to get the properties of.
The interface must be ``org.bluez.GattService1`` otherwise an
exception is raised.
"""
if interface_name != constants.GATT_SERVICE_IFACE:
raise InvalidArgsException()
try:
return self.props[interface_name]
except KeyError:
raise dbus.exceptions.DBusException(
'no such interface ' + interface_name,
name=interface_name + '.UnknownInterface')
def __init__(self, device_id=None):
"""Default initialiser.
1. Initialises the program loop using ``GObject``.
2. Registers the Application on the D-Bus.
3. Initialises the list of services offered by the application.
"""
# Initialise the loop that the application runs in
GObject.threads_init()
dbus.mainloop.glib.threads_init()
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
self.mainloop = GObject.MainLoop()
# Initialise the D-Bus path and register it
self.bus = dbus.SystemBus()
self.path = '/ukBaz/bluezero/application{}'.format(id(self))
self.bus_name = dbus.service.BusName('ukBaz.bluezero', self.bus)
dbus.service.Object.__init__(self, self.bus_name, self.path)
# Initialise services within the application
self.services = []
self.dongle = adapter.Adapter(device_id)
def GetManagedObjects(self):
"""Get all objects that are managed by the application.
Return type is a dictionary whose keys are each registered object and
values the properties of the given object.
"""
response = {}
print('GetManagedObjects')
for service in self.services:
response[service.get_path()] = service.get_properties()
chrcs = service.get_characteristics()
for chrc in chrcs:
response[chrc.get_path()] = chrc.get_properties()
descs = chrc.get_descriptors()
for desc in descs:
response[desc.get_path()] = desc.get_properties()
return response
def get_properties(self):
"""Return a dictionary of the service properties.
The dictionary has the following keys:
- UUID: the service UUID
- Primary: whether the service is the primary service
- Characteristics: D-Bus array of the characteristic object paths
associated with the service.
"""
return {
constants.GATT_SERVICE_IFACE: {
'UUID': self.uuid,
'Primary': self.primary,
'Characteristics': dbus.Array(
self.get_characteristic_paths(),
signature='o')
}
}
def GetManagedObjects(self):
"""Get all objects that are managed by the service.
Return type is a dictionary whose keys are each registered object and
values the properties of the given object.
"""
response = {}
# print('GetManagedObjects')
response[self.get_path()] = self.get_properties()
chrcs = self.get_characteristics()
for chrc in chrcs:
response[chrc.get_path()] = chrc.get_properties()
descs = chrc.get_descriptors()
for desc in descs:
response[desc.get_path()] = desc.get_properties()
return response
def get_properties(self):
"""Return a dictionary of the characteristic properties.
The dictionary has the following keys:
- Service: the characteristic's service
- UUID: the characteristic UUID
- Flags: any characteristic flags
- Descriptors: D-Bus array of the descriptor object paths
associated with the characteristic.
"""
return {
constants.GATT_CHRC_IFACE: {
'Service': self.service.get_path(),
'UUID': self.uuid,
'Flags': self.flags,
'Descriptors': dbus.Array(
self.get_descriptor_paths(),
signature='o')
}
}
def __init__(self, uuid, flags, characteristic):
""""Default initialiser.
1. Registers the descriptor on the characteristic path.
2. Sets up initial values.
:param uuid: descriptor BLE UUID.
:param flags: descriptor flags.
:param characteristic: characteristic that the descriptor is associated
with.
"""
# Register the descriptor on the characteristic path
self.index = id(self)
self.path = characteristic.path + '/desc' + str(self.index)
self.bus = characteristic.bus
dbus.service.Object.__init__(self, self.bus, self.path)
self.uuid = uuid
self.flags = flags
self.chrc = characteristic
def __init__(self, advert_id, ad_type):
"""Default initialiser.
Creates the interface to the specified advertising data.
The DBus path must be specified.
:param advert_id: Unique ID of advertisement.
:param ad_type: Possible values: "broadcast" or "peripheral"
"""
# Setup D-Bus object paths and register service
self.path = '/ukBaz/bluezero/advertisement{0:04d}'.format(advert_id)
self.bus = dbus.SystemBus()
self.eventloop = async_tools.EventLoop()
self.interface = constants.LE_ADVERTISEMENT_IFACE
dbus.service.Object.__init__(self, self.bus, self.path)
self.props = {
constants.LE_ADVERTISEMENT_IFACE: {
'Type': ad_type,
'ServiceUUIDs': None,
'ManufacturerData': None,
'SolicitUUIDs': None,
'ServiceData': None,
'IncludeTxPower': False
}
}
def run(self):
GObject.threads_init()
dbus.mainloop.glib.threads_init()
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
bus_name = dbus.service.BusName(
"com.spoppy",
dbus.SessionBus()
)
super(SpoppyDBusService, self).__init__(
bus_name, "/com/spoppy"
)
self._loop = GObject.MainLoop()
while self.running:
try:
logger.debug('Starting dbus loop')
self._loop.run()
except KeyboardInterrupt:
logger.debug('Loop interrupted, will restart')
def run(self):
self.service = SpoppyDBusService(self.lifecycle)
self.service_thread = threading.Thread(
target=self.service.run
)
self.service_thread.start()
logger.debug('Service started, waiting for kill signal')
self.stop_event.wait()
logger.debug('Kill signal received, stopping service')
self.service.stop()
while True:
logger.debug('Joining service thread with timeout 10s')
self.service_thread.join(10)
if not self.service_thread.is_alive():
break
def read_sdp_service_record(self):
print("Reading service record")
try:
fh = open(BTKbDevice.SDP_RECORD_PATH, "r")
except:
sys.exit("Could not open the sdp record. Exiting...")
return fh.read()
#listen for incoming client connections
#ideally this would be handled by the Bluez 5 profile
#but that didn't seem to work
def __init__(self, adapter, path):
ClassLogger.__init__(self)
dbus.service.Object.__init__(self, adapter.bus(), path)
self.__path = path
self.__capability = 'DisplayYesNo'
#
# These profile modes appear to cause the agent to ignore
# pin and passcode requests...
#
#self.__capability = 'NoInputNoOutput'
#self.__capability = 'DisplayOnly'
#self.__capability = 'KeyboardOnly'
adapter.adapter().RegisterAgent(path, self.__capability)
def __init__(self):
# initially set the standard logger
self.set_logger(logging.getLogger(__name__))
# initially set an empty configuration
self.set_config(configparser.ConfigParser())
# set up the quit signals
self.setup_signals(
signals = [signal.SIGINT, signal.SIGTERM, signal.SIGHUP],
handler = self.please_stop_now
)
# use glib as default mailoop for dbus
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
dbus.mainloop.glib.threads_init()
GLib.threads_init()
self.loop = GLib.MainLoop() # create mainloop
self.systembus = dbus.SystemBus() # the system bus
self.systembus.request_name(CO2MONITOR_BUSNAME) # request the bus name
bus_name = dbus.service.BusName(CO2MONITOR_BUSNAME, self.systembus) # create bus name
# register the object on the bus name
dbus.service.Object.__init__(self, bus_name, CO2MONITOR_OBJECTPATH)
def start_device_logging(self, devicefile):
self.logger.info(_("received request to start logging on device {}").format(
devicefile))
# create the object path name on the bus name
objectpath = "/".join([CO2MONITOR_OBJECTPATH,
utils.devicefile2objectname(devicefile)])
# check if device is already monitored
monitored_devices = self.get_monitored_devices_objects()
if objectpath in monitored_devices:
self.logger.warning(" ".join([
_("There is already a logging thread for device {}."),
_("This is odd... I better do nothing.")
]).format(devicefile))
return False
else:
try:
thread = LogThread(devicefile = devicefile) # logger object
self.logger.info(_("starting logging thread for device {}").format(
devicefile))
# same logger as service
thread.set_logger(self.logger)
# same config as service
thread.set_config(self.config)
thread.daemon = True # let this thread be a daemon thread
thread.start()
except OSError:
self.logger.critical(_("Could not access the device file '{}'."
).format(devicefile))
return False
except:
self.logger.critical(_(
"Something went wrong with device file '{}'."
).format(devicefile))
return False
return True
def __init__(self, devicefile):
# by default, log!
self.do_data_logging = True
# initially set the standard logger
self.set_logger(logging.getLogger(__name__))
# initially set an empty configuration
self.set_config(configparser.ConfigParser())
# use glib as default mailoop for dbus
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
dbus.mainloop.glib.threads_init()
GLib.threads_init()
self.systembus = dbus.SystemBus() # the system bus
self.systembus.request_name(CO2MONITOR_BUSNAME) # request the bus name
bus_name = dbus.service.BusName(CO2MONITOR_BUSNAME, self.systembus) # create bus name
self.devicefile = devicefile
# set up the device
self.device = device.co2device(self.devicefile)
# register the object on the bus name
objectpath = "/".join([CO2MONITOR_OBJECTPATH,
utils.devicefile2objectname(self.devicefile)])
dbus.service.Object.__init__(self, bus_name, objectpath)
self.update_status(_("idle"))
threading.Thread.__init__(self)
# set the config
def __init__(self, taskQueue, resultQueue):
self.taskQueue = taskQueue
self.resultQueue = resultQueue
bus_name = dbus.service.BusName('org.sim.simlab', bus=dbus.SessionBus())
dbus.service.Object.__init__(self, bus_name, '/org/sim/simlab')
def __init__(self, bus, index, advertising_type):
self.path = self.PATH_BASE + str(index)
self.bus = bus
self.ad_type = advertising_type
self.service_uuids = None
self.manufacturer_data = None
self.solicit_uuids = None
self.service_data = None
self.include_tx_power = None
dbus.service.Object.__init__(self, bus, self.path)
def __init__(self, bus):
self.path = '/'
self.services = []
dbus.service.Object.__init__(self, bus, self.path)
self.add_service(HeartRateService(bus, 0))
self.add_service(BatteryService(bus, 1))
self.add_service(TestService(bus, 2))
def add_service(self, service):
self.services.append(service)
def GetManagedObjects(self):
response = {}
print('GetManagedObjects')
for service in self.services:
response[service.get_path()] = service.get_properties()
chrcs = service.get_characteristics()
for chrc in chrcs:
response[chrc.get_path()] = chrc.get_properties()
descs = chrc.get_descriptors()
for desc in descs:
response[desc.get_path()] = desc.get_properties()
return response
def __init__(self, bus, index, uuid, primary):
self.path = self.PATH_BASE + str(index)
self.bus = bus
self.uuid = uuid
self.primary = primary
self.characteristics = []
dbus.service.Object.__init__(self, bus, self.path)
def get_properties(self):
return {
GATT_CHRC_IFACE: {
'Service': self.service.get_path(),
'UUID': self.uuid,
'Flags': self.flags,
'Descriptors': dbus.Array(
self.get_descriptor_paths(),
signature='o')
}
}
def __init__(self, bus, index, uuid, flags, characteristic):
self.path = characteristic.path + '/desc' + str(index)
self.bus = bus
self.uuid = uuid
self.flags = flags
self.chrc = characteristic
dbus.service.Object.__init__(self, bus, self.path)
def __init__(self, bus, index, service):
Characteristic.__init__(
self, bus, index,
self.HR_MSRMT_UUID,
['notify'],
service)
self.notifying = False
self.hr_ee_count = 0
def __init__(self, bus, index, service):
Characteristic.__init__(
self, bus, index,
self.BODY_SNSR_LOC_UUID,
['read'],
service)