def exact(self):
    """
    Build the modlist that makes the attribute hold exactly ``self.values``.

    Reads the current values of ``self.name`` from the entry at ``self.dn``
    and returns a python-ldap modlist: an empty list when the value sets
    already match, otherwise a single add, delete or replace operation.
    A failed search is reported through ``self.module.fail_json``.
    """
    try:
        found = self.connection.search_s(
            self.dn, ldap.SCOPE_BASE, attrlist=[self.name])
    except ldap.LDAPError:
        e = get_exception()
        self.module.fail_json(
            msg="Cannot search for attribute %s" % self.name,
            details=str(e))

    existing = found[0][1].get(self.name, [])

    # Order and duplicates are irrelevant here: compare as sets.
    if frozenset(self.values) == frozenset(existing):
        return []
    if len(existing) == 0:
        return [(ldap.MOD_ADD, self.name, self.values)]
    if len(self.values) == 0:
        return [(ldap.MOD_DELETE, self.name, None)]
    return [(ldap.MOD_REPLACE, self.name, self.values)]
# Source-code examples for the Python class LDAPError()
def auth(self, username, password):
    """
    Authenticate a user against the ldap server.

    Binds as ``uid=<username>,<base_dn>`` with the supplied password.

    Returns True when the bind succeeds.  Raises ldap.INVALID_CREDENTIALS
    for a rejected or empty password.  Any other ldap.LDAPError is logged
    through ``self.bus.log`` and re-raised.
    """
    # A simple bind with an empty password is an "unauthenticated bind"
    # (RFC 4513, section 5.1.2) that many servers accept, so it must never
    # count as a successful login.
    if not password:
        raise ldap.INVALID_CREDENTIALS({'desc': 'Invalid credentials'})

    dn = 'uid={0},{1}'.format(username, self.base_dn)
    try:
        with self._ldap_connection() as ldap_cxn:
            ldap_cxn.simple_bind_s(dn, password)
            return True
    except ldap.INVALID_CREDENTIALS:
        # Expected outcome for a wrong password: propagate without logging.
        raise
    except ldap.INVALID_DN_SYNTAX:
        self.bus.log('Invalid DN syntax in configuration: {0}'.format(self.base_dn), 40)
        raise
    except ldap.LDAPError as e:
        self.bus.log('LDAP Error: {0}'.format(e.message['desc'] if 'desc' in e.message else str(e)),
                     level=40,
                     traceback=True)
        raise
def get_user_by_uid(self, uid):
    """
    Lookup an ldap user by uid and return a dict with
    all known and anonymously accessible attributes.

    Returns an empty dict unless exactly one entry matches.  LDAP errors
    are logged through ``self.bus.log`` and re-raised.
    """
    # Local import: only the filter construction needs this helper.
    from ldap.filter import escape_filter_chars

    # Escape the value so characters such as '*', '(' or ')' in ``uid``
    # cannot change the meaning of the search filter (LDAP injection).
    searchfilter = '(uid={0})'.format(escape_filter_chars('{0}'.format(uid)))
    try:
        user = self._search(searchfilter)
    except ldap.LDAPError as e:
        self.bus.log('LDAP Error: {0}'.format(e.message['desc'] if 'desc' in e.message else str(e)),
                     level=40,
                     traceback=True)
        raise

    # Zero matches or an ambiguous match both count as "not found".
    if not user or len(user) > 1:
        return {}
    return user[0][1]
def get_user_by_email(self, email_address):
    """
    Lookup an ldap user by email address and return a dict with
    all known and anonymously accessible attributes.

    Returns an empty dict unless exactly one entry matches.  LDAP errors
    are logged through ``self.bus.log`` and re-raised.
    """
    # Local import: only the filter construction needs this helper.
    from ldap.filter import escape_filter_chars

    # Escape the value so characters such as '*', '(' or ')' in the address
    # cannot change the meaning of the search filter (LDAP injection).
    searchfilter = '(mail={0})'.format(
        escape_filter_chars('{0}'.format(email_address)))
    try:
        user = self._search(searchfilter)
    except ldap.LDAPError as e:
        self.bus.log('LDAP Error: {0}'.format(e.message['desc'] if 'desc' in e.message else str(e)),
                     level=40,
                     traceback=True)
        raise

    # Zero matches or an ambiguous match both count as "not found".
    if not user or len(user) > 1:
        return {}
    return user[0][1]
