def _check_polkit_privilege(self, sender, conn, privilege):
    '''Verify that sender has a given PolicyKit privilege.

    sender is the sender's (private) D-BUS name, such as ":1.42"
    (sender_keyword in @dbus.service.methods). conn is the dbus.Connection
    object (connection_keyword in @dbus.service.methods). privilege is the
    PolicyKit privilege string.

    This method returns None if the caller is privileged (or if no check
    applies: local call, or enforce_polkit disabled), and otherwise raises
    a PermissionDeniedByPolicy exception.

    A dbus.DBusException raised by the bus or by PolicyKit is propagated,
    with one exception: a ServiceUnknown error on the cached PolicyKit
    proxy triggers a single reconnect-and-retry.
    '''
    if sender is None and conn is None:
        # called locally, not through D-BUS
        return
    if not self.enforce_polkit:
        # that happens for testing purposes when running on the session
        # bus, and it does not make sense to restrict operations here
        return

    # Peer PID. It is only used for the debug message below; it must NOT be
    # used to identify the caller to PolicyKit (see the subject comment).
    if self.dbus_info is None:
        self.dbus_info = dbus.Interface(
            conn.get_object('org.freedesktop.DBus',
                            '/org/freedesktop/DBus/Bus', False),
            'org.freedesktop.DBus')
    pid = self.dbus_info.GetConnectionUnixProcessID(sender)

    # Identify the caller by its unique bus name rather than by
    # ('unix-process', pid, start-time=0): a PID can be recycled between the
    # lookup above and PolicyKit's own inspection of the process, which
    # would let a different process be authorized in the caller's place.
    subject = ('system-bus-name',
               {'name': dbus.String(sender, variant_level=1)})

    # Two attempts at most: the first one may hit a stale proxy because
    # polkitd exits when idle; the second one uses a fresh proxy. Bounding
    # the retries avoids endless recursion if the service stays unknown.
    for retries_left in (1, 0):
        if self.polkit is None:
            self.polkit = dbus.Interface(
                dbus.SystemBus().get_object(
                    'org.freedesktop.PolicyKit1',
                    '/org/freedesktop/PolicyKit1/Authority', False),
                'org.freedesktop.PolicyKit1.Authority')
        try:
            # we don't need is_challenge return here, since we call with
            # AllowUserInteraction (the UInt32(1) flag)
            (is_auth, _, details) = self.polkit.CheckAuthorization(
                subject, privilege, {'': ''}, dbus.UInt32(1), '',
                timeout=600)
        except dbus.DBusException as error:
            service_gone = (error.get_dbus_name() ==
                            'org.freedesktop.DBus.Error.ServiceUnknown')
            if not (service_gone and retries_left):
                raise
            # polkitd timed out, connect again
            self.polkit = None
        else:
            break

    if not is_auth:
        logging.debug('_check_polkit_privilege: sender %s on connection %s pid %i is not authorized for %s: %s',
                      sender, conn, pid, privilege, str(details))
        raise PermissionDeniedByPolicy(privilege)
#
# Internal API for calling from Handlers (not exported through D-BUS)
#
评论列表
文章目录