def enable_networking(self, val):
"""
function enables/disables networking depending upon the 'val' argument
if val is True, networking is enabled
if val is False, networking is disabled
"""
try:
bus = dbus.SystemBus()
wifi = bus.get_object('org.freedesktop.NetworkManager', '/org/freedesktop/NetworkManager')
iface = dbus.Interface(wifi, dbus_interface='org.freedesktop.NetworkManager')
# enabling/disabling networking
m = iface.get_dbus_method("Enable", dbus_interface=None)
m(val)
except:
pass
python类SystemBus()的实例源码
def get_active_connection_info(self, ac_path):
bus = dbus.SystemBus()
wifi = bus.get_object('org.freedesktop.NetworkManager', ac_path)
iface = dbus.Interface(wifi, dbus_interface='org.freedesktop.DBus.Properties')
# creating proxy 'Get' method
m = iface.get_dbus_method("Get", dbus_interface=None)
# getting Id of active connection
Id = m("org.freedesktop.NetworkManager.Connection.Active", "Id")
# getting Type of active connection
Type = m("org.freedesktop.NetworkManager.Connection.Active", "Type")
# getting Uuid of active connection
Uuid = m("org.freedesktop.NetworkManager.Connection.Active", "Uuid")
# getting State of active connection
State = m("org.freedesktop.NetworkManager.Connection.Active", "State")
# NOTE:
# this function only returns properties like Id, Type, Uuid, State of an active connection
# However, other properties like Dhcp4Config, Dhcp6Config, Ip4Config, Ip6Config etc. can also be obtained
return (str(Id), str(Type), str(Uuid), int(State))
def get_access_point_brief_info(self, ap_path):
bus = dbus.SystemBus()
obj = bus.get_object('org.freedesktop.NetworkManager', ap_path)
iface = dbus.Interface(obj, dbus_interface='org.freedesktop.DBus.Properties')
m = iface.get_dbus_method("Get", dbus_interface=None)
# getting Ssid
dbusArray = m("org.freedesktop.NetworkManager.AccessPoint", "Ssid")
Ssid = ''.join([chr(character) for character in dbusArray])
# getting Strength
Strength = m("org.freedesktop.NetworkManager.AccessPoint", "Strength")
# getting HwAddress
HwAddress = m("org.freedesktop.NetworkManager.AccessPoint", "HwAddress")
# getting Mode
Mode = m("org.freedesktop.NetworkManager.AccessPoint", "Mode")
return (Ssid, int(Strength), str(HwAddress), int(Mode))
def find_device_in_objects(objects, device_address, adapter_pattern=None):
bus = dbus.SystemBus()
path_prefix = ""
if adapter_pattern:
adapter = find_adapter_in_objects(objects, adapter_pattern)
path_prefix = adapter.object_path
for path, ifaces in objects.iteritems():
device = ifaces.get(DEVICE_INTERFACE)
if device is None:
continue
if (device["Address"] == device_address and
path.startswith(path_prefix)):
obj = bus.get_object(SERVICE_NAME, path)
return dbus.Interface(obj, DEVICE_INTERFACE)
raise Exception("Bluetooth device not found")
def __init__(self, adapter_name):
self.listener = None
self.adapter_name = adapter_name
self._bus = dbus.SystemBus()
try:
adapter_object = self._bus.get_object('org.bluez', '/org/bluez/' + adapter_name)
except dbus.exceptions.DBusException as e:
raise _error_from_dbus_error(e)
object_manager_object = self._bus.get_object("org.bluez", "/")
self._adapter = dbus.Interface(adapter_object, 'org.bluez.Adapter1')
self._adapter_properties = dbus.Interface(self._adapter, 'org.freedesktop.DBus.Properties')
self._object_manager = dbus.Interface(object_manager_object, "org.freedesktop.DBus.ObjectManager")
self._device_path_regex = re.compile('^/org/bluez/' + adapter_name + '/dev((_[A-Z0-9]{2}){6})$')
self._devices = {}
self._discovered_devices = {}
self._interface_added_signal = None
self._properties_changed_signal = None
self._main_loop = None
self.update_devices()
def _prepair(self):
'''Try to connect to the given dbus services. If successful it will
return a callable dbus proxy and those arguments.
'''
try:
sessionbus = dbus.SessionBus()
systembus = dbus.SystemBus()
except:
return (None, None)
for dbus_props in self.DBUS_SHUTDOWN.values():
try:
if dbus_props['bus'] == SESSION_BUS:
bus = sessionbus
else:
bus = systembus
interface = bus.get_object(dbus_props['service'],
dbus_props['objectPath'])
proxy = interface.get_dbus_method(dbus_props['method'],
dbus_props['interface'])
return (proxy, dbus_props['arguments'])
except dbus.exceptions.DBusException:
continue
return (None, None)
def _check_permission(self, sender, action):
'''
Verifies if the specified action is permitted, and raises
an AccessDeniedException if not.
The caller should use ObtainAuthorization() to get permission.
'''
try:
if sender:
kit = dbus.SystemBus().get_object('org.freedesktop.PolicyKit1', '/org/freedesktop/PolicyKit1/Authority')
kit = dbus.Interface(kit, 'org.freedesktop.PolicyKit1.Authority')
(granted, _, details) = kit.CheckAuthorization(
('system-bus-name', {'name': sender}),
action, {}, dbus.UInt32(1), '', timeout=600)
if not granted:
raise AccessDeniedException('Session not authorized by PolicyKit')
except AccessDeniedException:
raise
except dbus.DBusException, ex:
raise AccessDeniedException(ex.message)
def create_dbus_server(cls, session_bus=False):
'''Return a D-BUS server backend instance.
Normally this connects to the system bus. Set session_bus to True to
connect to the session bus (for testing).
'''
backend = Backend()
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
if session_bus:
backend.bus = dbus.SessionBus()
backend.enforce_polkit = False
else:
backend.bus = dbus.SystemBus()
try:
backend.dbus_name = dbus.service.BusName(DBUS_BUS_NAME, backend.bus)
except dbus.exceptions.DBusException as msg:
logging.error("Exception when spawning dbus service")
logging.error(msg)
return None
return backend
#
# Internal methods
#
def backend(self):
'''Return D-BUS backend client interface.
This gets initialized lazily.
'''
if self._dbus_iface is None:
try:
bus = dbus.SystemBus()
self._dbus_iface = dbus.Interface(bus.get_object(DBUS_BUS_NAME,
'/RecoveryMedia'),
DBUS_INTERFACE_NAME)
except dbus.DBusException as msg:
self.dbus_exception_handler(msg)
sys.exit(1)
except Exception as msg:
self.show_alert(Gtk.MessageType.ERROR, "Exception", str(msg),
transient_for=self.tool_widgets.get_object('tool_selector'))
return self._dbus_iface
def __init__(self, device_id=None):
"""Default initialiser.
1. Initialises the program loop using ``GObject``.
2. Registers the Application on the D-Bus.
3. Initialises the list of services offered by the application.
"""
# Initialise the D-Bus path and register it
self.bus = dbus.SystemBus()
self.path = '/ukBaz/bluezero'
self.bus_name = dbus.service.BusName('ukBaz.bluezero', self.bus)
dbus.service.Object.__init__(self, self.bus_name, self.path)
# Objects to be associated with this service
self.managed_objs = []
self.eventloop = async_tools.EventLoop()
def __init__(self, service_id, uuid, primary):
"""Default initialiser.
1. Registers the service on the D-Bus.
2. Sets up the service UUID and primary flags.
:param service_id:
:param uuid: service BLE UUID
:param primary: whether or not the service is a primary service
"""
# Setup D-Bus object paths and register service
self.path = self.PATH_BASE + str('{0:04d}'.format(service_id))
self.bus = dbus.SystemBus()
self.interface = constants.GATT_SERVICE_IFACE
dbus.service.Object.__init__(self, self.bus, self.path)
self.props = {
constants.GATT_SERVICE_IFACE: {
'UUID': uuid,
'Primary': primary}
}
def __init__(self, uuid, primary, type='peripheral'):
"""Default initialiser.
1. Registers the service on the D-Bus.
2. Sets up the service UUID and primary flags.
3. Initialises the list of characteristics associated with the service.
:param uuid: service BLE UUID
:param primary: whether or not the service is a primary service
"""
# Setup D-Bus object paths and register service
self.index = id(self)
self.path = self.PATH_BASE + str(self.index)
self.bus = dbus.SystemBus()
dbus.service.Object.__init__(self, self.bus, self.path)
# Setup UUID, primary flag
self.uuid = uuid
self.primary = primary
self.type = type
self.service_data = None
# Initialise characteristics within the service
self.characteristics = []
def __init__(self, adapter_addr=None):
self.bus = dbus.SystemBus()
if adapter_addr is None:
adapters = adapter.list_adapters()
if len(adapters) > 0:
adapter_addr = adapters[0]
self.advert_mngr_path = dbus_tools.get_dbus_path(adapter=adapter_addr)
self.advert_mngr_obj = self.bus.get_object(
constants.BLUEZ_SERVICE_NAME,
self.advert_mngr_path)
self.advert_mngr_methods = dbus.Interface(
self.advert_mngr_obj,
constants.LE_ADVERTISING_MANAGER_IFACE)
self.advert_mngr_props = dbus.Interface(self.advert_mngr_obj,
dbus.PROPERTIES_IFACE)
def __init__(self, adapter_addr, device_addr, profile_uuid):
"""
Remote GATT Profile Initialisation.
:param profile_path: dbus path to the profile.
"""
self.profile_path = dbus_tools.get_profile_path(adapter_addr,
device_addr,
profile_uuid)
self.bus = dbus.SystemBus()
self.profile_object = self.bus.get_object(
constants.BLUEZ_SERVICE_NAME,
self.profile_path)
self.profile_methods = dbus.Interface(
self.profile_object,
constants.GATT_PROFILE_IFACE)
self.profile_props = dbus.Interface(self.profile_object,
dbus.PROPERTIES_IFACE)
def __init__(self, adapter_addr):
"""
GATT Manager Initialisation.
:param manager_path: dbus path to the GATT Manager.
"""
self.manager_path = dbus_tools.get_dbus_path(adapter_addr)
self.bus = dbus.SystemBus()
self.manager_obj = self.bus.get_object(
constants.BLUEZ_SERVICE_NAME,
self.manager_path)
self.manager_methods = dbus.Interface(
self.manager_obj,
constants.GATT_MANAGER_IFACE)
self.manager_props = dbus.Interface(self.manager_obj,
dbus.PROPERTIES_IFACE)
def list_adapters():
"""Return list of adapters address available on system."""
paths = []
addresses = []
bus = dbus.SystemBus()
manager = dbus.Interface(
bus.get_object(constants.BLUEZ_SERVICE_NAME, '/'),
constants.DBUS_OM_IFACE)
manager_obj = manager.GetManagedObjects()
for path, ifaces in manager_obj.items():
if constants.ADAPTER_INTERFACE in ifaces:
paths.append(path)
addresses.append(
manager_obj[path][constants.ADAPTER_INTERFACE]['Address'])
if len(paths) < 1:
raise AdapterError('No Bluetooth adapter found')
else:
return addresses
def __init__(self, services=[], filters=[avahi.LOOKUP_RESULT_LOCAL],
interface=avahi.IF_UNSPEC, protocol=avahi.PROTO_INET):
GObject.GObject.__init__(self)
self.filters = filters
self.services = services
self.interface = interface
self.protocol = protocol
try:
self.system_bus = dbus.SystemBus()
self.system_bus.add_signal_receiver(
self.avahi_dbus_connect_cb, "NameOwnerChanged", "org.freedesktop.DBus", arg0="org.freedesktop.Avahi")
except dbus.DBusException as e:
logger.error("Error Owning name on D-Bus: %s", e)
sys.exit(1)
self.db = ServiceTypeDatabase()
self.service_browsers = {}
self.started = False
def get_paired_devices(device_name):
paired_devices = []
bus = dbus.SystemBus()
adapter_path = find_adapter(device_name).object_path
om = dbus.Interface(bus.get_object(SERVICE_NAME, "/"), "org.freedesktop.DBus.ObjectManager")
objects = om.GetManagedObjects()
for path, interfaces in objects.items():
if DEVICE_INTERFACE not in interfaces:
continue
properties = interfaces[DEVICE_INTERFACE]
if properties["Adapter"] != adapter_path:
continue
paired_devices.append((str(properties["Address"]), str(properties["Alias"])))
return paired_devices
def show_adapter_info():
bus = dbus.SystemBus()
om = dbus.Interface(bus.get_object(SERVICE_NAME, "/"), "org.freedesktop.DBus.ObjectManager")
objects = om.GetManagedObjects()
for path, interfaces in objects.iteritems():
if ADAPTER_INTERFACE not in interfaces:
continue
print(" [ %s ]" % (path))
props = interfaces[ADAPTER_INTERFACE]
for (key, value) in props.items():
if (key == "Class"):
print(" %s = 0x%06x" % (key, value))
elif (key == "UUIDs"):
continue
else:
print(" %s = %s" % (key, value))
print()
def find_device_in_objects(objects, device_address, adapter_pattern=None):
bus = dbus.SystemBus()
path_prefix = ""
if adapter_pattern:
adapter = find_adapter_in_objects(objects, adapter_pattern)
path_prefix = adapter.object_path
for path, ifaces in objects.iteritems():
device = ifaces.get(DEVICE_INTERFACE)
if device is None:
continue
if (device["Address"] == device_address and
path.startswith(path_prefix)):
obj = bus.get_object(SERVICE_NAME, path)
return dbus.Interface(obj, DEVICE_INTERFACE)
raise Exception("Bluetooth device not found")
def find_device_in_objects(objects, device_address, adapter_pattern=None):
bus = dbus.SystemBus()
path_prefix = ""
if adapter_pattern:
adapter = find_adapter_in_objects(objects, adapter_pattern)
path_prefix = adapter.object_path
for path, ifaces in objects.iteritems():
device = ifaces.get(DEVICE_INTERFACE)
if device is None:
continue
if (device["Address"] == device_address and
path.startswith(path_prefix)):
obj = bus.get_object(SERVICE_NAME, path)
return dbus.Interface(obj, DEVICE_INTERFACE)
raise Exception("Bluetooth device not found")
def publish(self):
bus = dbus.SystemBus()
server = dbus.Interface(
bus.get_object(
avahi.DBUS_NAME,
avahi.DBUS_PATH_SERVER
),
avahi.DBUS_INTERFACE_SERVER
)
g = dbus.Interface(
bus.get_object(
avahi.DBUS_NAME,
server.EntryGroupNew()
),
avahi.DBUS_INTERFACE_ENTRY_GROUP
)
g.AddService(avahi.IF_UNSPEC, avahi.PROTO_UNSPEC, dbus.UInt32(0),
self.name, self.stype, self.domain, self.host,
dbus.UInt16(self.port), self.text)
g.Commit()
self.group = g
def publish(self):
bus = dbus.SystemBus()
server = dbus.Interface(
bus.get_object(
avahi.DBUS_NAME,
avahi.DBUS_PATH_SERVER
),
avahi.DBUS_INTERFACE_SERVER
)
g = dbus.Interface(
bus.get_object(
avahi.DBUS_NAME,
server.EntryGroupNew()
),
avahi.DBUS_INTERFACE_ENTRY_GROUP
)
g.AddService(avahi.IF_UNSPEC, avahi.PROTO_UNSPEC, dbus.UInt32(0),
self.name, self.stype, self.domain, self.host,
dbus.UInt16(self.port), self.text)
g.Commit()
self.group = g
def find_device_in_objects(objects, device_address, adapter_pattern=None):
bus = dbus.SystemBus()
path_prefix = ""
if adapter_pattern:
adapter = find_adapter_in_objects(objects, adapter_pattern)
path_prefix = adapter.object_path
for path, ifaces in objects.iteritems():
device = ifaces.get(DEVICE_INTERFACE)
if device is None:
continue
if (device["Address"] == device_address and
path.startswith(path_prefix)):
obj = bus.get_object(SERVICE_NAME, path)
return dbus.Interface(obj, DEVICE_INTERFACE)
raise Exception("Bluetooth device not found")
def _setup_signals(self):
""" Connect signals to the PkProgress from libpackagekitlib,
because PK DBus exposes only a generic Changed, without
specifying the property changed
"""
self._trans.connect('notify::role', self._emit,
'role-changed', 'role')
self._trans.connect('notify::status', self._emit,
'status-changed', 'status')
self._trans.connect('notify::percentage', self._emit,
'progress-changed', 'percentage')
# SC UI does not support subprogress:
#self._trans.connect('notify::subpercentage', self._emit,
# 'progress-changed', 'subpercentage')
self._trans.connect('notify::percentage', self._emit,
'progress-changed', 'percentage')
self._trans.connect('notify::allow-cancel', self._emit,
'cancellable-changed', 'allow-cancel')
# connect the delete:
proxy = dbus.SystemBus().get_object('org.freedesktop.PackageKit',
self.tid)
trans = dbus.Interface(proxy, 'org.freedesktop.PackageKit.Transaction')
trans.connect_to_signal("Destroy", self._remove)
def _setup_signals(self):
""" Connect signals to the PkProgress from libpackagekitlib,
because PK DBus exposes only a generic Changed, without
specifying the property changed
"""
self._trans.connect('notify::role', self._emit,
'role-changed', 'role')
self._trans.connect('notify::status', self._emit,
'status-changed', 'status')
self._trans.connect('notify::percentage', self._emit,
'progress-changed', 'percentage')
# SC UI does not support subprogress:
#self._trans.connect('notify::subpercentage', self._emit,
# 'progress-changed', 'subpercentage')
self._trans.connect('notify::percentage', self._emit,
'progress-changed', 'percentage')
self._trans.connect('notify::allow-cancel', self._emit,
'cancellable-changed', 'allow-cancel')
# connect the delete:
proxy = dbus.SystemBus().get_object('org.freedesktop.PackageKit',
self.tid)
trans = dbus.Interface(proxy, 'org.freedesktop.PackageKit.Transaction')
trans.connect_to_signal("Destroy", self._remove)
def is_service_running(service):
""" Queries systemd through dbus to see if the service is running """
service_running = False
bus = SystemBus()
systemd = bus.get_object('org.freedesktop.systemd1', '/org/freedesktop/systemd1')
manager = Interface(systemd, dbus_interface='org.freedesktop.systemd1.Manager')
try:
service_unit = service if service.endswith('.service') else manager.GetUnit('{0}.service'.format(service))
service_proxy = bus.get_object('org.freedesktop.systemd1', str(service_unit))
service_properties = Interface(service_proxy, dbus_interface='org.freedesktop.DBus.Properties')
service_load_state = service_properties.Get('org.freedesktop.systemd1.Unit', 'LoadState')
service_active_state = service_properties.Get('org.freedesktop.systemd1.Unit', 'ActiveState')
if service_load_state == 'loaded' and service_active_state == 'active':
service_running = True
except DBusException:
pass
return service_running
def is_service_running(service):
""" Queries systemd through dbus to see if the service is running """
service_running = False
bus = SystemBus()
systemd = bus.get_object('org.freedesktop.systemd1', '/org/freedesktop/systemd1')
manager = Interface(systemd, dbus_interface='org.freedesktop.systemd1.Manager')
try:
service_unit = service if service.endswith('.service') else manager.GetUnit('{0}.service'.format(service))
service_proxy = bus.get_object('org.freedesktop.systemd1', str(service_unit))
service_properties = Interface(service_proxy, dbus_interface='org.freedesktop.DBus.Properties')
service_load_state = service_properties.Get('org.freedesktop.systemd1.Unit', 'LoadState')
service_active_state = service_properties.Get('org.freedesktop.systemd1.Unit', 'ActiveState')
if service_load_state == 'loaded' and service_active_state == 'active':
service_running = True
except DBusException:
pass
return service_running
def is_service_running(service):
""" Queries systemd through dbus to see if the service is running """
service_running = False
bus = SystemBus()
systemd = bus.get_object('org.freedesktop.systemd1', '/org/freedesktop/systemd1')
manager = Interface(systemd, dbus_interface='org.freedesktop.systemd1.Manager')
try:
service_unit = service if service.endswith('.service') else manager.GetUnit('{0}.service'.format(service))
service_proxy = bus.get_object('org.freedesktop.systemd1', str(service_unit))
service_properties = Interface(service_proxy, dbus_interface='org.freedesktop.DBus.Properties')
service_load_state = service_properties.Get('org.freedesktop.systemd1.Unit', 'LoadState')
service_active_state = service_properties.Get('org.freedesktop.systemd1.Unit', 'ActiveState')
if service_load_state == 'loaded' and service_active_state == 'active':
service_running = True
except DBusException:
pass
return service_running
def is_service_running(service):
""" Queries systemd through dbus to see if the service is running """
service_running = False
bus = SystemBus()
systemd = bus.get_object('org.freedesktop.systemd1', '/org/freedesktop/systemd1')
manager = Interface(systemd, dbus_interface='org.freedesktop.systemd1.Manager')
try:
service_unit = service if service.endswith('.service') else manager.GetUnit('{0}.service'.format(service))
service_proxy = bus.get_object('org.freedesktop.systemd1', str(service_unit))
service_properties = Interface(service_proxy, dbus_interface='org.freedesktop.DBus.Properties')
service_load_state = service_properties.Get('org.freedesktop.systemd1.Unit', 'LoadState')
service_active_state = service_properties.Get('org.freedesktop.systemd1.Unit', 'ActiveState')
if service_load_state == 'loaded' and service_active_state == 'active':
service_running = True
except DBusException:
pass
return service_running