def delete_sshpubkey(self, username, sshpubkey):
    """
    Delete an sshPublicKey value from the user's dn.

    Binds with the configured ``bind_dn``/``bind_pw`` and removes the given
    key value from ``uid=<username>,<base_dn>``.  Any ldap.LDAPError is
    logged through ``self.bus.log`` and re-raised.
    """
    dn = 'uid={0},{1}'.format(username, self.base_dn)
    try:
        with self._ldap_connection() as ldap_cxn:
            ldap_cxn.simple_bind_s(self.bind_dn, self.bind_pw)
            mod_list = [(ldap.MOD_DELETE, 'sshPublicKey', str(sshpubkey))]
            ldap_cxn.modify_s(dn, mod_list)
    # INVALID_CREDENTIALS and INSUFFICIENT_ACCESS derive from LDAPError, so
    # catching the base class alone covers them.
    except ldap.LDAPError as e:
        self.bus.log('LDAP Error: {0}'.format(e.message['desc'] if 'desc' in e.message else str(e)),
                     level=40,
                     traceback=True)
        raise
def getusercredbyuid(self, username):
    """
    Return the given name and surname of the user with uid ``username``.

    Searches the subtree below ``ou=People,<self.dc>``.  Returns a list
    holding the first ``givenName`` value followed by the first ``sn`` value
    of the first match (an attribute missing from the entry is skipped).
    Returns None when no entry matches or when the search raises
    ldap.LDAPError; in both cases a message is printed.
    """
    baseDN = "ou=People," + self.dc
    searchScope = ldap.SCOPE_SUBTREE
    searchFilter = "uid=" + username
    searchAttrList = ["givenName", "sn"]
    try:
        # Result is a list of tuples (one per match) of dn and attrs.
        # Attrs is a dict, each value is a list of strings.
        result = self.conn.search_s(baseDN, searchScope, searchFilter, searchAttrList)
    except ldap.LDAPError as e:
        print(e)
        return None

    if not result:
        print("User %s does not exist!" % username)
        return None

    # Pick the attributes in the requested order instead of relying on dict
    # ordering.  Names are matched case-insensitively because a server may
    # return them in a different case than requested.
    attrs = {key.lower(): values for key, values in result[0][1].items()}
    return [attrs[name.lower()][0]
            for name in searchAttrList if name.lower() in attrs]
def get_real_name(self):
    """
    Attempt to retrieve the LDAP Common Name of the given login name.

    Reads the attribute named by the ``ldap``/``name_attr`` config option
    from the user's own entry.  Returns its first value, or None when the
    search fails, returns nothing, or the entry lacks that attribute.
    """
    encoding = _config.get('ldap', 'encoding')
    user_dn = self.get_user_dn().encode(encoding)
    name_attr = _config.get('ldap', 'name_attr')
    try:
        res = self.ldap.search_s(user_dn, ldap.SCOPE_BASE,
                                 '(objectClass=*)', [name_attr])
    except ldap.LDAPError:
        _logger.exception("Caught exception while retrieving user name "
                          "from LDAP, returning None as name")
        return None

    # Just look at the first result record, since we are searching for
    # a specific user.  A missing record or attribute means "no name"
    # rather than an IndexError/KeyError for the caller.
    if not res:
        return None
    values = res[0][1].get(name_attr)
    if not values:
        return None
    return values[0]
def __init__(self, ldap_host=None, base_dn=None, user=None, password=None):
    """
    Open a connection to ``ldap_host`` and start a simple bind.

    Every argument falls back to the module-level default (LDAP_HOST,
    BASE_DN, USER, PASSWORD) when it is empty.  An ldap.LDAPError raised
    while connecting or binding is printed, not raised.
    """
    if not ldap_host:
        ldap_host = LDAP_HOST
    # Store the base DN in both cases.  It used to be set only when the
    # default was used, leaving ``self.base_dn`` undefined whenever a caller
    # supplied one.
    self.base_dn = base_dn if base_dn else BASE_DN
    if not user:
        user = USER
    if not password:
        password = PASSWORD
    try:
        self.ldapconn = ldap.open(ldap_host)
        # NOTE(review): simple_bind() is asynchronous and its result is not
        # awaited here, so a rejected bind is not detected at this point.
        self.ldapconn.simple_bind(user, password)
    except ldap.LDAPError as e:
        print(e)
