def unwrap(self, val):
if isinstance(val, dbus.ByteArray):
return "".join([str(x) for x in val])
if isinstance(val, (dbus.Array, list, tuple)):
return [self.unwrap(x) for x in val]
if isinstance(val, (dbus.Dictionary, dict)):
return dict([(self.unwrap(x), self.unwrap(y)) for x, y in val.items()])
if isinstance(val, dbus.ObjectPath):
if val.startswith('/org/freedesktop/NetworkManager/'):
classname = val.split('/')[4]
classname = {
'Settings': 'Connection',
'Devices': 'Device',
}.get(classname, classname)
return globals()[classname](val)
if isinstance(val, (dbus.Signature, dbus.String)):
return unicode(val)
if isinstance(val, dbus.Boolean):
return bool(val)
if isinstance(val, (dbus.Int16, dbus.UInt16, dbus.Int32, dbus.UInt32, dbus.Int64, dbus.UInt64)):
return int(val)
if isinstance(val, dbus.Byte):
return bytes([int(val)])
return val
python类UInt32()的实例源码
def _check_permission(self, sender, action):
'''
Verifies if the specified action is permitted, and raises
an AccessDeniedException if not.
The caller should use ObtainAuthorization() to get permission.
'''
try:
if sender:
kit = dbus.SystemBus().get_object('org.freedesktop.PolicyKit1', '/org/freedesktop/PolicyKit1/Authority')
kit = dbus.Interface(kit, 'org.freedesktop.PolicyKit1.Authority')
(granted, _, details) = kit.CheckAuthorization(
('system-bus-name', {'name': sender}),
action, {}, dbus.UInt32(1), '', timeout=600)
if not granted:
raise AccessDeniedException('Session not authorized by PolicyKit')
except AccessDeniedException:
raise
except dbus.DBusException, ex:
raise AccessDeniedException(ex.message)
def add_service_type(self, type):
interface, protocol, domain = (
self.interface, self.protocol, self.domain)
# Are we already browsing this domain for this type?
if self.already_browsing(type):
return
logger.info("Browsing for services of type '%s' in domain '%s' on %s.%i ..." %
(type, domain, self.siocgifname(interface), protocol))
browser = self.server.ServiceBrowserNew(
interface, protocol, type, domain, dbus.UInt32(0))
bus = dbus.Interface(self.system_bus.get_object(
avahi.DBUS_NAME, browser), avahi.DBUS_INTERFACE_SERVICE_BROWSER)
bus.connect_to_signal('ItemNew', self.service_add)
bus.connect_to_signal('ItemRemove', self.service_remove)
self.service_browsers[(interface, protocol, type, domain)] = bus
def RequestPasskey(self, device):
print_info("RequestPasskey: {}\n".format(device))
if not self._trust(device):
print_error("RequestPasskey: failed to trust\n")
raise _Rejected
dev_info = self._get_device_info(device)
try:
passkey = int(self._client.request_passkey(dev_info))
except BaseException as error:
print_error("RequestPasskey: {}\n".format(error))
raise _Rejected
return dbus.UInt32(passkey)
def publish(self):
bus = dbus.SystemBus()
server = dbus.Interface(
bus.get_object(
avahi.DBUS_NAME,
avahi.DBUS_PATH_SERVER
),
avahi.DBUS_INTERFACE_SERVER
)
g = dbus.Interface(
bus.get_object(
avahi.DBUS_NAME,
server.EntryGroupNew()
),
avahi.DBUS_INTERFACE_ENTRY_GROUP
)
g.AddService(avahi.IF_UNSPEC, avahi.PROTO_UNSPEC, dbus.UInt32(0),
self.name, self.stype, self.domain, self.host,
dbus.UInt16(self.port), self.text)
g.Commit()
self.group = g
def publish(self):
bus = dbus.SystemBus()
server = dbus.Interface(
bus.get_object(
avahi.DBUS_NAME,
avahi.DBUS_PATH_SERVER
),
avahi.DBUS_INTERFACE_SERVER
)
g = dbus.Interface(
bus.get_object(
avahi.DBUS_NAME,
server.EntryGroupNew()
),
avahi.DBUS_INTERFACE_ENTRY_GROUP
)
g.AddService(avahi.IF_UNSPEC, avahi.PROTO_UNSPEC, dbus.UInt32(0),
self.name, self.stype, self.domain, self.host,
dbus.UInt16(self.port), self.text)
g.Commit()
self.group = g
def convert(dbus_obj):
"""Converts dbus_obj from dbus type to python type.
:param dbus_obj: dbus object.
:returns: dbus_obj in python type.
"""
_isinstance = partial(isinstance, dbus_obj)
ConvertType = namedtuple('ConvertType', 'pytype dbustypes')
pyint = ConvertType(int, (dbus.Byte, dbus.Int16, dbus.Int32, dbus.Int64,
dbus.UInt16, dbus.UInt32, dbus.UInt64))
pybool = ConvertType(bool, (dbus.Boolean, ))
pyfloat = ConvertType(float, (dbus.Double, ))
pylist = ConvertType(lambda _obj: list(map(convert, dbus_obj)),
(dbus.Array, ))
pytuple = ConvertType(lambda _obj: tuple(map(convert, dbus_obj)),
(dbus.Struct, ))
types_str = (dbus.ObjectPath, dbus.Signature, dbus.String)
pystr = ConvertType(str, types_str)
pydict = ConvertType(
lambda _obj: dict(list(zip(list(map(convert, dbus_obj.keys())),
list(map(convert, dbus_obj.values()))
))
),
(dbus.Dictionary, )
)
for conv in (pyint, pybool, pyfloat, pylist, pytuple, pystr, pydict):
if any(map(_isinstance, conv.dbustypes)):
return conv.pytype(dbus_obj)
else:
return dbus_obj
def addr_to_dbus(addr, family):
if (family == socket.AF_INET):
return dbus.UInt32(struct.unpack('I', socket.inet_pton(family, addr))[0])
else:
return dbus.ByteArray(socket.inet_pton(family, addr))
def mask_to_dbus(mask):
return dbus.UInt32(mask)
def convert(dbus_obj):
"""Converts dbus_obj from dbus type to python type.
:param dbus_obj: dbus object.
:returns: dbus_obj in python type.
"""
_isinstance = partial(isinstance, dbus_obj)
ConvertType = namedtuple('ConvertType', 'pytype dbustypes')
pyint = ConvertType(int, (dbus.Byte, dbus.Int16, dbus.Int32, dbus.Int64,
dbus.UInt16, dbus.UInt32, dbus.UInt64))
pybool = ConvertType(bool, (dbus.Boolean, ))
pyfloat = ConvertType(float, (dbus.Double, ))
pylist = ConvertType(lambda _obj: list(map(convert, dbus_obj)),
(dbus.Array, ))
pytuple = ConvertType(lambda _obj: tuple(map(convert, dbus_obj)),
(dbus.Struct, ))
types_str = (dbus.ObjectPath, dbus.Signature, dbus.String)
pystr = ConvertType(str, types_str)
pydict = ConvertType(
lambda _obj: dict(zip(map(convert, dbus_obj.keys()),
map(convert, dbus_obj.values())
)
),
(dbus.Dictionary, )
)
for conv in (pyint, pybool, pyfloat, pylist, pytuple, pystr, pydict):
if any(map(_isinstance, conv.dbustypes)):
return conv.pytype(dbus_obj)
else:
return dbus_obj
def base_to_python(val):
if isinstance(val, dbus.ByteArray):
return "".join([str(x) for x in val])
if isinstance(val, (dbus.Array, list, tuple)):
return [fixups.base_to_python(x) for x in val]
if isinstance(val, (dbus.Dictionary, dict)):
return dict([(fixups.base_to_python(x), fixups.base_to_python(y)) for x, y in val.items()])
if isinstance(val, dbus.ObjectPath):
for obj in (NetworkManager, Settings, AgentManager):
if val == obj.object_path:
return obj
if val.startswith('/org/freedesktop/NetworkManager/'):
classname = val.split('/')[4]
classname = {
'Settings': 'Connection',
'Devices': 'Device',
}.get(classname, classname)
try:
return globals()[classname](val)
except ObjectVanished:
return None
if val == '/':
return None
if isinstance(val, (dbus.Signature, dbus.String)):
return six.text_type(val)
if isinstance(val, dbus.Boolean):
return bool(val)
if isinstance(val, (dbus.Int16, dbus.UInt16, dbus.Int32, dbus.UInt32, dbus.Int64, dbus.UInt64)):
return int(val)
if isinstance(val, dbus.Byte):
return six.int2byte(int(val))
return val
def addr_to_dbus(addr, family):
if family == socket.AF_INET:
return dbus.UInt32(struct.unpack('I', socket.inet_pton(family, addr))[0])
else:
return dbus.ByteArray(socket.inet_pton(family, addr))
def mask_to_dbus(mask):
return dbus.UInt32(mask)
def set_ap_scan(self, value):
return self.__set_property("ApScan", dbus.UInt32(value))
def service_add(self, interface, protocol, name, type, domain, flags):
logger.debug("Found service '%s' of type '%s:%s' in domain '%s' on %s.%i." %
(name, type, flags, domain, self.siocgifname(interface), avahi.LOOKUP_RESULT_LOCAL))
# this check is for local services
if flags & avahi.LOOKUP_RESULT_LOCAL:
logger.debug('Dropping local service')
return
self.server.ResolveService(interface, protocol, name, type, domain, avahi.PROTO_INET, dbus.UInt32(
0), reply_handler=self.service_resolved, error_handler=logger.error)
def run(self):
_bluetooth.make_discoverable(True, self.timeout)
_bluetooth.set_adapter_property("Pairable", dbus.Boolean(1))
_bluetooth.set_adapter_property("PairableTimeout", dbus.UInt32(0))
self.agent = Agent(self.client_class, self.timeout, self.path)
if not self._register():
self.shutdown()
return
self._mainloop.run()
def make_discoverable(self, value=True, timeout=180):
try:
adapter = bluezutils.find_adapter()
except (bluezutils.BluezUtilError,
dbus.exceptions.DBusException) as error:
print_error(str(error) + "\n")
return False
try:
props = dbus.Interface(
self._bus.get_object("org.bluez", adapter.object_path),
"org.freedesktop.DBus.Properties")
timeout = int(timeout)
value = int(value)
if int(props.Get(
"org.bluez.Adapter1", "DiscoverableTimeout")) != timeout:
props.Set(
"org.bluez.Adapter1", "DiscoverableTimeout",
dbus.UInt32(timeout))
if int(props.Get("org.bluez.Adapter1", "Discoverable")) != value:
props.Set(
"org.bluez.Adapter1", "Discoverable", dbus.Boolean(value))
except dbus.exceptions.DBusException as error:
print_error(str(error) + "\n")
return False
return True
def discoverable_timeout(self, value):
self._adapter_props.Set('org.bluez.Adapter1', 'DiscoverableTimeout', dbus.UInt32(value))
def RequestPasskey(self, device):
self.log().info("RequestPasskey (%s)" % (device))
return dbus.UInt32(self.__passcode)
def enable_pairability(self, timeout = 0):
self._set_property('Discoverable', True)
self._set_property('Pairable', True)
self._set_property('PairableTimeout', dbus.UInt32(timeout))
self._set_property('DiscoverableTimeout', dbus.UInt32(timeout))
def disable_pairability(self):
try:
self._set_property('Discoverable', False)
self._set_property('Pairable', False)
self._set_property('PairableTimeout', dbus.UInt32(0))
self._set_property('DiscoverableTimeout', dbus.UInt32(180))
except:
pass
def RequestPasskey(self, device):
self.log().debug("RequestPasskey (%s)" % (device))
return dbus.UInt32(self._passcode)
def _check_polkit_privilege(self, sender, conn, privilege):
'''Verify that sender has a given PolicyKit privilege.
sender is the sender's (private) D-BUS name, such as ":1:42"
(sender_keyword in @dbus.service.methods). conn is
the dbus.Connection object (connection_keyword in
@dbus.service.methods). privilege is the PolicyKit privilege string.
This method returns if the caller is privileged, and otherwise throws a
PermissionDeniedByPolicy exception.
'''
if sender is None and conn is None:
# called locally, not through D-BUS
return
if not self.enforce_polkit:
# that happens for testing purposes when running on the session
# bus, and it does not make sense to restrict operations here
return
# get peer PID
if self.dbus_info is None:
self.dbus_info = dbus.Interface(conn.get_object('org.freedesktop.DBus',
'/org/freedesktop/DBus/Bus', False), 'org.freedesktop.DBus')
pid = self.dbus_info.GetConnectionUnixProcessID(sender)
# query PolicyKit
if self.polkit is None:
self.polkit = dbus.Interface(dbus.SystemBus().get_object(
'org.freedesktop.PolicyKit1', '/org/freedesktop/PolicyKit1/Authority', False),
'org.freedesktop.PolicyKit1.Authority')
try:
# we don't need is_challenge return here, since we call with AllowUserInteraction
(is_auth, unused, details) = self.polkit.CheckAuthorization(
('unix-process', {'pid': dbus.UInt32(pid, variant_level=1),
'start-time': dbus.UInt64(0, variant_level=1)}),
privilege, {'': ''}, dbus.UInt32(1), '', timeout=600)
except dbus.DBusException as msg:
if msg.get_dbus_name() == \
'org.freedesktop.DBus.Error.ServiceUnknown':
# polkitd timed out, connect again
self.polkit = None
return self._check_polkit_privilege(sender, conn, privilege)
else:
raise
if not is_auth:
logging.debug('_check_polkit_privilege: sender %s on connection %s pid %i is not authorized for %s: %s',
sender, conn, pid, privilege, str(details))
raise PermissionDeniedByPolicy(privilege)
#
# Internal API for calling from Handlers (not exported through D-BUS)
#
def __init__(self, onBluetoothConnected_callback, onPlayerChanged_callback):
self.onBluetoothConnected_callback = onBluetoothConnected_callback
self.onPlayerChanged_callback = onPlayerChanged_callback
# Get the system bus
try:
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
self.bus = dbus.SystemBus()
except Exception as ex:
print("Unable to get the system dbus: '{0}'. Exiting. Is dbus running?".format(ex.message))
return False
adapter_path = bluezutils.find_adapter(None).object_path
adapter = dbus.Interface(self.bus.get_object(bluezutils.SERVICE_NAME, adapter_path), "org.freedesktop.DBus.Properties")
# Power on adapter
adapter.Set(bluezutils.ADAPTER_INTERFACE, "Powered", dbus.Boolean(1))
# Set name to display in available connections
adapter.Set(bluezutils.ADAPTER_INTERFACE, "Alias", bluezutils.BT_DEVICE_NAME)
# Set pairable on
adapter.Set(bluezutils.ADAPTER_INTERFACE, "Pairable", dbus.Boolean(1))
# Set discoverable on
adapter.Set(bluezutils.ADAPTER_INTERFACE, "Discoverable", dbus.Boolean(1))
# Set discoverable timeout to 0
adapter.Set(bluezutils.ADAPTER_INTERFACE, "DiscoverableTimeout", dbus.UInt32(0))
# Set paraible time out to 0
adapter.Set(bluezutils.ADAPTER_INTERFACE, "PairableTimeout", dbus.UInt32(0))
bluezutils.show_adapter_info()
self.path = "/test/agent"
agent = bluezutils.Agent(self.bus, self.path)
obj = self.bus.get_object(bluezutils.SERVICE_NAME, "/org/bluez");
self.manager = dbus.Interface(obj, "org.bluez.AgentManager1")
self.manager.RegisterAgent(self.path, "NoInputNoOutput")
print("Bluetooth AgentManager registered")
# listen for signal of remove adapter
# self.bus.add_signal_receiver(self.interfaces_removed, bus_name=bluezutils.SERVICE_NAME, dbus_interface="org.freedesktop.DBus.ObjectManager", signal_name="InterfacesRemoved")
# listen for signals on the Bluez bus
self.bus.add_signal_receiver(self.device_property_changed, bus_name=bluezutils.SERVICE_NAME, signal_name="PropertiesChanged", path_keyword="device_path", interface_keyword="interface")
# listen for signal of changing properties
self.bus.add_signal_receiver(self.player_changed, bus_name=bluezutils.SERVICE_NAME, dbus_interface="org.freedesktop.DBus.Properties", signal_name="PropertiesChanged", path_keyword="path")
print("Signals receiver registered")
self.manager.RequestDefaultAgent(self.path)