def unwrap(self, val):
    """Recursively turn a D-Bus value into a plain Python value.

    Sequences and mappings are unwrapped element by element. Object paths
    under ``/org/freedesktop/NetworkManager/`` are replaced by an instance
    of the class named by the fifth path component (with ``Settings`` and
    ``Devices`` mapped to ``Connection`` and ``Device``), looked up in this
    module's globals. Scalar D-Bus types become their builtin equivalents.
    Any value that matches none of the branches is returned unchanged.
    """
    if isinstance(val, dbus.ByteArray):
        return "".join(str(item) for item in val)

    if isinstance(val, (dbus.Array, list, tuple)):
        return [self.unwrap(item) for item in val]

    if isinstance(val, (dbus.Dictionary, dict)):
        return dict(
            (self.unwrap(key), self.unwrap(value))
            for key, value in val.items()
        )

    if isinstance(val, dbus.ObjectPath):
        if val.startswith('/org/freedesktop/NetworkManager/'):
            aliases = {
                'Settings': 'Connection',
                'Devices': 'Device',
            }
            segment = val.split('/')[4]
            wrapper_name = aliases.get(segment, segment)
            return globals()[wrapper_name](val)
        # Other object paths fall through to the checks below.

    if isinstance(val, (dbus.Signature, dbus.String)):
        return unicode(val)

    if isinstance(val, dbus.Boolean):
        return bool(val)

    integer_types = (dbus.Int16, dbus.UInt16, dbus.Int32,
                     dbus.UInt32, dbus.Int64, dbus.UInt64)
    if isinstance(val, integer_types):
        return int(val)

    if isinstance(val, dbus.Byte):
        return bytes([int(val)])

    return val
# Python examples: source code using the Boolean() class
def get_properties(self):
    """Return the advertisement properties keyed by interface name.

    ``Type`` is always present. Each optional entry is included only when
    the corresponding attribute on ``self`` is not ``None``.

    :return: ``{LE_ADVERTISEMENT_IFACE: {property name: value}}``
    """
    props = {'Type': self.ad_type}

    if self.service_uuids is not None:
        props['ServiceUUIDs'] = dbus.Array(
            self.service_uuids, signature='s')

    if self.solicit_uuids is not None:
        props['SolicitUUIDs'] = dbus.Array(
            self.solicit_uuids, signature='s')

    if self.manufacturer_data is not None:
        props['ManufacturerData'] = dbus.Dictionary(
            self.manufacturer_data, signature='qv')

    if self.service_data is not None:
        props['ServiceData'] = dbus.Dictionary(
            self.service_data, signature='sv')

    if self.include_tx_power is not None:
        props['IncludeTxPower'] = dbus.Boolean(self.include_tx_power)

    return {LE_ADVERTISEMENT_IFACE: props}
def advertising_main(mainloop, bus, adapter_name):
    """Power on the adapter and register a test advertisement.

    :param mainloop: passed through to ``register_ad_error_cb`` as its
        first argument.
    :param bus: D-Bus connection used to look up the adapter objects.
    :param adapter_name: name handed to ``adapters.find_adapter``.
    :raises Exception: when no adapter exposing the advertising manager
        interface is found.
    """
    adapter = adapters.find_adapter(
        bus, LE_ADVERTISING_MANAGER_IFACE, adapter_name)
    print('adapter: %s' % (adapter,))
    if not adapter:
        raise Exception('LEAdvertisingManager1 interface not found')

    # Switch the adapter on through the generic Properties interface.
    props_object = bus.get_object(BLUEZ_SERVICE_NAME, adapter)
    adapter_props = dbus.Interface(
        props_object, "org.freedesktop.DBus.Properties")
    adapter_props.Set("org.bluez.Adapter1", "Powered", dbus.Boolean(1))

    manager_object = bus.get_object(BLUEZ_SERVICE_NAME, adapter)
    ad_manager = dbus.Interface(manager_object, LE_ADVERTISING_MANAGER_IFACE)

    test_advertisement = TestAdvertisement(bus, 0)
    ad_manager.RegisterAdvertisement(
        test_advertisement.get_path(),
        {},
        reply_handler=register_ad_cb,
        error_handler=functools.partial(register_ad_error_cb, mainloop),
    )
