def unwrap(self, val):
if isinstance(val, dbus.ByteArray):
return "".join([str(x) for x in val])
if isinstance(val, (dbus.Array, list, tuple)):
return [self.unwrap(x) for x in val]
if isinstance(val, (dbus.Dictionary, dict)):
return dict([(self.unwrap(x), self.unwrap(y)) for x, y in val.items()])
if isinstance(val, dbus.ObjectPath):
if val.startswith('/org/freedesktop/NetworkManager/'):
classname = val.split('/')[4]
classname = {
'Settings': 'Connection',
'Devices': 'Device',
}.get(classname, classname)
return globals()[classname](val)
if isinstance(val, (dbus.Signature, dbus.String)):
return unicode(val)
if isinstance(val, dbus.Boolean):
return bool(val)
if isinstance(val, (dbus.Int16, dbus.UInt16, dbus.Int32, dbus.UInt32, dbus.Int64, dbus.UInt64)):
return int(val)
if isinstance(val, dbus.Byte):
return bytes([int(val)])
return val
python类Byte()的实例源码
def hr_msrmt_cb(self):
value = []
value.append(dbus.Byte(0x06))
value.append(dbus.Byte(randint(90, 130)))
if self.hr_ee_count % 10 == 0:
value[0] = dbus.Byte(value[0] | 0x08)
value.append(dbus.Byte(self.service.energy_expended & 0xff))
value.append(dbus.Byte((self.service.energy_expended >> 8) & 0xff))
self.service.energy_expended = \
min(0xffff, self.service.energy_expended + 1)
self.hr_ee_count += 1
print('Updating value: ' + repr(value))
self.PropertiesChanged(GATT_CHRC_IFACE, { 'Value': value }, [])
return self.notifying
def dict_to_string(self, d):
# Try to trivially translate a dictionary's elements into nice string
# formatting.
dstr=""
for key in d:
val=d[key]
str_val=""
add_string=True
if type(val)==type(dbus.Array([])):
for elt in val:
if type(elt)==type(dbus.Byte(1)):
str_val+="%s " % int(elt)
elif type(elt)==type(dbus.String("")):
str_val+="%s" % elt
elif type(val)==type(dbus.Dictionary({})):
dstr+=self.dict_to_string(val)
add_string=False
else:
str_val=val
if add_string:
dstr+="%s: %s\n" % ( key, str_val)
return dstr
def write_value(self, value, offset=0):
"""
Attempts to write a value to the characteristic.
Success or failure will be notified by calls to `write_value_succeeded` or `write_value_failed` respectively.
:param value: array of bytes to be written
:param offset: offset from where to start writing the bytes (defaults to 0)
"""
bytes = [dbus.Byte(b) for b in value]
try:
self._object.WriteValue(
bytes,
{'offset': dbus.UInt16(offset, variant_level=1)},
reply_handler=self._write_value_succeeded,
error_handler=self._write_value_failed,
dbus_interface='org.bluez.GattCharacteristic1')
except dbus.exceptions.DBusException as e:
self._write_value_failed(self, error=e)
def send_notify_event(self, value):
"""Send a notification event.
:param value: the value that the characteristic is to be set to.
This function sets the characteristic value, and if the characteristic
is set to notify emits a PropertiesChanged() signal with the new value.
"""
# print('send', self, value)
self.value = value
if not self.notifying:
print('Not notifying')
return
# print('Update prop')
self.PropertiesChanged(
constants.GATT_CHRC_IFACE,
{'Value': [dbus.Byte(self.value)]}, [])
####################
# Descriptor Classes
####################
def convert(dbus_obj):
"""Converts dbus_obj from dbus type to python type.
:param dbus_obj: dbus object.
:returns: dbus_obj in python type.
"""
_isinstance = partial(isinstance, dbus_obj)
ConvertType = namedtuple('ConvertType', 'pytype dbustypes')
pyint = ConvertType(int, (dbus.Byte, dbus.Int16, dbus.Int32, dbus.Int64,
dbus.UInt16, dbus.UInt32, dbus.UInt64))
pybool = ConvertType(bool, (dbus.Boolean, ))
pyfloat = ConvertType(float, (dbus.Double, ))
pylist = ConvertType(lambda _obj: list(map(convert, dbus_obj)),
(dbus.Array, ))
pytuple = ConvertType(lambda _obj: tuple(map(convert, dbus_obj)),
(dbus.Struct, ))
types_str = (dbus.ObjectPath, dbus.Signature, dbus.String)
pystr = ConvertType(str, types_str)
pydict = ConvertType(
lambda _obj: dict(list(zip(list(map(convert, dbus_obj.keys())),
list(map(convert, dbus_obj.values()))
))
),
(dbus.Dictionary, )
)
for conv in (pyint, pybool, pyfloat, pylist, pytuple, pystr, pydict):
if any(map(_isinstance, conv.dbustypes)):
return conv.pytype(dbus_obj)
else:
return dbus_obj
def ssid_to_dbus(ssid):
if isinstance(ssid, unicode):
ssid = ssid.encode('utf-8')
return [dbus.Byte(x) for x in ssid]
def mac_to_dbus(mac):
return [dbus.Byte(int(x, 16)) for x in mac.split(':')]
def set_hint_byte(self, key, value):
"""Set a hint with a dbus byte value. The input value can be an
integer or a bytes string of length 1.
"""
self.hints[key] = dbus.Byte(value)
def notify_battery_level(self):
if not self.notifying:
return
self.PropertiesChanged(
GATT_CHRC_IFACE,
{'Value': [dbus.Byte(self.battery_lvl)] }, [])
def ReadValue(self, options):
print('Battery level read: ' + repr(self.battery_lvl))
return [dbus.Byte(self.battery_lvl)]
def ReadValue(self, options):
return [
dbus.Byte('T'), dbus.Byte('e'), dbus.Byte('s'), dbus.Byte('t')
]
def ReadValue(self, options):
return [
dbus.Byte('T'), dbus.Byte('e'), dbus.Byte('s'), dbus.Byte('t')
]
def convert(dbus_obj):
"""Converts dbus_obj from dbus type to python type.
:param dbus_obj: dbus object.
:returns: dbus_obj in python type.
"""
_isinstance = partial(isinstance, dbus_obj)
ConvertType = namedtuple('ConvertType', 'pytype dbustypes')
pyint = ConvertType(int, (dbus.Byte, dbus.Int16, dbus.Int32, dbus.Int64,
dbus.UInt16, dbus.UInt32, dbus.UInt64))
pybool = ConvertType(bool, (dbus.Boolean, ))
pyfloat = ConvertType(float, (dbus.Double, ))
pylist = ConvertType(lambda _obj: list(map(convert, dbus_obj)),
(dbus.Array, ))
pytuple = ConvertType(lambda _obj: tuple(map(convert, dbus_obj)),
(dbus.Struct, ))
types_str = (dbus.ObjectPath, dbus.Signature, dbus.String)
pystr = ConvertType(str, types_str)
pydict = ConvertType(
lambda _obj: dict(zip(map(convert, dbus_obj.keys()),
map(convert, dbus_obj.values())
)
),
(dbus.Dictionary, )
)
for conv in (pyint, pybool, pyfloat, pylist, pytuple, pystr, pydict):
if any(map(_isinstance, conv.dbustypes)):
return conv.pytype(dbus_obj)
else:
return dbus_obj
def base_to_python(val):
if isinstance(val, dbus.ByteArray):
return "".join([str(x) for x in val])
if isinstance(val, (dbus.Array, list, tuple)):
return [fixups.base_to_python(x) for x in val]
if isinstance(val, (dbus.Dictionary, dict)):
return dict([(fixups.base_to_python(x), fixups.base_to_python(y)) for x, y in val.items()])
if isinstance(val, dbus.ObjectPath):
for obj in (NetworkManager, Settings, AgentManager):
if val == obj.object_path:
return obj
if val.startswith('/org/freedesktop/NetworkManager/'):
classname = val.split('/')[4]
classname = {
'Settings': 'Connection',
'Devices': 'Device',
}.get(classname, classname)
try:
return globals()[classname](val)
except ObjectVanished:
return None
if val == '/':
return None
if isinstance(val, (dbus.Signature, dbus.String)):
return six.text_type(val)
if isinstance(val, dbus.Boolean):
return bool(val)
if isinstance(val, (dbus.Int16, dbus.UInt16, dbus.Int32, dbus.UInt32, dbus.Int64, dbus.UInt64)):
return int(val)
if isinstance(val, dbus.Byte):
return six.int2byte(int(val))
return val
def ssid_to_dbus(ssid):
if isinstance(ssid, six.text_type):
ssid = ssid.encode('utf-8')
return [dbus.Byte(x) for x in ssid]
def mac_to_dbus(mac):
return [dbus.Byte(int(x, 16)) for x in mac.split(':')]
def cert_to_dbus(cert):
if not isinstance(cert, bytes):
if not cert.startswith('file://'):
cert = 'file://' + cert
cert = cert.encode('utf-8') + b'\0'
return [dbus.Byte(x) for x in cert]
# Turn NetworkManager and Settings into singleton objects
def test_dbus_unwrap_byte(self):
dbus_value = dbus.Byte(245, variant_level=1)
value = dbus_mqtt.unwrap_dbus_value(dbus_value)
self.assertIsInstance(value, int)
self.assertEqual(int(dbus_value), value)
def cpu_temp_sint16(value):
answer = []
value_int16 = sint16(value[0])
for bytes in value_int16:
answer.append(dbus.Byte(bytes))
return answer
def __init__(self):
self.bus = dbus.SystemBus()
self.app = localGATT.Application()
self.srv = localGATT.Service(1, CPU_TMP_SRVC, True)
self.charc = TemperatureChrc(self.srv)
self.charc.service = self.srv.path
cpu_format_value = dbus.Array([dbus.Byte(0x0E),
dbus.Byte(0xFE),
dbus.Byte(0x2F),
dbus.Byte(0x27),
dbus.Byte(0x01),
dbus.Byte(0x00),
dbus.Byte(0x00)])
self.cpu_format = localGATT.Descriptor(4,
CPU_FMT_DSCP,
self.charc,
cpu_format_value,
['read'])
self.app.add_managed_object(self.srv)
self.app.add_managed_object(self.charc)
self.app.add_managed_object(self.cpu_format)
self.srv_mng = GATT.GattManager(adapter.list_adapters()[0])
self.srv_mng.register_application(self.app, {})
self.dongle = adapter.Adapter(adapter.list_adapters()[0])
advert = advertisement.Advertisement(1, 'peripheral')
advert.service_UUIDs = [CPU_TMP_SRVC]
# eddystone_data = tools.url_to_advert(WEB_BLINKT, 0x10, TX_POWER)
# advert.service_data = {EDDYSTONE: eddystone_data}
if not self.dongle.powered:
self.dongle.powered = True
ad_manager = advertisement.AdvertisingManager(self.dongle.address)
ad_manager.register_advertisement(advert, {})
def ReadValue(self, options):
"""Return the characteristic value.
This method is registered with the D-Bus at
``org.bluez.GattCharacteristic1``.
"""
# print('Reading Characteristic', self.value)
if self.value is None:
self.value = 0
return [dbus.Byte(self.value)]
def set_connection_state(self, state, current_ssid):
self.state = state
self.current_ssid = current_ssid
if self.is_notifying:
logger.info("Sending updated connection state")
if self.state != WifiConnectionState.DISCONNECTED and self.current_ssid:
self.value_update([dbus.Byte(self.state.value)] + string_to_dbus_array(self.current_ssid))
else:
self.value_update([dbus.Byte(self.state.value)])
def _read_value(self, options):
logger.info("Read Connection State Value")
if self.state != WifiConnectionState.DISCONNECTED and self.current_ssid:
return [dbus.Byte(self.state.value)] + string_to_dbus_array(self.current_ssid)
else:
return [dbus.Byte(self.state.value)]
def string_to_dbus_array(value):
return [dbus.Byte(c) for c in value.encode()]