def get_paired_devices(device_name):
    """Collect ``(address, alias)`` pairs for the devices of one adapter.

    The adapter is resolved with ``find_adapter(device_name)``. Every managed
    object that exposes ``DEVICE_INTERFACE`` and whose "Adapter" property
    equals that adapter's object path is reported.

    NOTE(review): despite the function name, no "Paired" property is
    consulted here -- confirm whether unpaired devices should be excluded.

    :param device_name: pattern forwarded to ``find_adapter``
    :return: list of ``(address, alias)`` tuples, both converted with ``str``
    """
    system_bus = dbus.SystemBus()
    wanted_adapter = find_adapter(device_name).object_path
    manager = dbus.Interface(
        system_bus.get_object(SERVICE_NAME, "/"),
        "org.freedesktop.DBus.ObjectManager",
    )
    # Keep only the property dictionaries of objects that are devices.
    device_props = (
        ifaces[DEVICE_INTERFACE]
        for ifaces in manager.GetManagedObjects().values()
        if DEVICE_INTERFACE in ifaces
    )
    return [
        (str(props["Address"]), str(props["Alias"]))
        for props in device_props
        if props["Adapter"] == wanted_adapter
    ]
# Example source code using the Python class Interface()
def player_control(self, action):
    """Send a transport command to the media player over D-Bus.

    A proxy for ``bluezutils.MEDIAPLAYER_INTERFACE`` is created on the object
    at ``self._player_object_path`` and the method matching ``action`` is
    invoked on it.

    :param action: one of "play", "pause", "stop", "prev", "next",
        "forward" or "rewind"
    :return: ``False`` when ``action`` is not recognised or the call raises;
        ``None`` when the command was issued without an exception
    """
    iface = dbus.Interface(
        self.bus.get_object(bluezutils.SERVICE_NAME, self._player_object_path),
        bluezutils.MEDIAPLAYER_INTERFACE)
    try:
        if action == "play":
            iface.Play()
        elif action == "pause":
            iface.Pause()
        elif action == "stop":
            iface.Stop()
        elif action == "prev":
            iface.Previous()
        elif action == "next":
            iface.Next()
        elif action == "forward":
            iface.FastForward()
        elif action == "rewind":
            iface.Rewind()
        else:
            return False
    except Exception as ex:
        # ``.message`` only exists on Python 2 exceptions. Fall back to the
        # exception object so the handler cannot raise AttributeError itself.
        detail = getattr(ex, "message", ex)
        print("Player communicate error: '{0}'".format(detail))
        return False
def show_adapter_info():
    """Print the properties of every object exposing ``ADAPTER_INTERFACE``.

    For each such managed object the path is printed as a header, followed by
    one line per property: "Class" is rendered as a zero-padded six-digit
    hexadecimal number, "UUIDs" is skipped, and every other value is printed
    with ``%s``. ``print()`` is called once after each adapter's properties.
    """
    bus = dbus.SystemBus()
    om = dbus.Interface(bus.get_object(SERVICE_NAME, "/"),
                        "org.freedesktop.DBus.ObjectManager")
    objects = om.GetManagedObjects()
    # items() instead of iteritems(): dictionaries have no iteritems() on
    # Python 3, and the inner loop below already relies on items().
    for path, interfaces in objects.items():
        if ADAPTER_INTERFACE not in interfaces:
            continue
        print(" [ %s ]" % (path))
        props = interfaces[ADAPTER_INTERFACE]
        for key, value in props.items():
            if key == "Class":
                print(" %s = 0x%06x" % (key, value))
            elif key == "UUIDs":
                continue
            else:
                print(" %s = %s" % (key, value))
        print()
def find_device_in_objects(objects, device_address, adapter_pattern=None):
    """Return a ``DEVICE_INTERFACE`` proxy for the device with an address.

    :param objects: mapping of object path to a mapping of interface name to
        property dictionary (this is how the function iterates it)
    :param device_address: value compared against each device's "Address"
    :param adapter_pattern: when truthy, the adapter is resolved with
        ``find_adapter_in_objects`` and only devices whose path starts with
        that adapter's object path are considered
    :return: ``dbus.Interface`` bound to the first matching object
    :raises Exception: "Bluetooth device not found" when nothing matches
    """
    path_prefix = ""
    if adapter_pattern:
        adapter = find_adapter_in_objects(objects, adapter_pattern)
        path_prefix = adapter.object_path
    # items() instead of iteritems(): the latter does not exist on Python 3
    # dictionaries, while items() works on both major versions.
    for path, ifaces in objects.items():
        device = ifaces.get(DEVICE_INTERFACE)
        if device is None:
            continue
        if (device["Address"] == device_address and
                path.startswith(path_prefix)):
            # The bus is only needed once a match exists, so "not found" can
            # be reported without touching the system bus at all.
            obj = dbus.SystemBus().get_object(SERVICE_NAME, path)
            return dbus.Interface(obj, DEVICE_INTERFACE)
    raise Exception("Bluetooth device not found")