# (Original comment lost to mis-encoding.) Presumably: look up an entry's DN.
# The surviving text shows a DN of the form cn=username,ou=users,dc=gccmx,dc=cn.
def ldap_search_dn(self, uid=None):
    """
    Return the DN of the first entry whose ``cn`` equals ``uid``.

    Searches the subtree below ``self.base_dn`` and inspects the first
    result message only.  Returns None when ``uid`` is None, when the first
    message is not a search entry, or when an ldap.LDAPError occurs (the
    error is printed).
    """
    # Without a uid there is nothing to search for (and "cn=" + None would
    # raise TypeError).
    if uid is None:
        return None
    obj = self.ldapconn
    # Was misspelled "protocal_version", which only created an unused
    # attribute instead of selecting LDAPv3.
    obj.protocol_version = ldap.VERSION3
    searchScope = ldap.SCOPE_SUBTREE
    retrieveAttributes = None
    searchFilter = "cn=" + uid
    try:
        ldap_result_id = obj.search(self.base_dn, searchScope, searchFilter, retrieveAttributes)
        result_type, result_data = obj.result(ldap_result_id, 0)
        # A search entry looks like:
        # ('cn=django,ou=users,dc=gccmx,dc=cn',
        #  {'objectClass': ['inetOrgPerson', 'top'],
        #   'cn': ['django'], 'sn': ['django'], ...})
        if result_type == ldap.RES_SEARCH_ENTRY:
            return result_data[0][0]
        return None
    except ldap.LDAPError as e:
        print(e)
        return None
# (Original comment lost to mis-encoding.) Presumably: fetch a user's details.
def ldap_get_user(self, uid=None):
    """
    Return basic details of the first entry whose ``cn`` equals ``uid``.

    Searches the subtree below ``self.base_dn`` and inspects the first
    result message only.  Returns a dict with the keys ``username`` (cn),
    ``email`` (mail) and ``nick`` (sn).  Returns None when ``uid`` is None,
    when the first message is not a search entry, or when an ldap.LDAPError
    occurs (the error is printed).
    """
    # Without a uid there is nothing to search for (and "cn=" + None would
    # raise TypeError).
    if uid is None:
        return None
    obj = self.ldapconn
    # Was misspelled "protocal_version", which only created an unused
    # attribute instead of selecting LDAPv3.
    obj.protocol_version = ldap.VERSION3
    searchScope = ldap.SCOPE_SUBTREE
    retrieveAttributes = None
    searchFilter = "cn=" + uid
    try:
        ldap_result_id = obj.search(self.base_dn, searchScope, searchFilter, retrieveAttributes)
        result_type, result_data = obj.result(ldap_result_id, 0)
        if result_type == ldap.RES_SEARCH_ENTRY:
            attrs = result_data[0][1]
            return {
                'username': attrs['cn'][0],
                'email': attrs['mail'][0],
                'nick': attrs['sn'][0],
            }
        return None
    except ldap.LDAPError as e:
        print(e)
        return None
# (Original comment lost to mis-encoding; only the words "LDAP" and "boolean" survive.)
def authenticate(username, password):
    """
    Authenticate ``username`` by binding to LDAP with ``password``.

    Returns a ``structures.User`` marked as authenticated when the user has
    a record, passes ``check_group_membership(username)`` and the bind
    succeeds.  Returns ``ANONYMOUS_USER`` otherwise.
    """
    records = get_user_records(username)
    dila_permission = check_group_membership(username)
    # An empty password would make simple_bind_s perform an unauthenticated
    # bind (RFC 4513, section 5.1.2), which many servers accept, so it is
    # rejected before any bind is attempted.
    if not (records and dila_permission and password):
        return ANONYMOUS_USER

    user_dn, user_attributes = records[0]
    with initialize_connection() as connection:
        try:
            connection.simple_bind_s(user_dn, password)
        except ldap.LDAPError:
            return ANONYMOUS_USER

        encoding = config.LDAP_ENCODING
        first_name = user_attributes.get(config.LDAP_USER_ATTRIBUTE_MAP['first_name'])[0].decode(encoding)
        last_name = user_attributes.get(config.LDAP_USER_ATTRIBUTE_MAP['last_name'])[0].decode(encoding)
        is_superuser = check_group_membership(username, config.LDAP_SUPERUSER_GROUP_CN)
        return structures.User(
            authenticated=True,
            username=username,
            first_name=first_name,
            last_name=last_name,
            is_superuser=is_superuser
        )