def trust(self, address):
    """Mark the device with the given address as trusted.

    The ``Trusted`` property is written only when it currently reads as
    false.

    :param address: address passed to ``bluezutils.find_device``.
    :return: ``False`` when the lookup or a D-Bus call fails (the error is
        reported through ``print_error``), ``True`` otherwise.
    """
    lookup_errors = (bluezutils.BluezUtilError,
                     dbus.exceptions.DBusException)
    try:
        device = bluezutils.find_device(address)
    except lookup_errors as error:
        print_error(str(error) + "\n")
        return False

    try:
        device_object = self._bus.get_object("org.bluez", device.object_path)
        props = dbus.Interface(
            device_object, "org.freedesktop.DBus.Properties")
        already_trusted = props.Get("org.bluez.Device1", "Trusted")
        if not already_trusted:
            props.Set("org.bluez.Device1", "Trusted", dbus.Boolean(1))
    except dbus.exceptions.DBusException as error:
        print_error(str(error) + "\n")
        return False
    else:
        return True
def convert(dbus_obj):
    """Convert dbus_obj from a dbus type to the matching python type.

    Arrays, structs and dictionaries are converted recursively. A value
    that matches none of the handled dbus types is returned as it is.

    :param dbus_obj: dbus object.
    :returns: dbus_obj in python type.
    """
    integer_types = (dbus.Byte, dbus.Int16, dbus.Int32, dbus.Int64,
                     dbus.UInt16, dbus.UInt32, dbus.UInt64)
    string_types = (dbus.ObjectPath, dbus.Signature, dbus.String)

    if isinstance(dbus_obj, integer_types):
        return int(dbus_obj)
    if isinstance(dbus_obj, dbus.Boolean):
        return bool(dbus_obj)
    if isinstance(dbus_obj, dbus.Double):
        return float(dbus_obj)
    if isinstance(dbus_obj, dbus.Array):
        return [convert(item) for item in dbus_obj]
    if isinstance(dbus_obj, dbus.Struct):
        return tuple(convert(item) for item in dbus_obj)
    if isinstance(dbus_obj, string_types):
        return str(dbus_obj)
    if isinstance(dbus_obj, dbus.Dictionary):
        # Convert every key first, then every value, and pair them up.
        converted_keys = [convert(key) for key in dbus_obj.keys()]
        converted_values = [convert(value) for value in dbus_obj.values()]
        return dict(zip(converted_keys, converted_values))
    return dbus_obj
def convert(dbus_obj):
    """Convert dbus_obj from a dbus type to the matching python type.

    The first entry of the dispatch table whose dbus types match decides
    the conversion; containers are converted recursively. An object that
    matches no entry is handed back unchanged.

    :param dbus_obj: dbus object.
    :returns: dbus_obj in python type.
    """
    def to_list(obj):
        return list(map(convert, obj))

    def to_tuple(obj):
        return tuple(map(convert, obj))

    def to_dict(obj):
        return dict(zip(map(convert, obj.keys()),
                        map(convert, obj.values())))

    # (dbus types, converter) pairs, checked in this order.
    dispatch = (
        ((dbus.Byte, dbus.Int16, dbus.Int32, dbus.Int64,
          dbus.UInt16, dbus.UInt32, dbus.UInt64), int),
        ((dbus.Boolean, ), bool),
        ((dbus.Double, ), float),
        ((dbus.Array, ), to_list),
        ((dbus.Struct, ), to_tuple),
        ((dbus.ObjectPath, dbus.Signature, dbus.String), str),
        ((dbus.Dictionary, ), to_dict),
    )

    for dbus_types, to_python in dispatch:
        if isinstance(dbus_obj, dbus_types):
            return to_python(dbus_obj)
    return dbus_obj
