def unwrap(self, val):
if isinstance(val, dbus.ByteArray):
return "".join([str(x) for x in val])
if isinstance(val, (dbus.Array, list, tuple)):
return [self.unwrap(x) for x in val]
if isinstance(val, (dbus.Dictionary, dict)):
return dict([(self.unwrap(x), self.unwrap(y)) for x, y in val.items()])
if isinstance(val, dbus.ObjectPath):
if val.startswith('/org/freedesktop/NetworkManager/'):
classname = val.split('/')[4]
classname = {
'Settings': 'Connection',
'Devices': 'Device',
}.get(classname, classname)
return globals()[classname](val)
if isinstance(val, (dbus.Signature, dbus.String)):
return unicode(val)
if isinstance(val, dbus.Boolean):
return bool(val)
if isinstance(val, (dbus.Int16, dbus.UInt16, dbus.Int32, dbus.UInt32, dbus.Int64, dbus.UInt64)):
return int(val)
if isinstance(val, dbus.Byte):
return bytes([int(val)])
return val
python类Array()的实例源码
def get_properties(self):
properties = dict()
properties['Type'] = self.ad_type
if self.service_uuids is not None:
properties['ServiceUUIDs'] = dbus.Array(self.service_uuids,
signature='s')
if self.solicit_uuids is not None:
properties['SolicitUUIDs'] = dbus.Array(self.solicit_uuids,
signature='s')
if self.manufacturer_data is not None:
properties['ManufacturerData'] = dbus.Dictionary(
self.manufacturer_data, signature='qv')
if self.service_data is not None:
properties['ServiceData'] = dbus.Dictionary(self.service_data,
signature='sv')
if self.include_tx_power is not None:
properties['IncludeTxPower'] = dbus.Boolean(self.include_tx_power)
return {LE_ADVERTISEMENT_IFACE: properties}
def dict_to_string(self, d):
# Try to trivially translate a dictionary's elements into nice string
# formatting.
dstr=""
for key in d:
val=d[key]
str_val=""
add_string=True
if type(val)==type(dbus.Array([])):
for elt in val:
if type(elt)==type(dbus.Byte(1)):
str_val+="%s " % int(elt)
elif type(elt)==type(dbus.String("")):
str_val+="%s" % elt
elif type(val)==type(dbus.Dictionary({})):
dstr+=self.dict_to_string(val)
add_string=False
else:
str_val=val
if add_string:
dstr+="%s: %s\n" % ( key, str_val)
return dstr
def Set(self, interface_name, property_name, value, *args, **kwargs):
"""Standard D-Bus API for setting a property value"""
try:
iface_props = self.props[interface_name]
except KeyError:
raise dbus.exceptions.DBusException(
'no such interface ' + interface_name,
name=self.interface + '.UnknownInterface')
if property_name not in iface_props:
raise dbus.exceptions.DBusException(
'no such property ' + property_name,
name=self.interface + '.UnknownProperty')
iface_props[property_name] = value
self.PropertiesChanged(interface_name,
dbus.Dictionary({property_name: value},
signature='sv'),
dbus.Array([], signature='s'))
def get_properties(self):
"""Return a dictionary of the service properties.
The dictionary has the following keys:
- UUID: the service UUID
- Primary: whether the service is the primary service
- Characteristics: D-Bus array of the characteristic object paths
associated with the service.
"""
return {
constants.GATT_SERVICE_IFACE: {
'UUID': self.uuid,
'Primary': self.primary,
'Characteristics': dbus.Array(
self.get_characteristic_paths(),
signature='o')
}
}
def get_properties(self):
"""Return a dictionary of the characteristic properties.
The dictionary has the following keys:
- Service: the characteristic's service
- UUID: the characteristic UUID
- Flags: any characteristic flags
- Descriptors: D-Bus array of the descriptor object paths
associated with the characteristic.
"""
return {
constants.GATT_CHRC_IFACE: {
'Service': self.service.get_path(),
'UUID': self.uuid,
'Flags': self.flags,
'Descriptors': dbus.Array(
self.get_descriptor_paths(),
signature='o')
}
}
def read_raw_value(self, flags=''):
"""
Return this characteristic's value (if allowed).
:param flags: "offset": Start offset
"device": Device path (Server only)
:return:
Possible Errors: org.bluez.Error.Failed
org.bluez.Error.InProgress
org.bluez.Error.NotPermitted
org.bluez.Error.InvalidValueLength
org.bluez.Error.NotAuthorized
org.bluez.Error.NotSupported
"""
return self.characteristic_methods.ReadValue(dbus.Array())
def convert(dbus_obj):
"""Converts dbus_obj from dbus type to python type.
:param dbus_obj: dbus object.
:returns: dbus_obj in python type.
"""
_isinstance = partial(isinstance, dbus_obj)
ConvertType = namedtuple('ConvertType', 'pytype dbustypes')
pyint = ConvertType(int, (dbus.Byte, dbus.Int16, dbus.Int32, dbus.Int64,
dbus.UInt16, dbus.UInt32, dbus.UInt64))
pybool = ConvertType(bool, (dbus.Boolean, ))
pyfloat = ConvertType(float, (dbus.Double, ))
pylist = ConvertType(lambda _obj: list(map(convert, dbus_obj)),
(dbus.Array, ))
pytuple = ConvertType(lambda _obj: tuple(map(convert, dbus_obj)),
(dbus.Struct, ))
types_str = (dbus.ObjectPath, dbus.Signature, dbus.String)
pystr = ConvertType(str, types_str)
pydict = ConvertType(
lambda _obj: dict(list(zip(list(map(convert, dbus_obj.keys())),
list(map(convert, dbus_obj.values()))
))
),
(dbus.Dictionary, )
)
for conv in (pyint, pybool, pyfloat, pylist, pytuple, pystr, pydict):
if any(map(_isinstance, conv.dbustypes)):
return conv.pytype(dbus_obj)
else:
return dbus_obj
def add_manufacturer_data(self, manuf_code, data):
if not self.manufacturer_data:
self.manufacturer_data = dbus.Dictionary({}, signature='qv')
self.manufacturer_data[manuf_code] = dbus.Array(data, signature='y')
def add_service_data(self, uuid, data):
if not self.service_data:
self.service_data = dbus.Dictionary({}, signature='sv')
self.service_data[uuid] = dbus.Array(data, signature='y')
def get_properties(self):
return {
GATT_SERVICE_IFACE: {
'UUID': self.uuid,
'Primary': self.primary,
'Characteristics': dbus.Array(
self.get_characteristic_paths(),
signature='o')
}
}
def get_properties(self):
return {
GATT_CHRC_IFACE: {
'Service': self.service.get_path(),
'UUID': self.uuid,
'Flags': self.flags,
'Descriptors': dbus.Array(
self.get_descriptor_paths(),
signature='o')
}
}
def convert(dbus_obj):
"""Converts dbus_obj from dbus type to python type.
:param dbus_obj: dbus object.
:returns: dbus_obj in python type.
"""
_isinstance = partial(isinstance, dbus_obj)
ConvertType = namedtuple('ConvertType', 'pytype dbustypes')
pyint = ConvertType(int, (dbus.Byte, dbus.Int16, dbus.Int32, dbus.Int64,
dbus.UInt16, dbus.UInt32, dbus.UInt64))
pybool = ConvertType(bool, (dbus.Boolean, ))
pyfloat = ConvertType(float, (dbus.Double, ))
pylist = ConvertType(lambda _obj: list(map(convert, dbus_obj)),
(dbus.Array, ))
pytuple = ConvertType(lambda _obj: tuple(map(convert, dbus_obj)),
(dbus.Struct, ))
types_str = (dbus.ObjectPath, dbus.Signature, dbus.String)
pystr = ConvertType(str, types_str)
pydict = ConvertType(
lambda _obj: dict(zip(map(convert, dbus_obj.keys()),
map(convert, dbus_obj.values())
)
),
(dbus.Dictionary, )
)
for conv in (pyint, pybool, pyfloat, pylist, pytuple, pystr, pydict):
if any(map(_isinstance, conv.dbustypes)):
return conv.pytype(dbus_obj)
else:
return dbus_obj
def base_to_python(val):
if isinstance(val, dbus.ByteArray):
return "".join([str(x) for x in val])
if isinstance(val, (dbus.Array, list, tuple)):
return [fixups.base_to_python(x) for x in val]
if isinstance(val, (dbus.Dictionary, dict)):
return dict([(fixups.base_to_python(x), fixups.base_to_python(y)) for x, y in val.items()])
if isinstance(val, dbus.ObjectPath):
for obj in (NetworkManager, Settings, AgentManager):
if val == obj.object_path:
return obj
if val.startswith('/org/freedesktop/NetworkManager/'):
classname = val.split('/')[4]
classname = {
'Settings': 'Connection',
'Devices': 'Device',
}.get(classname, classname)
try:
return globals()[classname](val)
except ObjectVanished:
return None
if val == '/':
return None
if isinstance(val, (dbus.Signature, dbus.String)):
return six.text_type(val)
if isinstance(val, dbus.Boolean):
return bool(val)
if isinstance(val, (dbus.Int16, dbus.UInt16, dbus.Int32, dbus.UInt32, dbus.Int64, dbus.UInt64)):
return int(val)
if isinstance(val, dbus.Byte):
return six.int2byte(int(val))
return val
def test_dbus_wrap_array(self):
value = [1]
dbus_value = dbus_mqtt.wrap_dbus_value(value)
self.assertIsInstance(dbus_value, dbus.Array)
self.assertEqual(dbus.Array([dbus.Int32(1, variant_level=1)]), dbus_value)
def test_dbus_unwrap_array(self):
dbus_value = dbus.Array([dbus.Int32(3, variant_level=1), dbus.Int32(7, variant_level=1)],
variant_level=1)
value = dbus_mqtt.unwrap_dbus_value(dbus_value)
self.assertIsInstance(value, list)
self.assertEqual([3, 7], value)
def test_dbus_unwrap_empty_array(self):
dbus_value = dbus.Array([], variant_level=1)
value = dbus_mqtt.unwrap_dbus_value(dbus_value)
self.assertIsNone(value)
def set_WFDIEs(self, parameter):
self.__set_property(dbus.String("WFDIEs"), dbus.Array(parameter, "y"))
def temperature_cb(self):
reading = [get_cpu_temperature()]
print('Getting new temperature',
reading,
self.props[constants.GATT_CHRC_IFACE]['Notifying'])
self.props[constants.GATT_CHRC_IFACE]['Value'] = reading
self.PropertiesChanged(constants.GATT_CHRC_IFACE,
{'Value': dbus.Array(cpu_temp_sint16(reading))},
[])
print('Array value: ', cpu_temp_sint16(reading))
return self.props[constants.GATT_CHRC_IFACE]['Notifying']
def ReadValue(self, options):
reading = [get_cpu_temperature()]
self.props[constants.GATT_CHRC_IFACE]['Value'] = reading
return dbus.Array(
cpu_temp_sint16(self.props[constants.GATT_CHRC_IFACE]['Value'])
)
def __init__(self):
self.bus = dbus.SystemBus()
self.app = localGATT.Application()
self.srv = localGATT.Service(1, CPU_TMP_SRVC, True)
self.charc = TemperatureChrc(self.srv)
self.charc.service = self.srv.path
cpu_format_value = dbus.Array([dbus.Byte(0x0E),
dbus.Byte(0xFE),
dbus.Byte(0x2F),
dbus.Byte(0x27),
dbus.Byte(0x01),
dbus.Byte(0x00),
dbus.Byte(0x00)])
self.cpu_format = localGATT.Descriptor(4,
CPU_FMT_DSCP,
self.charc,
cpu_format_value,
['read'])
self.app.add_managed_object(self.srv)
self.app.add_managed_object(self.charc)
self.app.add_managed_object(self.cpu_format)
self.srv_mng = GATT.GattManager(adapter.list_adapters()[0])
self.srv_mng.register_application(self.app, {})
self.dongle = adapter.Adapter(adapter.list_adapters()[0])
advert = advertisement.Advertisement(1, 'peripheral')
advert.service_UUIDs = [CPU_TMP_SRVC]
# eddystone_data = tools.url_to_advert(WEB_BLINKT, 0x10, TX_POWER)
# advert.service_data = {EDDYSTONE: eddystone_data}
if not self.dongle.powered:
self.dongle.powered = True
ad_manager = advertisement.AdvertisingManager(self.dongle.address)
ad_manager.register_advertisement(advert, {})
def Set(self, interface_name, property_name, value, *args, **kwargs):
"""Standard D-Bus API for setting a property value"""
if property_name not in self.props[constants.GATT_CHRC_IFACE]:
raise dbus.exceptions.DBusException(
'no such property ' + property_name,
name=constants.GATT_CHRC_IFACE + '.UnknownProperty')
self.props[constants.GATT_CHRC_IFACE][property_name] = value
return self.PropertiesChanged(interface_name,
dbus.Dictionary({property_name: value},
signature='sv'),
dbus.Array([], signature='s'))
def get_properties(self):
"""Return a dictionary of the advert properties.
The dictionary has the following keys:
- Type: the advertisement type.
- ServiceUUIDS: UUIDs of services to advertise
- SolicitUUIDS:
- ManufacturerData: dictionary of manufacturer data
- ServiceData: dictionary of service data
- IncludeTxPower:
"""
properties = dict()
properties['Type'] = self.ad_type
if self.service_uuids is not None:
properties['ServiceUUIDs'] = dbus.Array(self.service_uuids,
signature='s')
if self.solicit_uuids is not None:
properties['SolicitUUIDs'] = dbus.Array(self.solicit_uuids,
signature='s')
if self.manufacturer_data is not None:
properties['ManufacturerData'] = dbus.Dictionary(
self.manufacturer_data, signature='qay')
if self.service_data is not None:
properties['ServiceData'] = dbus.Dictionary(self.service_data,
signature='say')
if self.include_tx_power is not None:
properties['IncludeTxPower'] = dbus.Boolean(self.include_tx_power)
return {constants.LE_ADVERTISEMENT_IFACE: properties}
def service_data(self, data):
for UUID in data:
self.Set(constants.LE_ADVERTISEMENT_IFACE,
'ServiceData',
{UUID: dbus.Array(data[UUID], signature='y')})
def GetAll(self, interface_name):
"""Return the advertisement properties.
This method is registered with the D-Bus at
``org.freedesktop.DBus.Properties``
:param interface: interface to get the properties of.
The interface must be ``org.bluez.LEAdvertisement1`` otherwise an
exception is raised.
"""
if interface_name != constants.LE_ADVERTISEMENT_IFACE:
raise InvalidArgsException()
response = {}
response['Type'] = self.props[interface_name]['Type']
if self.props[interface_name]['ServiceUUIDs'] is not None:
response['ServiceUUIDs'] = dbus.Array(
self.props[interface_name]['ServiceUUIDs'],
signature='s')
if self.props[interface_name]['ServiceData'] is not None:
response['ServiceData'] = dbus.Dictionary(
self.props[interface_name]['ServiceData'],
signature='sv')
if self.props[interface_name]['ManufacturerData'] is not None:
response['ManufacturerData'] = dbus.Dictionary(
self.props[interface_name]['ManufacturerData'],
signature='qv')
if self.props[interface_name]['SolicitUUIDs'] is not None:
response['SolicitUUIDs'] = dbus.Array(
self.props[interface_name]['SolicitUUIDs'],
signature='s')
response['IncludeTxPower'] = dbus.Boolean(
self.props[interface_name]['IncludeTxPower'])
return response
def write_value(self, value, flags=''):
"""
Write a new value to the characteristic.
:param value:
:param flags:
:return:
"""
self.characteristic_methods.WriteValue(value, dbus.Array(flags))
def read_raw_value(self, flags=''):
"""
Issue a request to read the value of the descriptor.
Returns the value if the operation was successful.
:param flags: "offset": Start offset
"device": Device path (Server only)
:return: dbus byte array
"""
return self.descriptor_methods.ReadValue(dbus.Array(flags))
def write_value(self, value, flags=''):
"""
Issue a request to write the value of the descriptor.
:param value: DBus byte array
:param flags: "offset": Start offset
"device": Device path (Server only)
:return:
"""
self.descriptor_methods.WriteValue(value, dbus.Array(flags))
def _get_root_iface_properties(self):
return {
'CanQuit': (True, None),
'Fullscreen': (False, None),
'CanSetFullscreen': (False, None),
'CanRaise': (False, None),
# NOTE Change if adding optional track list support
'HasTrackList': (False, None),
'Identity': (IDENTITY, None),
'DesktopEntry': (DESKTOP, None),
'SupportedUriSchemes': (dbus.Array([], 's', 1), None),
# NOTE Return MIME types supported by local backend if support for
# reporting supported MIME types is added
'SupportedMimeTypes': (dbus.Array([], 's', 1), None),
}
def get_properties(self):
return {
GATT_SERVICE_IFACE: {
'UUID': self.uuid,
'Primary': self.primary,
'Characteristics': dbus.Array(
self.get_characteristic_paths(),
signature='o')
}
}