def find_device_in_objects(objects, device_address, adapter_pattern=None):
    """Return a ``DEVICE_INTERFACE`` proxy for the device with an address.

    :param objects: mapping of object path to a mapping of interface name to
        property dictionary (this is how the function iterates it)
    :param device_address: value compared against each device's "Address"
    :param adapter_pattern: when truthy, the adapter is resolved with
        ``find_adapter_in_objects`` and only devices whose path starts with
        that adapter's object path are considered
    :return: ``dbus.Interface`` bound to the first matching object
    :raises Exception: "Bluetooth device not found" when nothing matches
    """
    path_prefix = ""
    if adapter_pattern:
        adapter = find_adapter_in_objects(objects, adapter_pattern)
        path_prefix = adapter.object_path
    # items() instead of iteritems(): the latter does not exist on Python 3
    # dictionaries, while items() works on both major versions.
    for path, ifaces in objects.items():
        device = ifaces.get(DEVICE_INTERFACE)
        if device is None:
            continue
        if (device["Address"] == device_address and
                path.startswith(path_prefix)):
            # The bus is only needed once a match exists, so "not found" can
            # be reported without touching the system bus at all.
            obj = dbus.SystemBus().get_object(SERVICE_NAME, path)
            return dbus.Interface(obj, DEVICE_INTERFACE)
    raise Exception("Bluetooth device not found")
def find_device_in_objects(objects, device_address, adapter_pattern=None):
    """Return a ``DEVICE_INTERFACE`` proxy for the device with an address.

    :param objects: mapping of object path to a mapping of interface name to
        property dictionary (this is how the function iterates it)
    :param device_address: value compared against each device's "Address"
    :param adapter_pattern: when truthy, the adapter is resolved with
        ``find_adapter_in_objects`` and only devices whose path starts with
        that adapter's object path are considered
    :return: ``dbus.Interface`` bound to the first matching object
    :raises Exception: "Bluetooth device not found" when nothing matches
    """
    path_prefix = ""
    if adapter_pattern:
        adapter = find_adapter_in_objects(objects, adapter_pattern)
        path_prefix = adapter.object_path
    # items() instead of iteritems(): the latter does not exist on Python 3
    # dictionaries, while items() works on both major versions.
    for path, ifaces in objects.items():
        device = ifaces.get(DEVICE_INTERFACE)
        if device is None:
            continue
        if (device["Address"] == device_address and
                path.startswith(path_prefix)):
            # The bus is only needed once a match exists, so "not found" can
            # be reported without touching the system bus at all.
            obj = dbus.SystemBus().get_object(SERVICE_NAME, path)
            return dbus.Interface(obj, DEVICE_INTERFACE)
    raise Exception("Bluetooth device not found")
def publish(self):
    """Register this service with the Avahi server over the system bus.

    A new entry group is requested from the Avahi server, a single service
    entry built from this object's ``name``, ``stype``, ``domain``, ``host``,
    ``port`` and ``text`` is added to it, and the group is committed. The
    entry-group proxy is stored on ``self.group``.
    """
    system_bus = dbus.SystemBus()
    server_object = system_bus.get_object(
        avahi.DBUS_NAME, avahi.DBUS_PATH_SERVER)
    avahi_server = dbus.Interface(server_object, avahi.DBUS_INTERFACE_SERVER)

    # The server hands back the object path of the freshly created group.
    group_path = avahi_server.EntryGroupNew()
    group_object = system_bus.get_object(avahi.DBUS_NAME, group_path)
    entry_group = dbus.Interface(
        group_object, avahi.DBUS_INTERFACE_ENTRY_GROUP)

    entry_group.AddService(
        avahi.IF_UNSPEC, avahi.PROTO_UNSPEC, dbus.UInt32(0),
        self.name, self.stype, self.domain, self.host,
        dbus.UInt16(self.port), self.text)
    entry_group.Commit()
    self.group = entry_group