def base_to_python(val):
    """Recursively translate a D-Bus value into a plain Python value.

    Object paths get special handling: a path equal to the ``object_path``
    of ``NetworkManager``, ``Settings`` or ``AgentManager`` yields that
    object; other paths under ``/org/freedesktop/NetworkManager/`` yield an
    instance of the class named by the fifth path component (``Settings``
    and ``Devices`` map to ``Connection`` and ``Device``), or ``None`` if
    constructing it raises ``ObjectVanished``; the path ``/`` yields
    ``None``. Values that match no branch are returned unchanged.
    """
    if isinstance(val, dbus.ByteArray):
        return "".join(str(item) for item in val)

    if isinstance(val, (dbus.Array, list, tuple)):
        return [fixups.base_to_python(item) for item in val]

    if isinstance(val, (dbus.Dictionary, dict)):
        return dict(
            (fixups.base_to_python(key), fixups.base_to_python(value))
            for key, value in val.items()
        )

    if isinstance(val, dbus.ObjectPath):
        for well_known in (NetworkManager, Settings, AgentManager):
            if val == well_known.object_path:
                return well_known

        if val.startswith('/org/freedesktop/NetworkManager/'):
            aliases = {
                'Settings': 'Connection',
                'Devices': 'Device',
            }
            segment = val.split('/')[4]
            wrapper_name = aliases.get(segment, segment)
            try:
                return globals()[wrapper_name](val)
            except ObjectVanished:
                return None

        if val == '/':
            return None
        # Any other object path falls through to the checks below.

    if isinstance(val, (dbus.Signature, dbus.String)):
        return six.text_type(val)

    if isinstance(val, dbus.Boolean):
        return bool(val)

    integer_types = (dbus.Int16, dbus.UInt16, dbus.Int32,
                     dbus.UInt32, dbus.Int64, dbus.UInt64)
    if isinstance(val, integer_types):
        return int(val)

    if isinstance(val, dbus.Byte):
        return six.int2byte(int(val))

    return val
def is_adapter_powered(self, powered):
    """Write ``powered`` to the adapter's ``Powered`` property.

    :param powered: value wrapped in ``dbus.Boolean`` before it is sent.
    :return: whatever the underlying ``Set`` call returns.
    """
    set_property = self._adapter_properties.Set
    return set_property('org.bluez.Adapter1', 'Powered',
                        dbus.Boolean(powered))
def set_debug_level(self, parameter):
    """Set the ``DebugTimestamp`` property to ``parameter`` as a boolean."""
    # NOTE(review): the method name says "level" but the property written
    # is "DebugTimestamp" -- confirm which one is intended.
    set_property = self.__set_property
    set_property(dbus.String("DebugTimestamp"), dbus.Boolean(parameter))
def set_debug_show_keys(self, parameter):
    """Set the ``DebugShowKeys`` property to ``parameter`` as a boolean."""
    set_property = self.__set_property
    set_property(dbus.String("DebugShowKeys"), dbus.Boolean(parameter))
def StartNotify(self):
    """
    DBus method for enabling notifications of the characteristic value.

    When the ``Notifying`` property is already set this only logs a
    message; otherwise ``Notifying`` is set to a D-Bus ``True``.

    :return: None
    """
    # The previous guard (``not ... is True``) returned early exactly when
    # notifications were off, so they could never be switched on.
    # Truthiness is used instead of ``is True`` because the stored value
    # may be a ``dbus.Boolean`` rather than the ``True`` singleton.
    if self.props[constants.GATT_CHRC_IFACE]['Notifying']:
        logger.info('Notifying already, nothing to do')
        return

    self.Set(constants.GATT_CHRC_IFACE,
             'Notifying',
             dbus.Boolean(True, variant_level=1))
def StopNotify(self):
    """
    DBus method for disabling notifications of the characteristic value.

    When the ``Notifying`` property is already unset this only logs a
    message; otherwise ``Notifying`` is set to a D-Bus ``False``.

    :return: None
    """
    # Truthiness instead of ``is False``: this method itself writes a
    # ``dbus.Boolean(False)``, which is never identical to the ``False``
    # singleton, so an identity test would miss it.
    if not self.props[constants.GATT_CHRC_IFACE]['Notifying']:
        logger.info('Not Notifying, nothing to do')
        return

    self.Set(constants.GATT_CHRC_IFACE,
             'Notifying',
             dbus.Boolean(False, variant_level=1))
def get_properties(self):
    """Return a dictionary of the advert properties.

    The result maps ``constants.LE_ADVERTISEMENT_IFACE`` to a dictionary
    that always holds ``Type`` and, for each attribute that is not
    ``None``, one of the following keys:

    - ServiceUUIDs: array of strings built from ``service_uuids``
    - SolicitUUIDs: array of strings built from ``solicit_uuids``
    - ManufacturerData: dictionary built from ``manufacturer_data``
    - ServiceData: dictionary built from ``service_data``
    - IncludeTxPower: boolean built from ``include_tx_power``
    """
    advert = {'Type': self.ad_type}

    uuids = self.service_uuids
    if uuids is not None:
        advert['ServiceUUIDs'] = dbus.Array(uuids, signature='s')

    solicited = self.solicit_uuids
    if solicited is not None:
        advert['SolicitUUIDs'] = dbus.Array(solicited, signature='s')

    manufacturer = self.manufacturer_data
    if manufacturer is not None:
        advert['ManufacturerData'] = dbus.Dictionary(
            manufacturer, signature='qay')

    service = self.service_data
    if service is not None:
        advert['ServiceData'] = dbus.Dictionary(service, signature='say')

    tx_power = self.include_tx_power
    if tx_power is not None:
        advert['IncludeTxPower'] = dbus.Boolean(tx_power)

    return {constants.LE_ADVERTISEMENT_IFACE: advert}
