def get_props(adapter=None,
device=None,
service=None,
characteristic=None,
descriptor=None):
"""
Get properties for the specified object
:param adapter: Adapter Address
:param device: Device Address
:param service: GATT Service UUID
:param characteristic: GATT Characteristic UUID
:param descriptor: GATT Descriptor UUID
:return: Object of the DBus properties available
"""
path_obj = get_dbus_path(adapter,
device,
service,
characteristic,
descriptor)
return get_dbus_iface(dbus.PROPERTIES_IFACE, get_dbus_obj(path_obj))
python类PROPERTIES_IFACE的实例源码
def __init__(self, adapter_addr=None):
self.bus = dbus.SystemBus()
if adapter_addr is None:
adapters = adapter.list_adapters()
if len(adapters) > 0:
adapter_addr = adapters[0]
self.advert_mngr_path = dbus_tools.get_dbus_path(adapter=adapter_addr)
self.advert_mngr_obj = self.bus.get_object(
constants.BLUEZ_SERVICE_NAME,
self.advert_mngr_path)
self.advert_mngr_methods = dbus.Interface(
self.advert_mngr_obj,
constants.LE_ADVERTISING_MANAGER_IFACE)
self.advert_mngr_props = dbus.Interface(self.advert_mngr_obj,
dbus.PROPERTIES_IFACE)
def __init__(self, adapter_addr, device_addr, profile_uuid):
"""
Remote GATT Profile Initialisation.
:param profile_path: dbus path to the profile.
"""
self.profile_path = dbus_tools.get_profile_path(adapter_addr,
device_addr,
profile_uuid)
self.bus = dbus.SystemBus()
self.profile_object = self.bus.get_object(
constants.BLUEZ_SERVICE_NAME,
self.profile_path)
self.profile_methods = dbus.Interface(
self.profile_object,
constants.GATT_PROFILE_IFACE)
self.profile_props = dbus.Interface(self.profile_object,
dbus.PROPERTIES_IFACE)
def __init__(self, adapter_addr):
"""
GATT Manager Initialisation.
:param manager_path: dbus path to the GATT Manager.
"""
self.manager_path = dbus_tools.get_dbus_path(adapter_addr)
self.bus = dbus.SystemBus()
self.manager_obj = self.bus.get_object(
constants.BLUEZ_SERVICE_NAME,
self.manager_path)
self.manager_methods = dbus.Interface(
self.manager_obj,
constants.GATT_MANAGER_IFACE)
self.manager_props = dbus.Interface(self.manager_obj,
dbus.PROPERTIES_IFACE)
def _register(self):
logger.info('Registering GATT application...')
self._gatt_manager.RegisterApplication(
self._app.get_path(), {},
reply_handler=lambda: logger.info("GATT application registered"),
error_handler=self._register_application_failed)
self._device_properties_changed_signal = self.bus.add_signal_receiver(
self._device_properties_changed,
dbus_interface=dbus.PROPERTIES_IFACE,
signal_name='PropertiesChanged',
arg0='org.bluez.Device1',
path_keyword='path')
self._adapter_properties_changed_signal = self.bus.add_signal_receiver(
self._adapter_properties_changed,
dbus_interface=dbus.PROPERTIES_IFACE,
signal_name='PropertiesChanged',
arg0='org.bluez.Adapter1',
path_keyword='path')
def run(self):
"""
Starts the main loop that is necessary to receive Bluetooth events from the Bluetooth adapter.
This call blocks until you call `stop()` to stop the main loop.
"""
if self._main_loop:
return
self._interface_added_signal = self._bus.add_signal_receiver(
self._interfaces_added,
dbus_interface='org.freedesktop.DBus.ObjectManager',
signal_name='InterfacesAdded')
# TODO: Also listen to 'interfaces removed' events?
self._properties_changed_signal = self._bus.add_signal_receiver(
self._properties_changed,
dbus_interface=dbus.PROPERTIES_IFACE,
signal_name='PropertiesChanged',
arg0='org.bluez.Device1',
path_keyword='path')
def disconnect_signals():
for device in self._devices.values():
device.invalidate()
self._properties_changed_signal.remove()
self._interface_added_signal.remove()
self._main_loop = GObject.MainLoop()
try:
self._main_loop.run()
disconnect_signals()
except Exception:
disconnect_signals()
raise
def __get_properties(self):
try:
obj = self._bus.get_object(self._BASE_NAME, self._BASE_PATH)
properties_interface = dbus.Interface(obj, dbus.PROPERTIES_IFACE)
return properties_interface.GetAll(self._BASE_NAME)
except dbus.exceptions.DBusException as error:
raise ServiceError(error)
def __get_property(self, property_name):
try:
obj = self._bus.get_object(self._BASE_NAME, self._BASE_PATH)
properties_interface = dbus.Interface(obj, dbus.PROPERTIES_IFACE)
return properties_interface.Get(self._BASE_NAME, property_name)
except dbus.exceptions.DBusException as error:
raise PropertyError(error)
def __set_property(self, property_name, property_value):
try:
obj = self._bus.get_object(self._BASE_NAME, self._BASE_PATH)
properties_interface = dbus.Interface(obj, dbus.PROPERTIES_IFACE)
properties_interface.Set(self._BASE_NAME, property_name, property_value)
except dbus.exceptions.DBusException as error:
raise PropertyError(error)
def __get_property(self, property_name):
try:
obj = self._bus.get_object(self._BASE_NAME, self._interface_path)
properties_interface = dbus.Interface(obj, dbus.PROPERTIES_IFACE)
return properties_interface.Get(self._INTERFACE_NAME, property_name)
except dbus.exceptions.DBusException as error:
raise PropertyError(error)
def __set_property(self, property_name, property_value):
try:
obj = self._bus.get_object(self._BASE_NAME, self._interface_path)
properties_interface = dbus.Interface(obj, dbus.PROPERTIES_IFACE)
properties_interface.Set(self._INTERFACE_NAME, property_name, property_value)
except dbus.exceptions.DBusException as error:
raise PropertyError(error)
def __set_property(self, bss_path, property_name, property_value):
try:
obj = self._bus.get_object(self._BASE_NAME, bss_path)
properties_interface = dbus.Interface(obj, dbus.PROPERTIES_IFACE)
properties_interface.Set(self._BSS_NAME, property_name, property_value)
except dbus.exceptions.DBusException as error:
raise PropertyError(error)
def __get_properties(self, network_path):
try:
obj = self._bus.get_object(self._BASE_NAME, network_path)
properties_interface = dbus.Interface(obj, dbus.PROPERTIES_IFACE)
return properties_interface.GetAll(self._NETWORK_NAME)
except dbus.exceptions.DBusException as error:
raise PropertyError(error)
def __init__(self, adapter_addr=None):
"""Default initialiser.
Creates the interface to the local Bluetooth adapter device.
If address is not given then first device is list is used.
:param adapter_addr: Address of Bluetooth adapter to use.
"""
self.bus = dbus.SystemBus()
if adapter_addr is None:
adapters = list_adapters()
if len(adapters) > 0:
adapter_addr = adapters[0]
self.path = dbus_tools.get_dbus_path(adapter=adapter_addr)
self.adapter_object = self.bus.get_object(
constants.BLUEZ_SERVICE_NAME,
self.path)
self.adapter_methods = dbus.Interface(self.adapter_object,
constants.ADAPTER_INTERFACE)
self.adapter_props = dbus.Interface(self.adapter_object,
dbus.PROPERTIES_IFACE)
self._nearby_timeout = 10
self._nearby_count = 0
self.mainloop = async_tools.EventLoop()
self.bus.add_signal_receiver(dbus_tools.interfaces_added,
dbus_interface=constants.DBUS_OM_IFACE,
signal_name='InterfacesAdded')
self.bus.add_signal_receiver(dbus_tools.properties_changed,
dbus_interface=dbus.PROPERTIES_IFACE,
signal_name='PropertiesChanged',
arg0=constants.DEVICE_INTERFACE,
path_keyword='path')
def Get(self, interface, prop):
print('%s.Get(%s, %s) called' %(dbus.PROPERTIES_IFACE, repr(interface), repr(prop)))
(getter, _) = self.properties[interface][prop]
if callable(getter):
return getter()
else:
return getter
def __get_prop(obj, iface, prop):
"""Get object interface properties. Internal use only, dont touch."""
if not dbus:
log.warning("D-Bus module not found or not supported on this OS.")
return # Windows probably.
try:
return obj.Get(iface, prop, dbus_interface=dbus.PROPERTIES_IFACE)
except (dbus.DBusException, dbus.exceptions.DBusException) as err:
print(err)
return