def ldap_initialize(module, server):
    """
    Create an LDAP connection object for ``server`` with tracing enabled.

    Sets the module-wide ``ldap._trace_level`` to 1 and passes the same
    level, with stderr as trace file, to ``ldap.initialize``.  An
    ldap.LDAPError is reported through ``module.fail_json``.
    """
    trace_level = 1
    trace_file = sys.stderr
    ldap._trace_level = trace_level
    try:
        conn = ldap.initialize(
            server, trace_level=trace_level, trace_file=trace_file)
    except ldap.LDAPError as e:
        # NOTE(review): presumably fail_json does not return; if it did,
        # ``conn`` would be unbound below.
        module.fail_json(
            msg="LDAP Error initializing: {}".format(ldap_errors(e)))
    return conn
def ldap_bind_with_user(module, conn, username, password):
    """
    Bind ``conn`` as ``username`` and report whether it worked.

    Returns True after a successful simple bind.  Rejected credentials and
    other LDAP errors are reported through ``module.fail_json``; False is
    returned if that call returns.
    """
    try:
        conn.simple_bind_s(username, password)
    except ldap.INVALID_CREDENTIALS:
        module.fail_json(
            msg="Invalid Credentials for user {}".format(username))
    except ldap.LDAPError as e:
        module.fail_json(
            msg="LDAP Error Binding user: {}: ERROR: {}".format(
                username, ldap_errors(e)))
    else:
        return True
    return False
def getDefaultNamingContext(self):
    """
    Read ``defaultNamingContext`` from the root DSE of ``self.dc_ip``.

    Binds with an empty DN and password over a fresh connection, stores the
    value in ``self.domainBase`` and returns it.  Prints a message and
    calls ``sys.exit(1)`` when the root DSE cannot be read or lacks the
    attribute.
    """
    try:
        newCon = ldap.initialize('ldap://{}'.format(self.dc_ip))
        newCon.simple_bind_s('', '')
        res = newCon.search_s("", ldap.SCOPE_BASE, '(objectClass=*)')
        rootDSE = res[0][1]
    except ldap.LDAPError as e:
        print("[!] Error retrieving the root DSE")
        print("[!] {}".format(e))
        sys.exit(1)

    if 'defaultNamingContext' not in rootDSE:
        print("[!] No defaultNamingContext found!")
        sys.exit(1)

    naming_context = rootDSE['defaultNamingContext'][0]
    self.domainBase = naming_context
    newCon.unbind()
    return naming_context
def _connect_to_ldap(self):
    """
    Open, optionally secure, and bind a connection to ``self.server_uri``.

    Uses a simple bind when ``self.bind_dn`` is set and a SASL EXTERNAL bind
    otherwise.  StartTLS and bind failures are reported through
    ``self.module.fail_json``.  Returns the connection object.
    """
    # NOTE(review): this turns off TLS certificate verification as a global
    # python-ldap option - confirm that is intended.
    ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER)
    conn = ldap.initialize(self.server_uri)

    if self.start_tls:
        try:
            conn.start_tls_s()
        except ldap.LDAPError:
            self.module.fail_json(
                msg="Cannot start TLS.", details=str(get_exception()))

    try:
        if self.bind_dn is None:
            conn.sasl_interactive_bind_s('', ldap.sasl.external())
        else:
            conn.simple_bind_s(self.bind_dn, self.bind_pw)
    except ldap.LDAPError:
        self.module.fail_json(
            msg="Cannot bind to the server.", details=str(get_exception()))

    return conn