def GetAll(self, interface_name):
    """Return the advertisement properties.

    This method is registered with the D-Bus at
    ``org.freedesktop.DBus.Properties``

    ``Type`` and ``IncludeTxPower`` are always part of the response; the
    remaining entries are added only when their stored value is not
    ``None``.

    :param interface_name: interface to get the properties of.
        The interface must be ``org.bluez.LEAdvertisement1`` otherwise an
        exception is raised.
    :raises InvalidArgsException: for any other interface name.
    """
    if interface_name != constants.LE_ADVERTISEMENT_IFACE:
        raise InvalidArgsException()

    stored = self.props[interface_name]
    response = {'Type': stored['Type']}

    if stored['ServiceUUIDs'] is not None:
        response['ServiceUUIDs'] = dbus.Array(
            stored['ServiceUUIDs'], signature='s')

    if stored['ServiceData'] is not None:
        response['ServiceData'] = dbus.Dictionary(
            stored['ServiceData'], signature='sv')

    if stored['ManufacturerData'] is not None:
        response['ManufacturerData'] = dbus.Dictionary(
            stored['ManufacturerData'], signature='qv')

    if stored['SolicitUUIDs'] is not None:
        response['SolicitUUIDs'] = dbus.Array(
            stored['SolicitUUIDs'], signature='s')

    response['IncludeTxPower'] = dbus.Boolean(stored['IncludeTxPower'])
    return response
def run(self):
    """Make the adapter discoverable and pairable, then run the main loop.

    An ``Agent`` is created and stored on ``self.agent``. If
    ``self._register()`` reports failure, ``self.shutdown()`` is called
    and the main loop is not started.
    """
    _bluetooth.make_discoverable(True, self.timeout)
    _bluetooth.set_adapter_property("Pairable", dbus.Boolean(1))
    _bluetooth.set_adapter_property("PairableTimeout", dbus.UInt32(0))

    self.agent = Agent(self.client_class, self.timeout, self.path)

    if self._register():
        self._mainloop.run()
    else:
        self.shutdown()
def make_discoverable(self, value=True, timeout=180):
    """Bring the adapter's discoverable state and timeout to the given values.

    Each property is written only when its current value differs from
    the requested one.

    :param value: desired ``Discoverable`` state (converted with ``int``).
    :param timeout: desired ``DiscoverableTimeout`` (converted with ``int``).
    :return: ``False`` when the adapter lookup or a D-Bus call fails (the
        error is reported through ``print_error``), ``True`` otherwise.
    """
    lookup_errors = (bluezutils.BluezUtilError,
                     dbus.exceptions.DBusException)
    try:
        adapter = bluezutils.find_adapter()
    except lookup_errors as error:
        print_error(str(error) + "\n")
        return False

    try:
        adapter_object = self._bus.get_object(
            "org.bluez", adapter.object_path)
        props = dbus.Interface(
            adapter_object, "org.freedesktop.DBus.Properties")

        wanted_timeout = int(timeout)
        wanted_state = int(value)

        current_timeout = int(
            props.Get("org.bluez.Adapter1", "DiscoverableTimeout"))
        if current_timeout != wanted_timeout:
            props.Set("org.bluez.Adapter1", "DiscoverableTimeout",
                      dbus.UInt32(wanted_timeout))

        current_state = int(props.Get("org.bluez.Adapter1", "Discoverable"))
        if current_state != wanted_state:
            props.Set("org.bluez.Adapter1", "Discoverable",
                      dbus.Boolean(wanted_state))
    except dbus.exceptions.DBusException as error:
        print_error(str(error) + "\n")
        return False
    else:
        return True
