def _replication_agreements(self):
self._log.debug('Checking for replication agreements...')
msg = []
healthy = True
suffix = self._base_dn.replace('=', '\\3D').replace(',', '\\2C')
results = self._search(
'cn=replica,cn=%s,cn=mapping tree,cn=config' % suffix,
'(objectClass=*)',
['nsDS5ReplicaHost', 'nsds5replicaLastUpdateStatus'],
scope=ldap.SCOPE_ONELEVEL
)
for result in results:
dn, attrs = result
host = attrs['nsDS5ReplicaHost'][0].decode('utf-8')
host = host.partition('.')[0]
status = attrs['nsds5replicaLastUpdateStatus'][0].decode('utf-8')
status = status.replace('Error ', '').partition(' ')[0].strip('()')
if status != '0':
healthy = False
msg.append('%s %s' % (host, status))
return '\n'.join(msg), healthy
python类SCOPE_ONELEVEL的实例源码
def check_userfree(self, uid):
"""Check if a username is free.
If username is already used or is an LDAP group, an
RBFatalError is raised. If the username is in the additional
reserved LDAP tree, an RBWarningError is raised and checked if
it is to be overridden. """
res = self.ldap.search_s(rbconfig.ldap_accounts_tree,
ldap.SCOPE_ONELEVEL, 'uid=%s' % uid)
if res:
raise RBFatalError(
"Username '%s' is already taken by %s account (%s)" %
(uid, res[0][1]['objectClass'][0].decode(),
res[0][1]['cn'][0].decode()))
res = self.ldap.search_s(rbconfig.ldap_group_tree, ldap.SCOPE_ONELEVEL,
'cn=%s' % uid)
if res:
raise RBFatalError("Username '%s' is reserved (LDAP Group)" % uid)
res = self.ldap.search_s(rbconfig.ldap_reserved_tree,
ldap.SCOPE_ONELEVEL, 'uid=%s' % uid)
if res:
self.rberror(
RBWarningError("Username '%s' is reserved (%s)" % (uid, res[0][
1]['description'][0].decode())))
def list_pre_sync(self):
"""Return dictionary of all users for useradm pre_sync() dump."""
res = self.ldap.search_s(
rbconfig.ldap_accounts_tree, ldap.SCOPE_ONELEVEL,
'objectClass=posixAccount', ('uid', 'homeDirectory',
'objectClass'))
tmp = {}
for data in res:
for i in data['objectClass']:
i = i.decode()
if i in rbconfig.usertypes:
break
else:
raise RBFatalError(
"Unknown usertype for user '%s'" % data['uid'][0])
tmp[data['uid'][0]] = {
'homeDirectory': data['homeDirectory'][0],
'usertype': data['uid'][0]
}
return tmp
def uidNumber_findmax(self):
"""Return highest uidNumber found in LDAP accounts tree.
This is only used to create the uidNumber file, the
uidNumber_readnext() function should be used for getting the
next available uidNumber."""
res = self.ldap.search_s(rbconfig.ldap_accounts_tree,
ldap.SCOPE_ONELEVEL,
'objectClass=posixAccount', ('uidNumber', ))
maxuid = -1
for i in res:
tmp = int(i[1]['uidNumber'][0])
if tmp > maxuid:
maxuid = tmp
return maxuid
def _search(self, filterstr, attrlist=None):
"""
A wrapper for the `LDAPObject.search_s` functionality.
Perform an LDAP search operation, starting at the configured base DN.
The filterstr argument is a string representation of the filter to
apply in the search. The retrieved attributes can be limited with the
attrlist parameter. If attrlist is None, all the attributes of each
entry are returned.
"""
with self._ldap_connection() as ldap_cxn:
results = ldap_cxn.search_s(self.base_dn, ldap.SCOPE_ONELEVEL, filterstr, attrlist)
return results
def _count_hbac_rules(self):
self._log.debug('Counting HBAC rules...')
results = self._search(
'cn=hbac,%s' % self._base_dn,
'(ipaUniqueID=*)',
scope=ldap.SCOPE_ONELEVEL
)
return len(results)
def _count_sudo_rules(self):
self._log.debug('Counting SUDO rules...')
results = self._search(
'cn=sudorules,cn=sudo,%s' % self._base_dn,
'(ipaUniqueID=*)',
scope=ldap.SCOPE_ONELEVEL
)
return len(results)
def _count_dns_zones(self):
self._log.debug('Counting DNS zones...')
results = self._search(
'cn=dns,%s' % self._base_dn,
'(|(objectClass=idnszone)(objectClass=idnsforwardzone))',
scope=ldap.SCOPE_ONELEVEL
)
return len(results)
def _count_certificates(self):
self._log.debug('Counting certificates...')
try:
results = self._search(
'ou=certificateRepository,ou=ca,o=ipaca',
'(certStatus=*)',
scope=ldap.SCOPE_ONELEVEL
)
except ldap.NO_SUCH_OBJECT:
return 'N/A'
n = len(results)
return n
def check_user_byname(self, uid):
"""Raise RBFatalError if given username does not exist in user
database."""
if not self.ldap.search_s(rbconfig.ldap_accounts_tree,
ldap.SCOPE_ONELEVEL, 'uid=%s' % uid):
raise RBFatalError("User '%s' does not exist" % uid)
def check_user_byid(self, user_id):
"""Raise RBFatalError if given id does not belong to a user in
user database."""
if not self.ldap.search_s(rbconfig.ldap_accounts_tree,
ldap.SCOPE_ONELEVEL, 'id=%s' % user_id):
raise RBFatalError("User with id '%s' does not exist" % user_id)
def check_group_byname(self, group):
"""Raise RBFatalError if given group does not exist in group
database."""
if not self.ldap.search_s(rbconfig.ldap_group_tree,
ldap.SCOPE_ONELEVEL, 'cn=%s' % group):
raise RBFatalError("Group '%s' does not exist" % group)
def check_group_byid(self, gid):
"""Raise RBFatalError if given id does not belong to a group in
group database."""
if not self.ldap.search_s(rbconfig.ldap_group_tree,
ldap.SCOPE_ONELEVEL, 'gidNumber=%s' % gid):
raise RBFatalError("Group with id '%s' does not exist" % gid)
# ------------------------------------------------------------------- #
# INFORMATION RETRIEVAL METHODS #
# ------------------------------------------------------------------- #
# fixme still needed ?
# def get_usertype_byname(self, uid):
# """Return usertype for username in user database. Raise
# RBFatalError if user does not exist."""
# res = self.ldap.search_s(rbconfig.ldap_accounts_tree,
# ldap.SCOPE_ONELEVEL, 'uid=%s' % usr.uid,
# ('objectClass', ))
# if res:
# for i in res[0][1]['objectClass']:
# if i in rbconfig.usertypes:
# return i
# else:
# raise RBFatalError("Unknown usertype for user '%s'" % uid)
# else:
# raise RBFatalError("User '%s' does not exist" % uid)
def get_user_byid(self, usr):
"""Populate RBUser object with data from user with given id in
user database. Raise RBFatalError if user does not exist."""
res = self.ldap.search_s(rbconfig.ldap_accounts_tree,
ldap.SCOPE_ONELEVEL, 'id=%s' % usr.id)
if res:
self.set_user(usr, res[0])
else:
raise RBFatalError("User with id '%s' does not exist" % usr.id)
def get_dummyid(self, usr):
"""Set usr.id to unique 'dummy' DCU ID number."""
raise RBFatalError('NOT YET IMPLEMENTED')
res = self.ldap.search_s(rbconfig.ldap_accounts_tree,
ldap.SCOPE_ONELEVEL,
'(&(id>=10000000)(id<20000000))"' % (usr.uid))
if res:
usr.id = int(res[0][1]['id'][0]) + 1
else:
usr.id = 10000000
def get_gid_byname(self, group):
"""Get gid for given group name.
Raise RBFatalError if given name does not belong to a group in
group database."""
res = self.ldap.search_s(rbconfig.ldap_group_tree, ldap.SCOPE_ONELEVEL,
'cn=%s' % group)
if res:
return int(res[0][1]['gidNumber'][0])
else:
raise RBFatalError("Group '%s' does not exist" % group)
def get_group_byid(self, gid):
"""Get group name for given group ID.
Raise RBFatalError if given id does not belong to a group in
group database."""
res = self.ldap.search_s(rbconfig.ldap_group_tree, ldap.SCOPE_ONELEVEL,
'gidNumber=%s' % gid)
if res:
return res[0][1]['cn'][0]
else:
raise RBFatalError("Group with id '%s' does not exist" % gid)
def list_paid_newbies(self):
"""Return list of all paid newbie usernames."""
res = self.ldap.search_s(rbconfig.ldap_accounts_tree,
ldap.SCOPE_ONELEVEL,
'(&(yearsPaid>=1)(newbie=TRUE))', ('uid', ))
return [data['uid'][0] for dn, data in res]
def list_paid_non_newbies(self):
"""Return list of all paid renewal (non-newbie) usernames."""
res = self.ldap.search_s(rbconfig.ldap_accounts_tree,
ldap.SCOPE_ONELEVEL,
'(&(yearsPaid>=1)(newbie=FALSE))', ('uid', ))
return [data['uid'][0] for dn, data in res]
def list_non_newbies(self):
"""Return list of all non newbie usernames."""
res = self.ldap.search_s(rbconfig.ldap_accounts_tree,
ldap.SCOPE_ONELEVEL, 'newbie=FALSE',
('uid', ))
return [data['uid'][0] for dn, data in res]
def list_newbies(self):
"""Return list of all newbie usernames."""
res = self.ldap.search_s(rbconfig.ldap_accounts_tree,
ldap.SCOPE_ONELEVEL, 'newbie=TRUE', ('uid', ))
return [data['uid'][0] for dn, data in res]
def list_groups(self):
"""Return list of all groups."""
res = self.ldap.search_s(rbconfig.ldap_group_tree, ldap.SCOPE_ONELEVEL,
'objectClass=posixGroup', ('cn', ))
return [data['cn'][0] for dn, data in res]
def list_reserved_static(self):
"""Return list of all static reserved names."""
res = self.ldap.search_s(
rbconfig.ldap_reserved_tree, ldap.SCOPE_ONELEVEL,
'(&(objectClass=reserved)(flag=static))', ('uid', ))
return [data['uid'][0] for dn, data in res]
def list_reserved_dynamic(self):
"""Return list of all dynamic reserved names."""
res = self.ldap.search_s(
rbconfig.ldap_reserved_tree, ldap.SCOPE_ONELEVEL,
'(&(objectClass=reserved)(!(flag=static)))', ('uid', ))
return [data['uid'][0] for dn, data in res]
def list_unpaid(self):
"""Return list of all non-renewed users."""
res = self.ldap.search_s(rbconfig.ldap_accounts_tree,
ldap.SCOPE_ONELEVEL, 'yearsPaid<=0',
('uid', ))
return [data['uid'][0] for dn, data in res]
def list_unpaid_normal(self):
"""Return list of all normal non-renewed users."""
res = self.ldap.search_s(rbconfig.ldap_accounts_tree,
ldap.SCOPE_ONELEVEL, 'yearsPaid=0', ('uid', ))
return [data['uid'][0] for dn, data in res]
def list_unpaid_grace(self):
"""Return list of all grace non-renewed users."""
res = self.ldap.search_s(rbconfig.ldap_accounts_tree,
ldap.SCOPE_ONELEVEL, 'yearsPaid<=-1',
('uid', ))
return [data['uid'][0] for dn, data in res]
def dict_reserved_desc(self):
"""Return dictionary of all reserved entries with their
description."""
res = self.ldap.search_s(rbconfig.ldap_reserved_tree,
ldap.SCOPE_ONELEVEL, 'objectClass=reserved',
('uid', 'description'))
tmp = {}
for data in res:
tmp[data['uid'][0]] = data['description'][0]
return tmp
def dict_reserved_static(self):
"""Return dictionary of all static reserved entries with their
description."""
res = self.ldap.search_s(
rbconfig.ldap_reserved_tree, ldap.SCOPE_ONELEVEL,
'(&(objectClass=reserved)(flag=static))', ('uid', 'description'))
tmp = {}
for data in res:
tmp[data['uid'][0]] = data['description'][0]
return tmp
# -------------------------------- #
# METHODS RETURNING SEARCH RESULTS #
# -------------------------------- #
def execute(self):
args = self.args
# we can connect in 2 ways. By hostname/ip (and portnumber)
# or by ldap-uri
if "url" in args and ldapurl.isLDAPUrl(args["url"]):
conn = ldap.initialize(args["url"])
else:
ip, port = self.get_address()
conn = ldap.initialize("ldap://%s:%s" % (ip, port))
username = args.get("username", "")
password = args.get("password", "")
conn.simple_bind(username, password)
try:
self._set_version(args, conn)
except ValueError:
return Event.DOWN, "unsupported protocol version"
base = args.get("base", "dc=example,dc=org")
if base == "cn=monitor":
my_res = conn.search_st(base, ldap.SCOPE_BASE,
timeout=self.timeout)
versionstr = str(my_res[0][-1]['description'][0])
self.version = versionstr
return Event.UP, versionstr
scope = args.get("scope", "SUBTREE").upper()
if scope == "BASE":
scope = ldap.SCOPE_BASE
elif scope == "ONELEVEL":
scope = ldap.SCOPE_ONELEVEL
else:
scope = ldap.SCOPE_SUBTREE
filtr = args.get("filter", "objectClass=*")
try:
conn.search_ext_s(base, scope, filterstr=filtr,
timeout=self.timeout)
# pylint: disable=W0703
except Exception as err:
return (Event.DOWN,
"Failed ldapSearch on %s for %s: %s" % (
self.get_address(), filtr, str(err)))
conn.unbind()
return Event.UP, "Ok"