def _find_adapter(self):
    """
    Look for a managed object that offers every required manager interface.

    The required interfaces are ``GATT_MANAGER_IFACE`` and
    ``LE_ADVERTISING_MANAGER_IFACE``.

    :return: the last ``/``-separated component of the first matching
        object path, or None when no object offers both interfaces
    """
    wanted = (GATT_MANAGER_IFACE, LE_ADVERTISING_MANAGER_IFACE)
    bluez_root = self.bus.get_object(BLUEZ_SERVICE_NAME, '/')
    managed = dbus.Interface(bluez_root, DBUS_OM_IFACE).GetManagedObjects()
    for candidate_path, ifaces in managed.items():
        if all(name in ifaces for name in wanted):
            return candidate_path.rsplit('/', 1)[1]
    return None
def publish(self):
    """Register this service with the Avahi server over the system bus.

    A new entry group is requested from the Avahi server, a single service
    entry built from this object's ``name``, ``stype``, ``domain``, ``host``,
    ``port`` and ``text`` is added to it, and the group is committed. The
    entry-group proxy is stored on ``self.group``.
    """
    system_bus = dbus.SystemBus()
    server_object = system_bus.get_object(
        avahi.DBUS_NAME, avahi.DBUS_PATH_SERVER)
    avahi_server = dbus.Interface(server_object, avahi.DBUS_INTERFACE_SERVER)

    # The server hands back the object path of the freshly created group.
    group_path = avahi_server.EntryGroupNew()
    group_object = system_bus.get_object(avahi.DBUS_NAME, group_path)
    entry_group = dbus.Interface(
        group_object, avahi.DBUS_INTERFACE_ENTRY_GROUP)

    entry_group.AddService(
        avahi.IF_UNSPEC, avahi.PROTO_UNSPEC, dbus.UInt32(0),
        self.name, self.stype, self.domain, self.host,
        dbus.UInt16(self.port), self.text)
    entry_group.Commit()
    self.group = entry_group
def _call_added(self, path, properties):
    """Track a new voice call and notify the listeners matching its state.

    A ``Ofono.VOICE_CALL_INTERFACE`` proxy is created for ``path`` and stored
    in ``self._calls`` under its object path. The call's state and line
    identification are logged. Listeners for incoming calls are invoked when
    the state is "incoming" or "waiting"; listeners for begun calls when it is
    "active" or "dialing"; otherwise nobody is notified.

    :param path: object path of the call; also passed to each listener
    :param properties: ignored -- the properties are re-read from the call
    """
    bus_object = self._bus.get_object(Ofono.SERVICE_NAME, path)
    voice_call = dbus.Interface(bus_object, Ofono.VOICE_CALL_INTERFACE)
    self._calls[voice_call.object_path] = voice_call

    call_props = voice_call.GetProperties()
    state = call_props['State']
    number = call_props['LineIdentification']
    self.log().info('%s: %s' % (state, number))

    if state in ('incoming', 'waiting'):
        callbacks = self._on_incoming_call_listeners
    elif state in ('active', 'dialing'):
        callbacks = self._on_call_began_listeners
    else:
        callbacks = []
    for notify in callbacks:
        notify(path)
def attach_audio_gateway(self, adapter, device, listener):
    """Move to the state that matches the modem found for a device.

    ``self._find_child_hfp_modem(adapter, device)`` supplies a path and a
    property dictionary. Without a path the "modem not found" state is
    entered. Otherwise a ``MODEM_INTERFACE`` proxy is built and the next state
    depends on the modem's "Online" property.

    :param adapter: forwarded to the modem lookup and the not-found state
    :param device: forwarded to the modem lookup and the not-found state
    :param listener: passed on to whichever state is entered
    """
    path, properties = self._find_child_hfp_modem(adapter, device)
    if not path:
        self._transition_to(State.modem_not_found(adapter, device, listener))
        return

    modem_object = self._bus.get_object(self.SERVICE_NAME, path)
    modem = dbus.Interface(modem_object, self.MODEM_INTERFACE)
    if properties['Online']:
        next_state = State.modem_came_online(modem, listener)
    else:
        next_state = State.modem_found(modem, listener)
    self._transition_to(next_state)