def do_pair():
    """Register a pairing agent, open adapter hci0 for pairing and wait.

    Blocks in ``mainloop.run()``; once the loop returns, the module-level
    ``pairing`` flag is cleared and the agent is unregistered and removed
    from the connection.
    """
    global pairing, mainloop

    bus = dbus.SystemBus()

    # Remove connected and paired devices from bluez.
    remove_paired_bt_device()

    # Use our own agent to bypass authorization and to get a callback for
    # the connected state.
    agent_path = "/test/agent"
    agent = my_bt_agent(bus, agent_path)

    bluez_root = bus.get_object('org.bluez', "/org/bluez")
    manager = dbus.Interface(bluez_root, "org.bluez.AgentManager1")
    manager.RegisterAgent(agent_path, 'NoInputNoOutput')
    manager.RequestDefaultAgent(agent_path)

    hci0 = bus.get_object("org.bluez", "/org/bluez/hci0")
    adapter1 = dbus.Interface(hci0, "org.freedesktop.DBus.Properties")
    for property_name in ("Powered", "Pairable", "Discoverable"):
        adapter1.Set("org.bluez.Adapter1", property_name, dbus.Boolean(1))

    # Wait for the paired callback from bluez or the timeout from led blink.
    mainloop.run()

    pairing = False
    manager.UnregisterAgent(agent_path)
    agent.remove_from_connection()
def device_discoverable(device_name, discoverable):
    """Set the ``Discoverable`` property of the named adapter.

    :param device_name: name passed to ``find_adapter``.
    :param discoverable: truthy to switch the property on, falsy for off.
    """
    bus = dbus.SystemBus()
    adapter_path = find_adapter(device_name).object_path
    adapter_object = bus.get_object(SERVICE_NAME, adapter_path)
    adapter = dbus.Interface(
        adapter_object, "org.freedesktop.DBus.Properties")

    value = dbus.Boolean(1 if discoverable else 0)
    adapter.Set(ADAPTER_INTERFACE, "Discoverable", value)
def device_pairable(device_name, pairable):
    """Set the ``Pairable`` property of the named adapter.

    :param device_name: name passed to ``find_adapter``.
    :param pairable: truthy to switch the property on, falsy for off.
    """
    bus = dbus.SystemBus()
    adapter_path = find_adapter(device_name).object_path
    adapter_object = bus.get_object(SERVICE_NAME, adapter_path)
    adapter = dbus.Interface(
        adapter_object, "org.freedesktop.DBus.Properties")

    value = dbus.Boolean(1 if pairable else 0)
    adapter.Set(ADAPTER_INTERFACE, "Pairable", value)
def device_powered(device_name, powered):
    """Set the ``Powered`` property of the named adapter.

    :param device_name: name passed to ``find_adapter``.
    :param powered: truthy to switch the property on, falsy for off.
    """
    bus = dbus.SystemBus()
    adapter_path = find_adapter(device_name).object_path
    adapter_object = bus.get_object(SERVICE_NAME, adapter_path)
    adapter = dbus.Interface(
        adapter_object, "org.freedesktop.DBus.Properties")

    value = dbus.Boolean(1 if powered else 0)
    adapter.Set(ADAPTER_INTERFACE, "Powered", value)
def is_discoverable(self, value):
    """Write ``value`` to the adapter's ``Discoverable`` property."""
    set_property = self._adapter_props.Set
    set_property('org.bluez.Adapter1', 'Discoverable', dbus.Boolean(value))
def is_powered(self, value):
    """Write ``value`` to the adapter's ``Powered`` property."""
    set_property = self._adapter_props.Set
    set_property('org.bluez.Adapter1', 'Powered', dbus.Boolean(value))
def get_properties(self):
    """Collect the advertisement properties for export.

    ``Type`` is always set; every other entry is added only when its
    source attribute is not ``None``.

    :return: a dictionary with the single key ``LE_ADVERTISEMENT_IFACE``
        whose value is the property dictionary.
    """
    exported = {}
    exported['Type'] = self.ad_type

    if self.service_uuids is not None:
        service_uuids = dbus.Array(self.service_uuids, signature='s')
        exported['ServiceUUIDs'] = service_uuids

    if self.solicit_uuids is not None:
        solicit_uuids = dbus.Array(self.solicit_uuids, signature='s')
        exported['SolicitUUIDs'] = solicit_uuids

    if self.manufacturer_data is not None:
        manufacturer_data = dbus.Dictionary(
            self.manufacturer_data, signature='qv')
        exported['ManufacturerData'] = manufacturer_data

    if self.service_data is not None:
        service_data = dbus.Dictionary(self.service_data, signature='sv')
        exported['ServiceData'] = service_data

    if self.include_tx_power is not None:
        exported['IncludeTxPower'] = dbus.Boolean(self.include_tx_power)

    return {LE_ADVERTISEMENT_IFACE: exported}
