def read_subschemasubentry_s(self,subschemasubentry_dn,attrs=None):
"""
Returns the sub schema sub entry's data
"""
attrs = attrs or SCHEMA_ATTRS
try:
r = self.search_s(
subschemasubentry_dn,ldap.SCOPE_BASE,
'(objectClass=subschema)',
attrs
)
except ldap.NO_SUCH_OBJECT:
return None
else:
if r:
return r[0][1]
else:
return None
python类NO_SUCH_OBJECT的实例源码
def read_subschemasubentry_s(self,subschemasubentry_dn,attrs=None):
"""
Returns the sub schema sub entry's data
"""
attrs = attrs or SCHEMA_ATTRS
try:
r = self.search_s(
subschemasubentry_dn,ldap.SCOPE_BASE,
'(objectClass=subschema)',
attrs
)
except ldap.NO_SUCH_OBJECT:
return None
else:
if r:
return r[0][1]
else:
return None
def __ldap_load_members2(self, l, moderator=False):
attr = ['mail']
if not moderator and self.ldapfullname:
attr.append(self.ldapfullname)
if self.ldapgroupattr or moderator:
# group attribute or moderator flag has been set. Let's get the uids.
if moderator:
assert self.ldapmodgroupdn
groupdn = self.ldapmodgroupdn
else:
groupdn = self.ldapgroupdn
members = l.search_s(groupdn, ldap.SCOPE_SUBTREE,
self.ldapsearch, [self.ldapgroupattr])
for (dn,attrs) in members:
if self.ldapgroupattr in attrs:
memberids = attrs[self.ldapgroupattr]
if DEBUG:
syslog('debug','regular groupdns = %s' % groupdns)
for memberid in memberids:
try:
res2 = l.search_s(self.ldapbasedn,
ldap.SCOPE_SUBTREE,
'(&(objectClass=*)('+self.ldapmemberuid+'='+memberid+'))',
attr)
self.__loadmembers(res2, moderator)
except ldap.NO_SUCH_OBJECT:
syslog('warn',"can't process %s: no such object (accountDisabled?)" % memberid)
else:
members = l.search_s(self.ldapbasedn,
ldap.SCOPE_SUBTREE,
self.ldapsearch,
attr)
self.__loadmembers(members)
def search_subschemasubentry_s(self,dn=''):
"""
Returns the distinguished name of the sub schema sub entry
for a part of a DIT specified by dn.
None as result indicates that the DN of the sub schema sub entry could
not be determined.
"""
try:
r = self.search_s(
dn,ldap.SCOPE_BASE,'(objectClass=*)',['subschemaSubentry']
)
except (ldap.NO_SUCH_OBJECT,ldap.NO_SUCH_ATTRIBUTE,ldap.INSUFFICIENT_ACCESS):
r = []
except ldap.UNDEFINED_TYPE:
return None
try:
if r:
e = ldap.cidict.cidict(r[0][1])
search_subschemasubentry_dn = e.get('subschemaSubentry',[None])[0]
if search_subschemasubentry_dn is None:
if dn:
# Try to find sub schema sub entry in root DSE
return self.search_subschemasubentry_s(dn='')
else:
# If dn was already root DSE we can return here
return None
else:
return search_subschemasubentry_dn
except IndexError:
return None
def search_subschemasubentry_s(self,dn=''):
"""
Returns the distinguished name of the sub schema sub entry
for a part of a DIT specified by dn.
None as result indicates that the DN of the sub schema sub entry could
not be determined.
"""
try:
r = self.search_s(
dn,ldap.SCOPE_BASE,'(objectClass=*)',['subschemaSubentry']
)
except (ldap.NO_SUCH_OBJECT,ldap.NO_SUCH_ATTRIBUTE,ldap.INSUFFICIENT_ACCESS):
r = []
except ldap.UNDEFINED_TYPE:
return None
try:
if r:
e = ldap.cidict.cidict(r[0][1])
search_subschemasubentry_dn = e.get('subschemaSubentry',[None])[0]
if search_subschemasubentry_dn is None:
if dn:
# Try to find sub schema sub entry in root DSE
return self.search_subschemasubentry_s(dn='')
else:
# If dn was already root DSE we can return here
return None
else:
return search_subschemasubentry_dn
except IndexError:
return None
def search_subschemasubentry_s(self,dn=''):
"""
Returns the distinguished name of the sub schema sub entry
for a part of a DIT specified by dn.
None as result indicates that the DN of the sub schema sub entry could
not be determined.
"""
try:
r = self.search_s(
dn,ldap.SCOPE_BASE,'(objectClass=*)',['subschemaSubentry']
)
except (ldap.NO_SUCH_OBJECT,ldap.NO_SUCH_ATTRIBUTE,ldap.INSUFFICIENT_ACCESS):
r = []
except ldap.UNDEFINED_TYPE:
return None
try:
if r:
e = ldap.cidict.cidict(r[0][1])
search_subschemasubentry_dn = e.get('subschemaSubentry',[None])[0]
if search_subschemasubentry_dn is None:
if dn:
# Try to find sub schema sub entry in root DSE
return self.search_subschemasubentry_s(dn='')
else:
# If dn was already root DSE we can return here
return None
else:
return search_subschemasubentry_dn
except IndexError:
return None
def read_subschemasubentry_s(self,subschemasubentry_dn,attrs=None):
"""
Returns the sub schema sub entry's data
"""
try:
subschemasubentry = self.read_s(
subschemasubentry_dn,
filterstr='(objectClass=subschema)',
attrlist=attrs or SCHEMA_ATTRS
)
except ldap.NO_SUCH_OBJECT:
return None
else:
return subschemasubentry
def test_create_object(self):
ldap_object = pyldap_orm.LDAPObject(self.session)
ldap_object.dn = 'cn=Test,ou=Tests,dc=example,dc=com'
ldap_object.objectClass = ['person']
ldap_object.sn = ['Test']
ldap_object.save()
ldap_object = pyldap_orm.LDAPObject(self.session).by_dn('cn=Test,ou=Tests,dc=example,dc=com')
assert ldap_object.dn == 'cn=Test,ou=Tests,dc=example,dc=com'
assert ldap_object.sn == ['Test']
ldap_object.delete()
with pytest.raises(ldap.NO_SUCH_OBJECT):
pyldap_orm.LDAPObject(self.session).by_dn('cn=Test,ou=Tests,dc=example,dc=com')
def _count_certificates(self):
self._log.debug('Counting certificates...')
try:
results = self._search(
'ou=certificateRepository,ou=ca,o=ipaca',
'(certStatus=*)',
scope=ldap.SCOPE_ONELEVEL
)
except ldap.NO_SUCH_OBJECT:
return 'N/A'
n = len(results)
return n
def __init__(self, fqdn, binddn, bindpw):
self._log = logging.getLogger()
self._log.debug('Initialising FreeIPA server %s' % fqdn)
self.fqdn = fqdn
self.hostname_short = fqdn.partition('.')[0]
self._domain = fqdn.partition('.')[2]
self._binddn = binddn
self._bindpw = bindpw
self._url = 'ldaps://' + fqdn
self._base_dn = 'dc=' + fqdn.partition('.')[2].replace('.', ',dc=')
self._active_user_base = 'cn=users,cn=accounts,' + self._base_dn
self._stage_user_base = 'cn=staged users,cn=accounts,cn=provisioning,' + self._base_dn
self._preserved_user_base = 'cn=deleted users,cn=accounts,cn=provisioning,' + self._base_dn
self._groups_base = 'cn=groups,cn=accounts,' + self._base_dn
try:
self._conn = ldap.initialize(self._url)
self._conn.set_option(ldap.OPT_NETWORK_TIMEOUT, 3)
self._conn.simple_bind_s(self._binddn, self._bindpw)
except (
ldap.SERVER_DOWN,
ldap.NO_SUCH_OBJECT,
ldap.INVALID_CREDENTIALS
) as err:
self._log.critical('Bind error: %s (%s)' % (err.message['desc'], self.fqdn))
exit(1)
self.users = self._count_users(user_base='active')
self.ustage = self._count_users(user_base='stage')
self.upres = self._count_users(user_base='preserved')
self.ugroups = self._count_groups()
self.hosts = self._count_hosts()
self.hgroups = self._count_hostgroups()
self.hbac = self._count_hbac_rules()
self.sudo = self._count_sudo_rules()
self.zones = self._count_dns_zones()
self.certs = self._count_certificates()
self.ldap = self._ldap_conflicts()
self.ghosts = self._ghost_replicas()
self.bind = self._anon_bind()
self.msdcs = self._ms_adtrust()
self.replica, self.healthy_agreements = self._replication_agreements()