def _modem_found(self, path, properties):
    """React to a modem appearing while one is being waited for.

    Nothing happens unless the current state reports that it is waiting for a
    modem and ``Ofono._is_child_hfp_modem`` accepts this modem for the state's
    adapter and device. When both hold, a ``MODEM_INTERFACE`` proxy is built
    and the next state is chosen from the modem's "Online" property, carrying
    over the current state's ``audio_gateway_ready_listener``.

    :param path: object path of the modem
    :param properties: property dictionary of the modem
    """
    if not self._state.is_waiting_for_modem_appearance():
        return
    current = self._state
    if not Ofono._is_child_hfp_modem(
            current.adapter, current.device, path, properties):
        return

    modem_object = self._bus.get_object(self.SERVICE_NAME, path)
    modem = dbus.Interface(modem_object, self.MODEM_INTERFACE)
    if properties['Online']:
        build_state = State.modem_came_online
    else:
        build_state = State.modem_found
    self._transition_to(
        build_state(modem, self._state.audio_gateway_ready_listener))
# Encodes the sender and message as UTF-8, looks up the conversation title
# through the Pidgin D-Bus service, writes the message to a file named "_"
# and pipes the sender and the message to `_sendFAX.py` through the shell.
# Presumably connected to a Pidgin "message received" signal -- confirm
# against the code that registers this function.
# `bus`, `dingregex` and `DEBUG` are not defined in this function; presumably
# they are module-level names -- verify.
# NOTE(review): the indentation of this snippet was lost, so it cannot be
# told from here whether the file write and the os.system calls belong under
# `if DEBUG:` or run unconditionally -- confirm against the original file.
def check_ding(account, sender, message, conv, flags):
sender = sender.encode('utf-8')
message = message.encode('utf-8')
# Proxy for Pidgin's D-Bus object, used only to fetch the conversation title.
obj = bus.get_object("im.pidgin.purple.PurpleService", "/im/pidgin/purple/PurpleObject")
purple = dbus.Interface(obj, "im.pidgin.purple.PurpleInterface")
title = purple.PurpleConversationGetTitle(conv).encode('utf-8')
# Fall back to the raw conversation value when the title is empty.
if not title:
title = conv
# The regex match currently triggers nothing: the notification code below is
# commented out and the branch body is just `pass`.
if len(dingregex.findall(message)) > 0:
pass
#pynotify.init("Basics")
#notify = pynotify.Notification("DING DING DING","{} called for DING DING DING in {}".format(sender,title),"emblem-danger")
#notify.set_timeout(pynotify.EXPIRES_NEVER)
#notify.show()
if DEBUG:
# NOTE(review): print statement syntax -- this line only parses on Python 2.
print "sender: {} message: {}, account: {}, conversation: {}, flags: {}, title: {}".format(sender,message,account,conv,flags,title)
# The message is handed to the second shell command through the file "_".
f=open("_","w")
f.write(message)
f.close()
# NOTE(review): `sender` is interpolated into a shell command inside single
# quotes; a sender containing a single quote can break out of the quoting.
# Consider subprocess with an argument list and stdin instead of os.system.
os.system("echo '%s' | python _sendFAX.py localhost 10111 Star +bw" % sender)
os.system("cat _ | python _sendFAX.py localhost 10111 Star -")
def find_device_in_objects(objects, device_address, adapter_pattern=None):
    """Return a ``DEVICE_INTERFACE`` proxy for the device with an address.

    :param objects: mapping of object path to a mapping of interface name to
        property dictionary (this is how the function iterates it)
    :param device_address: value compared against each device's "Address"
    :param adapter_pattern: when truthy, the adapter is resolved with
        ``find_adapter_in_objects`` and only devices whose path starts with
        that adapter's object path are considered
    :return: ``dbus.Interface`` bound to the first matching object
    :raises Exception: "Bluetooth device not found" when nothing matches
    """
    path_prefix = ""
    if adapter_pattern:
        adapter = find_adapter_in_objects(objects, adapter_pattern)
        path_prefix = adapter.object_path
    # items() instead of iteritems(): the latter does not exist on Python 3
    # dictionaries, while items() works on both major versions.
    for path, ifaces in objects.items():
        device = ifaces.get(DEVICE_INTERFACE)
        if device is None:
            continue
        if (device["Address"] == device_address and
                path.startswith(path_prefix)):
            # The bus is only needed once a match exists, so "not found" can
            # be reported without touching the system bus at all.
            obj = dbus.SystemBus().get_object(SERVICE_NAME, path)
            return dbus.Interface(obj, DEVICE_INTERFACE)
    raise Exception("Bluetooth device not found")