def voice_dial(self):
    """Switch on voice recognition through the audio-gateway interface.

    :return: the result of the failure helper when the
        ``VoiceRecognition`` feature is not available, otherwise ``None``.
    """
    if self.has_feature('VoiceRecognition'):
        self.__ag_interface.SetVoiceRecognition(dbus.Boolean(True))
        return None
    return self.__failure('Voice recognition is not supported by phone')
def __enable_reconnect(self):
    """Set the ``AutoReconnect`` property to a D-Bus ``True``."""
    set_property = self.set_property
    set_property('AutoReconnect', dbus.Boolean(True))
def __init__(self, onBluetoothConnected_callback, onPlayerChanged_callback):
    """Connect to the system bus, configure the adapter and register an agent.

    The adapter found by ``bluezutils.find_adapter(None)`` is powered on,
    given the alias ``bluezutils.BT_DEVICE_NAME`` and made pairable and
    discoverable with both timeouts set to 0. A ``bluezutils.Agent`` is
    then registered as the default agent, and two ``PropertiesChanged``
    signal receivers are installed.

    :param onBluetoothConnected_callback: stored on the instance.
    :param onPlayerChanged_callback: stored on the instance.
    :raises Exception: whatever obtaining the system bus raised; it is
        re-raised after a diagnostic message is printed.
    """
    self.onBluetoothConnected_callback = onBluetoothConnected_callback
    self.onPlayerChanged_callback = onPlayerChanged_callback

    # Get the system bus
    try:
        dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
        self.bus = dbus.SystemBus()
    except Exception as ex:
        # ``__init__`` may not return a value (``return False`` raises
        # TypeError) and exceptions have no ``.message`` on Python 3, so
        # print the exception itself and let it propagate.
        print("Unable to get the system dbus: '{0}'. Exiting. Is dbus running?".format(ex))
        raise

    adapter_path = bluezutils.find_adapter(None).object_path
    adapter = dbus.Interface(
        self.bus.get_object(bluezutils.SERVICE_NAME, adapter_path),
        "org.freedesktop.DBus.Properties")

    # Power on adapter
    adapter.Set(bluezutils.ADAPTER_INTERFACE, "Powered", dbus.Boolean(1))
    # Set name to display in available connections
    adapter.Set(bluezutils.ADAPTER_INTERFACE, "Alias", bluezutils.BT_DEVICE_NAME)
    # Set pairable on
    adapter.Set(bluezutils.ADAPTER_INTERFACE, "Pairable", dbus.Boolean(1))
    # Set discoverable on
    adapter.Set(bluezutils.ADAPTER_INTERFACE, "Discoverable", dbus.Boolean(1))
    # Set discoverable timeout to 0
    adapter.Set(bluezutils.ADAPTER_INTERFACE, "DiscoverableTimeout", dbus.UInt32(0))
    # Set pairable timeout to 0
    adapter.Set(bluezutils.ADAPTER_INTERFACE, "PairableTimeout", dbus.UInt32(0))

    bluezutils.show_adapter_info()

    self.path = "/test/agent"
    agent = bluezutils.Agent(self.bus, self.path)

    obj = self.bus.get_object(bluezutils.SERVICE_NAME, "/org/bluez")
    self.manager = dbus.Interface(obj, "org.bluez.AgentManager1")
    self.manager.RegisterAgent(self.path, "NoInputNoOutput")
    print("Bluetooth AgentManager registered")

    # listen for signal of remove adapter
    # self.bus.add_signal_receiver(self.interfaces_removed, bus_name=bluezutils.SERVICE_NAME, dbus_interface="org.freedesktop.DBus.ObjectManager", signal_name="InterfacesRemoved")

    # listen for signals on the Bluez bus
    self.bus.add_signal_receiver(
        self.device_property_changed,
        bus_name=bluezutils.SERVICE_NAME,
        signal_name="PropertiesChanged",
        path_keyword="device_path",
        interface_keyword="interface")

    # listen for signal of changing properties
    self.bus.add_signal_receiver(
        self.player_changed,
        bus_name=bluezutils.SERVICE_NAME,
        dbus_interface="org.freedesktop.DBus.Properties",
        signal_name="PropertiesChanged",
        path_keyword="path")
    print("Signals receiver registered")

    self.manager.RequestDefaultAgent(self.path)