def getMemberName(self, member):
    """
    Return the name looked up for ``member`` via ``__ldap_mail_to_cn``.

    ``member`` is first checked with ``__assertIsMember``.  An
    ldap.LDAPError raised by the lookup is converted to NotAMemberError.
    """
    self.__assertIsMember(member)
    try:
        common_name = self.__ldap_mail_to_cn(member)
    except ldap.LDAPError:
        raise NotAMemberError
    return common_name
def set_password(self, username, hashes):
    """
    Administratively replace a user's password hashes.

    Binds with the configured ``bind_dn``/``bind_pw`` and replaces
    ``sambaNTPassword`` and ``userPassword`` on ``uid=<username>,<base_dn>``
    with the values from ``hashes`` in one modify call.  LDAP errors are
    logged through ``self.bus.log`` and re-raised.
    """
    dn = 'uid={0},{1}'.format(username, self.base_dn)
    try:
        with self._ldap_connection() as ldap_cxn:
            ldap_cxn.simple_bind_s(self.bind_dn, self.bind_pw)
            changes = [
                (ldap.MOD_REPLACE, attr, hashes[attr])
                for attr in ('sambaNTPassword', 'userPassword')
            ]
            ldap_cxn.modify_s(dn, changes)
    except ldap.INVALID_CREDENTIALS:
        self.bus.log('Invalid credentials for admin user: {0}'.format(self.bind_dn), 40)
        raise
    except ldap.INSUFFICIENT_ACCESS:
        self.bus.log('Insufficient access for admin user: {0}'.format(self.bind_dn), 40)
        raise
    except ldap.INVALID_DN_SYNTAX:
        self.bus.log('Invalid DN syntax in configuration: {0}'.format(self.base_dn), 40)
        raise
    except ldap.LDAPError as e:
        if 'desc' in e.message:
            detail = e.message['desc']
        else:
            detail = str(e)
        self.bus.log('LDAP Error: {0}'.format(detail), level=40, traceback=True)
        raise
def change_password(self, username, oldpassword, hashes):
    """
    Replace a user's password hashes, authenticating as that user.

    Binds as ``uid=<username>,<base_dn>`` with ``oldpassword`` and replaces
    ``sambaNTPassword`` and ``userPassword`` with the values from
    ``hashes``.  LDAP errors are re-raised; all but invalid credentials are
    logged through ``self.bus.log`` first.
    """
    dn = 'uid={0},{1}'.format(username, self.base_dn)
    try:
        with self._ldap_connection() as ldap_cxn:
            ldap_cxn.simple_bind_s(dn, oldpassword)
            # LDAPObject.passwd_s() is deliberately avoided: one modify call
            # changes both attributes together, never just one of them.
            changes = [
                (ldap.MOD_REPLACE, attr, hashes[attr])
                for attr in ('sambaNTPassword', 'userPassword')
            ]
            ldap_cxn.modify_s(dn, changes)
    except ldap.INVALID_CREDENTIALS:
        raise
    except ldap.INVALID_DN_SYNTAX:
        self.bus.log('Invalid DN syntax in configuration: {0}'.format(self.base_dn), 40)
        raise
    except ldap.LDAPError as e:
        if 'desc' in e.message:
            detail = e.message['desc']
        else:
            detail = str(e)
        self.bus.log('LDAP Error: {0}'.format(detail), level=40, traceback=True)
        raise
def __init__(self, uri, cn, dc, secret):
    """
    Connect to ``uri`` over LDAPv3 and bind as ``<cn>,<dc>`` with ``secret``.

    Prints a confirmation on success.  On any LDAP error a message is
    printed and ``sys.exit()`` is called.
    """
    self.conn = None
    self.uri = uri
    self.dc = dc
    self.secret = secret
    try:
        self.conn = ldap.initialize(self.uri)
        self.conn.protocol_version = ldap.VERSION3
        self.conn.simple_bind_s(cn + "," + self.dc, self.secret)
        print("Connection established.")
    except ldap.INVALID_CREDENTIALS:
        print("Your username or password is incorrect.")
        sys.exit()
    except ldap.LDAPError as e:
        # Prefer the short description when the error carries one.
        if type(e.message) == dict and 'desc' in e.message:
            print(e.message['desc'])
        else:
            print(e)
        sys.exit()