def connected(self, service):
    """Ensure a wallet handle is open, opening one over D-Bus if needed.

    When ``self.handle`` is already non-negative nothing is done. Otherwise a
    proxy for 'org.kde.KWallet' is created on the session bus, the network
    wallet is opened, and the result is stored in ``self.handle``; a
    ``dbus.DBusException`` during that sets the handle to -1. After a
    successful open ``self._migrate(service)`` is called.

    :param service: forwarded to ``self._migrate`` after a fresh open
    :return: True when a non-negative handle is available, False otherwise
    """
    if self.handle >= 0:
        return True

    session_bus = dbus.SessionBus()
    window_id = 0
    try:
        wallet_object = session_bus.get_object(self.bus_name, self.object_path)
        self.iface = dbus.Interface(wallet_object, 'org.kde.KWallet')
        wallet_name = self.iface.networkWallet()
        self.handle = self.iface.open(wallet_name, window_id, self.appid)
    except dbus.DBusException:
        self.handle = -1

    if self.handle >= 0:
        self._migrate(service)
        return True
    return False
def get_introspection(self):
    """Call ``Introspect`` on the NetworkManager root object.

    :return: whatever the D-Bus ``Introspect`` method returns
    """
    nm_object = dbus.SystemBus().get_object(
        'org.freedesktop.NetworkManager', '/org/freedesktop/NetworkManager')
    introspectable = dbus.Interface(
        nm_object, dbus_interface='org.freedesktop.DBus.Introspectable')
    # Fetch the introspection XML through an explicitly looked-up method.
    introspect = introspectable.get_dbus_method(
        "Introspect", dbus_interface=None)
    return introspect()
def get_devices(self):
    """List the devices reported by NetworkManager's ``GetDevices``.

    :return: list with each returned device formatted through ``"%s"``
    """
    nm_object = dbus.SystemBus().get_object(
        'org.freedesktop.NetworkManager', '/org/freedesktop/NetworkManager')
    manager = dbus.Interface(
        nm_object, dbus_interface='org.freedesktop.NetworkManager')
    # Ask NetworkManager for all of its devices.
    list_devices = manager.get_dbus_method("GetDevices", dbus_interface=None)
    return ["%s" % device for device in list_devices()]
def get_active_connections(self):
    """Read NetworkManager's "ActiveConnections" property.

    :return: list with every entry of the property converted via ``str``
    """
    nm_object = dbus.SystemBus().get_object(
        'org.freedesktop.NetworkManager', '/org/freedesktop/NetworkManager')
    properties = dbus.Interface(
        nm_object, dbus_interface='org.freedesktop.DBus.Properties')
    read_property = properties.get_dbus_method("Get", dbus_interface=None)
    active = read_property(
        "org.freedesktop.NetworkManager", "ActiveConnections")
    return [str(connection) for connection in active]
def get_wifi_access_points_by_dev(self, device_path):
    """List the access points reported for one wireless device.

    :param device_path: object path of the device on the NetworkManager
        service
    :return: list with every result of ``GetAccessPoints`` converted via
        ``str``
    """
    device_object = dbus.SystemBus().get_object(
        'org.freedesktop.NetworkManager', device_path)
    wireless = dbus.Interface(
        device_object,
        dbus_interface='org.freedesktop.NetworkManager.Device.Wireless')
    # Ask the device for all of its wireless access points.
    list_access_points = wireless.get_dbus_method(
        "GetAccessPoints", dbus_interface=None)
    return [str(access_point) for access_point in list_access_points()]
def get_access_point_all_info(self, ap_path):
    """Fetch and print every property of one access point.

    ``GetAll`` is called for the interface
    "org.freedesktop.NetworkManager.AccessPoint" on the object at ``ap_path``.
    Each key/value pair is printed on its own line, separated by a space.

    :param ap_path: object path of the access point on the NetworkManager
        service
    :return: the property mapping returned by ``GetAll``
    """
    bus = dbus.SystemBus()
    obj = bus.get_object('org.freedesktop.NetworkManager', ap_path)
    iface = dbus.Interface(obj, dbus_interface='org.freedesktop.DBus.Properties')
    m = iface.get_dbus_method("GetAll", dbus_interface=None)
    # getting all properties like Ssid, Strength, HwAddress etc.
    props = m("org.freedesktop.NetworkManager.AccessPoint")
    # items() and a single pre-formatted print argument behave the same on
    # Python 2 and 3; iteritems() and the print statement are Python 2 only.
    for k, v in props.items():
        print("%s %s" % (k, v))
    return props