def WriteValue(self, value, options):
print('Heart Rate Control Point WriteValue called')
if len(value) != 1:
raise exceptions.InvalidValueLengthException()
byte = value[0]
print('Control Point value: ' + repr(byte))
if byte != 1:
raise exceptions.FailedException("0x80")
print('Energy Expended field reset!')
self.service.energy_expended = 0
python类service()的实例源码
def __init__(self, bus, index, service):
Characteristic.__init__(
self, bus, index,
self.BATTERY_LVL_UUID,
['read', 'notify'],
service)
self.notifying = False
self.battery_lvl = 100
GObject.timeout_add(5000, self.drain_battery)
def __init__(self, bus, index, service):
Characteristic.__init__(
self, bus, index,
self.TEST_CHRC_UUID,
['read', 'write', 'writable-auxiliaries'],
service)
self.value = []
self.add_descriptor(TestDescriptor(bus, 0, self))
self.add_descriptor(
CharacteristicUserDescriptionDescriptor(bus, 1, self))
def __init__(self, bus, index, service):
Characteristic.__init__(
self, bus, index,
self.TEST_CHRC_UUID,
['encrypt-read', 'encrypt-write'],
service)
self.value = []
self.add_descriptor(TestEncryptDescriptor(bus, 2, self))
self.add_descriptor(
CharacteristicUserDescriptionDescriptor(bus, 3, self))
def __init__(self, bus, index, service):
Characteristic.__init__(
self, bus, index,
self.TEST_CHRC_UUID,
['secure-read', 'secure-write'],
service)
self.value = []
self.add_descriptor(TestSecureDescriptor(bus, 2, self))
self.add_descriptor(
CharacteristicUserDescriptionDescriptor(bus, 3, self))
def __new__(cls, name, bases, attrs):
if bases != (dbus.service.Object,):
attrs['GetSecretsImpl'] = attrs.pop('GetSecrets')
return super(SecretAgentType, cls).__new__(cls, name, bases, attrs)
def __init__(self, identifier):
self.identifier = identifier
dbus.service.Object.__init__(self, dbus.SystemBus(), self.object_path)
AgentManager.Register(self.identifier)
def run(self):
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
bus_name = dbus.service.BusName("com.prozacville.steam_monitor", dbus.SessionBus())
dbus.service.Object.__init__(self, bus_name, "/com/prozacville/steam_monitor")
self._loop = gobject.MainLoop()
print "Service running..."
self._loop.run()
print "Service stopped"
def __init__( self ):
bus_name = dbus.service.BusName( BUSNAME, bus = dbus.SystemBus() )
super().__init__( bus_name = bus_name, object_path = SUPPORT_OBJECTPATH )
def GetState( self ):
"""
Return the current runtime state of the service.
Returns:
str: json encoded, free-form-ish dictionary of runtime state information
"""
return json.dumps( {} )
def __init__(self, ui, home, tr_ay, new_tr):
global tray, new_tray_widget
tray = tr_ay
new_tray_widget = new_tr
bus = dbus.service.BusName(
AW_MPRIS_BUS_NAME,
bus=dbus.SessionBus())
super().__init__(bus, MPRIS_OBJECT_PATH)
self._properties = dbus.Dictionary({
'DesktopEntry': 'kawaii-player',
'Identity': 'kawaii-player',
}, signature='sv')
self._player_properties = dbus.Dictionary({
'Metadata': dbus.Dictionary({
'mpris:artUrl': '',
'xesam:artist': ['None'],
'xesam:title': 'None',
'xesam:album': 'None'
}, signature='sv', variant_level=1),
'CanGoNext': True,
'CanGoPrevious': True,
'CanPause': True,
'CanPlay': True,
'CanControl': True,
'CanStop': True,
}, signature='sv', variant_level=2)
self.ui = ui
self.home = home
def __init__(self, bus, mainloop):
bus_name = dbus.service.BusName(INTERFACE, bus=bus)
PolicyKitService.__init__(self, bus_name, PATH)
self.mainloop = mainloop
def run_dbus_service(self, timeout=None, send_usr1=False):
'''Run D-BUS server.
If no timeout is given, the server will run forever, otherwise it will
return after the specified number of seconds.
If send_usr1 is True, this will send a SIGUSR1 to the parent process
once the server is ready to take requests.
'''
dbus.service.Object.__init__(self, self.bus, '/RecoveryMedia')
self.main_loop = GLib.MainLoop()
self._timeout = False
if timeout:
def _quit():
"""This function is ran at the end of timeout"""
self.main_loop.quit()
return True
GLib.timeout_add(timeout * 1000, _quit)
# send parent process a signal that we are ready now
if send_usr1:
os.kill(os.getppid(), signal.SIGUSR1)
# run until we time out
while not self._timeout:
if timeout:
self._timeout = True
self.main_loop.run()
def add_managed_object(self, object):
"""Add a service to the list of services offered by the Application.
:param object: Python object of dbus path to be managed
"""
self.managed_objs.append(object)
def __init__(self, characteristic_id,
uuid,
service_obj,
value,
notifying,
flags):
"""Default initialiser.
1. Registers the characteristc on the D-Bus.
2. Sets up the service UUID and primary flags.
:param service_id:
:param uuid: service BLE UUID
:param service_obj: the service that this characteristic is part of
:param value: the initial value of this characteristic
:param notifying: boolean representing the state of notification
:param flags:
"""
# Setup D-Bus object paths and register service
PATH_BASE = service_obj.get_path() + '/char'
self.path = PATH_BASE + str('{0:04d}'.format(characteristic_id))
self.bus = dbus.SystemBus()
dbus.service.Object.__init__(self, self.bus, self.path)
self.props = {
constants.GATT_CHRC_IFACE: {
'UUID': uuid,
'Service': service_obj.get_path(),
'Value': value,
'Notifying': notifying,
'Flags': flags}
}
for prop in self.props[constants.GATT_CHRC_IFACE].keys():
self.Set(constants.GATT_CHRC_IFACE,
prop,
self.props[constants.GATT_CHRC_IFACE][prop])
def __init__(self,
descriptor_id,
uuid,
characteristic_obj,
value,
flags):
"""Default initialiser.
1. Registers the descriptor on the D-Bus.
2. Sets up the service UUID and primary flags.
:param descriptor_id: A unique identifier for this descriptor
:param uuid: descriptor BLE UUID
:param characteristic_obj: The characteristic that this descriptor
is related to
:param value: The initial value of the descriptor
:param flags: Flags specifying access permissions
"""
# Setup D-Bus object paths and register service
PATH_BASE = characteristic_obj.get_path() + '/desc'
self.path = PATH_BASE + str('{0:04d}'.format(descriptor_id))
self.bus = dbus.SystemBus()
dbus.service.Object.__init__(self, self.bus, self.path)
self.props = {
constants.GATT_DESC_IFACE: {
'UUID': uuid,
'Characteristic': characteristic_obj.get_path(),
'Value': value,
'Flags': flags}
}
for prop in self.props[constants.GATT_DESC_IFACE].keys():
self.Set(constants.GATT_DESC_IFACE,
prop,
self.props[constants.GATT_DESC_IFACE][prop])
def add_service(self, service):
"""Add a service to the list of services offered by the Application.
:param service: the service to be added (type: peripheral.Service)
"""
self.services.append(service)
def get_primary_service(self):
"""Get the *primary* service registered with the Application."""
# services = self.GetManagedObjects()
primary_uuid = None
for service in self.services:
if service.primary:
logger.debug(service.uuid)
primary_uuid = service.uuid
return primary_uuid
def UUID(self):
"""Return Service UUID"""
return self.service.Get(constants.GATT_SERVICE_IFACE, 'UUID')
def add_characteristic(self, characteristic):
"""Add a characteristic.
Adds a characteristic to the list of characteristics offered by the
service.
:param characteristic: the characteristic to be added.
(type: peripheral.Characteristic)
"""
self.characteristics.append(characteristic)