def userexistsbyuid(self, username):
    """
    Tell whether an entry with uid ``username`` exists under ou=People.

    Returns True when the subtree search below ``ou=People,<self.dc>``
    yields a non-empty result.  Returns False for an empty result or when
    the search raises ldap.LDAPError (the error is printed).
    """
    people_base = "ou=People," + self.dc
    scope = ldap.SCOPE_SUBTREE
    uid_filter = "uid=" + username
    try:
        matches = self.conn.search_s(people_base, scope, uid_filter, None)
    except ldap.LDAPError as e:
        print(e)
        return False
    return matches != []
def changepwd(self, username, newsecret):
    """
    Set the password of ``uid=<username>,ou=People,<self.dc>``.

    Prints a message and returns when ``self.userexistsbyuid`` reports the
    user as missing.  An ldap.LDAPError from the password change is printed,
    not raised.  Always returns None.
    """
    if not self.userexistsbyuid(username):
        # The message is now actually formatted; previously the template and
        # the name were passed to print() as two separate arguments.
        print("User %s does not exist!" % username)
        return
    dn = "uid=" + username + ",ou=People," + self.dc
    try:
        # passwd_s() waits for the server's answer.  The asynchronous
        # passwd() only returns a message id, so a server-side failure was
        # never raised here.
        self.conn.passwd_s(dn, None, newsecret)
    except ldap.LDAPError as e:
        # Use the short description when the error carries one; otherwise
        # fall back to the full error text instead of failing on the lookup.
        info = e.args[0] if e.args else None
        if isinstance(info, dict) and 'desc' in info:
            reason = info['desc']
        else:
            reason = str(e)
        print("Error: Can\'t change %s password: %s" % (username, reason))
def changeshadowexpire(self, username, shexp):
    """
    Replace ``shadowExpire`` on ``uid=<username>,ou=People,<self.dc>``.

    Prints a message and returns when ``self.userexistsbyuid`` reports the
    user as missing.  An ldap.LDAPError from the modification is printed,
    not raised.  Always returns None.
    """
    if not self.userexistsbyuid(username):
        # The message is now actually formatted; previously the template and
        # the name were passed to print() as two separate arguments.
        print("User %s does not exist!" % username)
        return
    dn = "uid=" + username + ",ou=People," + self.dc
    ldif = [(ldap.MOD_REPLACE, 'shadowExpire', shexp)]
    try:
        self.conn.modify_s(dn, ldif)
    except ldap.LDAPError as e:
        # Use the short description when the error carries one; otherwise
        # fall back to the full error text instead of failing on the lookup.
        info = e.args[0] if e.args else None
        if isinstance(info, dict) and 'desc' in info:
            reason = info['desc']
        else:
            reason = str(e)
        print("Error: Can\'t change %s shadowExpire: %s" % (username, reason))
def _ldap_function_call(func, *args, **kwargs):
    """
    Call ``func(*args, **kwargs)`` while holding ``ldap._ldap_module_lock``.

    Depending on ``ldap._trace_level`` the call (level 1), a stack trace
    (level 3) and the result or LDAPError (level 2) are written to
    ``ldap._trace_file``.  Returns the result of ``func``; an LDAPError is
    re-raised.
    """
    if __debug__:
        if ldap._trace_level >= 1:
            call_info = ('_ldap', repr(func), repr(args), repr(kwargs))
            ldap._trace_file.write('*** %s.%s (%s,%s)\n' % call_info)
            if ldap._trace_level >= 3:
                traceback.print_stack(
                    limit=ldap._trace_stack_limit, file=ldap._trace_file)

    ldap._ldap_module_lock.acquire()
    try:
        # The inner try releases the lock before any error is traced.
        try:
            result = func(*args, **kwargs)
        finally:
            ldap._ldap_module_lock.release()
    except LDAPError as e:
        if __debug__ and ldap._trace_level >= 2:
            ldap._trace_file.write('=> LDAPError: %s\n' % str(e))
        raise

    if __debug__ and ldap._trace_level >= 2:
        if result is not None and result != (None, None):
            ldap._trace_file.write('=> result: %s\n' % repr(result))
    return result
def _ldap_call(self,func,*args,**kwargs):
"""
Wrapper method mainly for serializing calls into OpenLDAP libs
and trace logs
"""
if __debug__:
if self._trace_level>=1:# and func.__name__!='result':
self._trace_file.write('*** %s - %s (%s,%s)\n' % (
self._uri,
self.__class__.__name__+'.'+func.__name__,
repr(args),repr(kwargs)
))
if self._trace_level>=3:
traceback.print_stack(limit=self._trace_stack_limit,file=self._trace_file)
self._ldap_object_lock.acquire()
try:
try:
result = func(*args,**kwargs)
if __debug__ and self._trace_level>=2:
if func.__name__!="unbind_ext":
diagnostic_message_success = self._l.get_option(ldap.OPT_DIAGNOSTIC_MESSAGE)
else:
diagnostic_message_success = None
finally:
self._ldap_object_lock.release()
except LDAPError,e:
if __debug__ and self._trace_level>=2:
self._trace_file.write('=> LDAPError - %s: %s\n' % (e.__class__.__name__,str(e)))
raise
else:
if __debug__ and self._trace_level>=2:
if not diagnostic_message_success is None:
self._trace_file.write('=> diagnosticMessage: %s\n' % (repr(diagnostic_message_success)))
if result!=None and result!=(None,None):
self._trace_file.write('=> result: %s\n' % (repr(result)))
return result
def _ldap_function_call(func, *args, **kwargs):
    """
    Call ``func(*args, **kwargs)`` while holding ``ldap._ldap_module_lock``.

    Depending on ``ldap._trace_level`` the call (level 1), a stack trace
    (level 3) and the result or LDAPError (level 2) are written to
    ``ldap._trace_file``.  Returns the result of ``func``; an LDAPError is
    re-raised.
    """
    if __debug__:
        if ldap._trace_level >= 1:
            call_info = ('_ldap', repr(func), repr(args), repr(kwargs))
            ldap._trace_file.write('*** %s.%s (%s,%s)\n' % call_info)
            if ldap._trace_level >= 3:
                traceback.print_stack(
                    limit=ldap._trace_stack_limit, file=ldap._trace_file)

    ldap._ldap_module_lock.acquire()
    try:
        # The inner try releases the lock before any error is traced.
        try:
            result = func(*args, **kwargs)
        finally:
            ldap._ldap_module_lock.release()
    except LDAPError as e:
        if __debug__ and ldap._trace_level >= 2:
            ldap._trace_file.write('=> LDAPError: %s\n' % str(e))
        raise

    if __debug__ and ldap._trace_level >= 2:
        if result is not None and result != (None, None):
            ldap._trace_file.write('=> result: %s\n' % repr(result))
    return result
def _ldap_call(self,func,*args,**kwargs):
"""
Wrapper method mainly for serializing calls into OpenLDAP libs
and trace logs
"""
if __debug__:
if self._trace_level>=1:# and func.__name__!='result':
self._trace_file.write('*** %s - %s (%s,%s)\n' % (
self._uri,
self.__class__.__name__+'.'+func.__name__,
repr(args),repr(kwargs)
))
if self._trace_level>=3:
traceback.print_stack(limit=self._trace_stack_limit,file=self._trace_file)
self._ldap_object_lock.acquire()
try:
try:
result = func(*args,**kwargs)
if __debug__ and self._trace_level>=2:
if func.__name__!="unbind_ext":
diagnostic_message_success = self._l.get_option(ldap.OPT_DIAGNOSTIC_MESSAGE)
else:
diagnostic_message_success = None
finally:
self._ldap_object_lock.release()
except LDAPError,e:
if __debug__ and self._trace_level>=2:
self._trace_file.write('=> LDAPError - %s: %s\n' % (e.__class__.__name__,str(e)))
raise
else:
if __debug__ and self._trace_level>=2:
if not diagnostic_message_success is None:
self._trace_file.write('=> diagnosticMessage: %s\n' % (repr(diagnostic_message_success)))
if result!=None and result!=(None,None):
self._trace_file.write('=> result: %s\n' % (repr(result)))
return result
def _ldap_function_call(lock, func, *args, **kwargs):
    """
    Wrapper function which locks and logs calls to function

    lock
        Instance of threading.Lock or compatible; skipped when falsy
    func
        Function to call with arguments passed in via *args and **kwargs

    Depending on ``ldap._trace_level`` the call (level 1), a stack trace
    (level 9) and the result or LDAPError (level 2) are written to
    ``ldap._trace_file``.  Returns the result of ``func``; an LDAPError is
    re-raised.
    """
    if lock:
        lock.acquire()
    if __debug__:
        if ldap._trace_level >= 1:
            arguments = pprint.pformat((args, kwargs))
            ldap._trace_file.write(
                '*** %s.%s %s\n' % ('_ldap', func.__name__, arguments))
            if ldap._trace_level >= 9:
                traceback.print_stack(
                    limit=ldap._trace_stack_limit, file=ldap._trace_file)

    try:
        # The inner try releases the lock before any error is traced.
        try:
            result = func(*args, **kwargs)
        finally:
            if lock:
                lock.release()
    except LDAPError as e:
        if __debug__ and ldap._trace_level >= 2:
            ldap._trace_file.write('=> LDAPError: %s\n' % str(e))
        raise

    if __debug__ and ldap._trace_level >= 2:
        ldap._trace_file.write('=> result:\n%s\n' % pprint.pformat(result))
    return result
def _ldap_call(self,func,*args,**kwargs):
"""
Wrapper method mainly for serializing calls into OpenLDAP libs
and trace logs
"""
self._ldap_object_lock.acquire()
if __debug__:
if self._trace_level>=1:
self._trace_file.write('*** %s %s - %s\n%s\n' % (
repr(self),
self._uri,
'.'.join((self.__class__.__name__,func.__name__)),
pprint.pformat((args,kwargs))
))
if self._trace_level>=9:
traceback.print_stack(limit=self._trace_stack_limit,file=self._trace_file)
diagnostic_message_success = None
try:
try:
result = func(*args,**kwargs)
if __debug__ and self._trace_level>=2:
if func.__name__!="unbind_ext":
diagnostic_message_success = self._l.get_option(ldap.OPT_DIAGNOSTIC_MESSAGE)
finally:
self._ldap_object_lock.release()
except LDAPError as e:
if __debug__ and self._trace_level>=2:
self._trace_file.write('=> LDAPError - %s: %s\n' % (e.__class__.__name__,str(e)))
raise
else:
if __debug__ and self._trace_level>=2:
if not diagnostic_message_success is None:
self._trace_file.write('=> diagnosticMessage: %s\n' % (repr(diagnostic_message_success)))
self._trace_file.write('=> result:\n%s\n' % (pprint.pformat(result)))
return result
def ldap_get_vaild(self, uid=None, passwd=None):
    """
    Check a user's password by binding to the directory as that user.

    The DN is looked up with ``self.ldap_search_dn(uid)`` and the bind is
    made on ``self.ldapconn``.  Returns True when the bind succeeds.
    Returns False when ``uid`` or ``passwd`` is empty, when no DN is found,
    or when the bind raises ldap.LDAPError (the error is printed).
    """
    # A simple bind with an empty password or without a DN is an
    # anonymous/unauthenticated bind (RFC 4513, section 5.1), which many
    # servers accept.  It must never count as a valid login.
    if not uid or not passwd:
        return False
    target_cn = self.ldap_search_dn(uid)
    if not target_cn:
        return False
    try:
        return bool(self.ldapconn.simple_bind_s(target_cn, passwd))
    except ldap.LDAPError as e:
        print(e)
        return False
# (Original comment lost to mis-encoding.) Presumably: change a user's password.
def ldap_update_pass(self, uid=None, oldpass=None, newpass=None):
    """
    Change a user's password after verifying the old one.

    The DN is looked up with ``self.ldap_search_dn(uid)``.  The method then
    binds as that DN with ``oldpass`` on ``self.ldapconn`` and calls
    ``passwd_s``.  Returns True on success.  Returns False when ``uid`` or
    ``oldpass`` is empty, when no DN is found, or when an ldap.LDAPError is
    raised.
    """
    # Without a DN or an old password the bind below would be an
    # anonymous/unauthenticated bind and would verify nothing.
    if not uid or not oldpass:
        return False
    target_cn = self.ldap_search_dn(uid)
    if not target_cn:
        return False
    obj = self.ldapconn
    try:
        obj.simple_bind_s(target_cn, oldpass)
        obj.passwd_s(target_cn, oldpass, newpass)
        return True
    except ldap.LDAPError:
        return False