def ldap_search(module, conn, dn, search_filter, ldap_attrs):
    """Run a subtree search below ``dn`` and return the raw result.

    Args:
        module: Object exposing ``fail_json(msg=...)``, used to report errors.
        conn: LDAP connection exposing ``search_s``.
        dn: Base DN the search starts from.
        search_filter: LDAP filter string passed through to ``search_s``.
        ldap_attrs: Attributes to request, passed through to ``search_s``.

    Returns:
        Whatever ``conn.search_s`` returns.
    """
    try:
        results = conn.search_s(dn, ldap.SCOPE_SUBTREE, search_filter, ldap_attrs)
    except ldap.LDAPError as err:
        # NOTE(review): presumably fail_json() does not return; if it did,
        # the return below would hit an unbound local -- confirm.
        module.fail_json(msg="LDAP Error Searching: {}".format(ldap_errors(err)))
    return results
# Example source code for the Python class LDAPError()
def ldap_unbind(module, conn):
    """Unbind ``conn`` and report whether it succeeded.

    Args:
        module: Object exposing ``fail_json(msg=...)``, used to report errors.
        conn: LDAP connection exposing ``unbind_s``.

    Returns:
        True when ``unbind_s`` completed; False if an ``ldap.LDAPError`` was
        raised and ``fail_json`` returned control to this function.
    """
    try:
        conn.unbind_s()
    except ldap.LDAPError as err:
        module.fail_json(msg="LDAP Error unbinding: {}".format(err))
        return False
    return True
def do_bind(self):
    """Simple-bind ``self.con`` using ``self.username`` / ``self.password``.

    Returns:
        True after a successful bind; ``self.is_binded`` is set to True as
        well.

    Any ``ldap.LDAPError`` (including invalid credentials) is printed to
    stdout and the process exits with status 1.
    """
    try:
        self.con.simple_bind_s(self.username, self.password)
    except ldap.INVALID_CREDENTIALS:
        print("[!] Error: invalid credentials")
        sys.exit(1)
    except ldap.LDAPError as e:
        # "except ... as e" and single-argument print() are valid on both
        # Python 2.6+ and Python 3; the old comma/statement forms are not.
        print("[!] {}".format(e))
        sys.exit(1)
    self.is_binded = True
    return True
def whoami(self):
    """Return the result of ``self.con.whoami_s()``.

    On ``ldap.LDAPError`` the error is printed to stdout and the process
    exits with status 1.
    """
    try:
        return self.con.whoami_s()
    except ldap.LDAPError as e:
        # Python 2/3 compatible exception and print syntax.
        print("[!] {}".format(e))
        sys.exit(1)
def getAllUsers(self, attrs=''):
    """Search ``self.domainBase`` for objects matching ``(objectCategory=user)``.

    Args:
        attrs: Attribute names to request. When falsy, defaults to
            ``['cn', 'userPrincipalName']``.

    Returns:
        A ``(results, attrs)`` tuple: the output of
        ``self.get_search_results`` for the raw query result, and the
        attribute list that was requested.

    On ``ldap.LDAPError`` the error is printed to stdout and the process
    exits with status 1.
    """
    if not attrs:
        attrs = ['cn', 'userPrincipalName']

    objectFilter = '(objectCategory=user)'
    base_dn = self.domainBase

    try:
        rawUsers = self.do_ldap_query(base_dn, ldap.SCOPE_SUBTREE, objectFilter, attrs)
    except ldap.LDAPError as e:
        # Use the module-qualified exception (``ldap`` is already required
        # for SCOPE_SUBTREE) and Python 2/3 compatible syntax.
        print("[!] Error retrieving users")
        print("[!] {}".format(e))
        sys.exit(1)

    return (self.get_search_results(rawUsers), attrs)
def getAllGroups(self, attrs=''):
    """Search ``self.domainBase`` for objects matching ``(objectCategory=group)``.

    Args:
        attrs: Attribute names to request. When falsy, defaults to
            ``['distinguishedName', 'cn']``.

    Returns:
        A ``(results, attrs)`` tuple: the output of
        ``self.get_search_results`` for the raw query result, and the
        attribute list that was requested.

    On ``ldap.LDAPError`` the error is printed to stdout and the process
    exits with status 1.
    """
    if not attrs:
        attrs = ['distinguishedName', 'cn']

    objectFilter = '(objectCategory=group)'
    base_dn = self.domainBase

    try:
        rawGroups = self.do_ldap_query(base_dn, ldap.SCOPE_SUBTREE, objectFilter, attrs)
    except ldap.LDAPError as e:
        # Use the module-qualified exception (``ldap`` is already required
        # for SCOPE_SUBTREE) and Python 2/3 compatible syntax.
        print("[!] Error retrieving groups")
        print("[!] {}".format(e))
        sys.exit(1)

    return (self.get_search_results(rawGroups), attrs)
def doCustomSearch(self, base, objectFilter, attrs):
    """Run a caller-supplied subtree search.

    Args:
        base: Base DN passed to ``self.do_ldap_query``.
        objectFilter: LDAP filter string for the search.
        attrs: Attribute names to request.

    Returns:
        The output of ``self.get_search_results`` for the raw query result.

    On ``ldap.LDAPError`` the error is printed to stdout and the process
    exits with status 1.
    """
    try:
        rawResults = self.do_ldap_query(base, ldap.SCOPE_SUBTREE, objectFilter, attrs)
    except ldap.LDAPError as e:
        # The messages used to be bare string expressions ("print [!] ..."),
        # which evaluate and discard the text; actually print them.
        print("[!] Error doing search")
        print("[!] {}".format(e))
        sys.exit(1)

    return self.get_search_results(rawResults)
def getAllComputers(self, attrs=''):
    """Search ``self.domainBase`` for objects matching ``(objectClass=Computer)``.

    Args:
        attrs: Attribute names to request. When falsy, defaults to the
            name, DNS host name and operating-system attributes listed
            below.

    Returns:
        A ``(results, attrs)`` tuple: the output of
        ``self.get_search_results`` for the raw query result, and the
        attribute list that was requested.

    On ``ldap.LDAPError`` the error is printed to stdout and the process
    exits with status 1.
    """
    if not attrs:
        attrs = [
            'cn',
            'dNSHostName',
            'operatingSystem',
            'operatingSystemVersion',
            'operatingSystemServicePack',
        ]

    objectFilter = '(objectClass=Computer)'
    base_dn = self.domainBase

    try:
        rawComputers = self.do_ldap_query(base_dn, ldap.SCOPE_SUBTREE, objectFilter, attrs)
    except ldap.LDAPError as e:
        # Use the module-qualified exception (``ldap`` is already required
        # for SCOPE_SUBTREE) and Python 2/3 compatible syntax.
        print("[!] Error retrieving computers")
        print("[!] {}".format(e))
        sys.exit(1)

    return (self.get_search_results(rawComputers), attrs)
def ldap_search(self, filter, attributes, incremental, incremental_filter):
    """
    Query the configured LDAP server with the provided search filter and
    attribute list.

    The URIs in ``self.conf_LDAP_SYNC_BIND_URI`` are tried in order; the
    first one that accepts the bind is searched and its result returned.

    Args:
        filter: Search filter used for a full search.
        attributes: Attribute list passed as ``attrlist`` to the search.
        incremental: When truthy, an incremental search is used for any
            server whose sync record has ``syncs_to_full > 0``.
        incremental_filter: Filter template for incremental searches; every
            ``?`` is replaced by ``self.whenchanged`` formatted with
            ``self.conf_LDAP_SYNC_INCREMENTAL_TIMESTAMPFORMAT``.

    Returns:
        A ``(uri, results)`` tuple: the server URI that was used and the
        paged search result.

    Raises:
        ldap.LDAPError: The last bind error when no URI could be bound, or
            a generic LDAPError when no URI is configured at all. Errors
            raised by the search itself propagate unchanged.
    """
    last_error = None

    for uri in self.conf_LDAP_SYNC_BIND_URI:
        # Read record of this uri
        if self.working_uri == uri:
            adldap_sync = self.working_adldap_sync
        else:
            adldap_sync, _ = ADldap_Sync.objects.get_or_create(ldap_sync_uri=uri)

        if adldap_sync.syncs_to_full > 0 and incremental:
            timestamp = self.whenchanged.strftime(self.conf_LDAP_SYNC_INCREMENTAL_TIMESTAMPFORMAT)
            filter_to_use = incremental_filter.replace('?', timestamp)
            logger.debug("Using an incremental search. Filter is:'%s'", filter_to_use)
        else:
            filter_to_use = filter

        ldap.set_option(ldap.OPT_REFERRALS, 0)
        #ldap.set_option(ldap.OPT_NETWORK_TIMEOUT, 10)
        connection = PagedLDAPObject(uri)
        connection.protocol_version = 3

        if uri.startswith('ldaps:'):
            connection.set_option(ldap.OPT_X_TLS, ldap.OPT_X_TLS_DEMAND)
            connection.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_DEMAND)
            connection.set_option(ldap.OPT_X_TLS_DEMAND, True)
        else:
            connection.set_option(ldap.OPT_X_TLS, ldap.OPT_X_TLS_NEVER)
            connection.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER)
            connection.set_option(ldap.OPT_X_TLS_DEMAND, False)

        try:
            connection.simple_bind_s(self.conf_LDAP_SYNC_BIND_DN, self.conf_LDAP_SYNC_BIND_PASS)
        except ldap.LDAPError as e:
            logger.error("Error connecting to LDAP server %s : %s", uri, e)
            # Keep the error: on Python 3 the name ``e`` is cleared when the
            # except block ends, and it is needed after the loop.
            last_error = e
            continue

        # Always release the connection, even when the search itself fails.
        try:
            results = connection.paged_search_ext_s(
                self.conf_LDAP_SYNC_BIND_SEARCH,
                ldap.SCOPE_SUBTREE,
                filter_to_use,
                attrlist=attributes,
                serverctrls=None,
            )
        finally:
            connection.unbind_s()

        if self.working_uri is None:
            # Remember the first server that worked and move it to the front
            # of the list. Mutating the list here is safe only because the
            # loop is left immediately below.
            self.working_uri = uri
            self.conf_LDAP_SYNC_BIND_URI.insert(0, uri)
            self.working_adldap_sync = adldap_sync

        # Return both the LDAP server URI used and the request. This is for
        # incremental sync purposes.
        return (uri, results)

    # If not connected correctly, raise an error. A bare ``raise`` here has
    # no active exception on Python 3, so re-raise the recorded one.
    if last_error is not None:
        raise last_error
    raise ldap.LDAPError("No LDAP server URI is configured")
def authenticate(login, password):
    """
    Attempt to authenticate the login name with password against the
    configured LDAP server. If the user is authenticated, required
    group memberships are also verified.

    Returns the bound ``LDAPUser`` object on success. Returns False when the
    credentials are rejected, when another LDAP error occurs, when the user
    is not found, or when the required group membership cannot be verified.

    Raises ``NoAnswerError`` when the server is down or unreachable and
    ``TimeoutError`` when the bind operation times out.
    """
    connection = open_ldap()
    server = _config.get('ldap', 'server')
    user = LDAPUser(login, connection)

    # Bind as the user with the supplied password. The specific handlers
    # must stay ahead of the generic ldap.LDAPError handler.
    try:
        user.bind(password)
    except (ldap.SERVER_DOWN, ldap.CONNECT_ERROR):
        _logger.exception("LDAP server is down")
        raise NoAnswerError(server)
    except ldap.INVALID_CREDENTIALS:
        _logger.warning(
            "Server %s reported invalid credentials for user %s", server, login
        )
        return False
    except ldap.TIMEOUT as error:
        _logger.error("Timed out waiting for LDAP bind operation")
        raise TimeoutError(error)
    except ldap.LDAPError:
        _logger.exception(
            "An LDAP error occurred when authenticating user %s against server %s",
            login,
            server,
        )
        return False
    except UserNotFound:
        _logger.exception(
            "Username %s was not found in the LDAP catalog %s", login, server
        )
        return False

    _logger.debug("LDAP authenticated user %s", login)

    # With no required group configured, the successful bind is the
    # final verdict.
    required_group = _config.get('ldap', 'require_group')
    if not required_group:
        return user

    # Otherwise membership of the required group decides.
    if not user.is_group_member(required_group):
        _logger.warning(
            "Could NOT verify %s as a member of %s", login, required_group
        )
        return False

    _logger.info("%s is verified to be a member of %s", login, required_group)
    return user
def main():
    """Program entry function.

    Prints the CGI content-type header, reads the submitted form into the
    module-level ``form`` and ``opt``, creates the module-level ``udb``,
    and -- when the mode needs no form or an action was submitted --
    connects to the user database and runs the function named by
    ``opt.mode``. Finally renders the HTML form and exits with status 0.
    """
    global form, udb

    # XXX: Stupid Apache on shrapnel has TZ set to US/Eastern, no idea why!
    os.environ['TZ'] = 'Eire'

    print("Content-type: text/html")
    print()

    atexit.register(shutdown)

    # Install a handler for uncaught exceptions that saves traceback
    # information locally, in ./tracebacks.
    traceback_dir = '%s/tracebacks' % os.getcwd()
    cgitb.enable(logdir=traceback_dir)

    form = cgi.FieldStorage()

    # Unknown or missing modes fall back to 'card'.
    opt.mode = form.getfirst('mode')
    if opt.mode not in cmds:
        opt.mode = 'card'
    opt.action = form.getfirst('action')
    # XXX remove usr.override
    opt.override = form.getfirst('override') == '1'

    # Start HTML now only for modes that print output *before* html_form is
    # called (which calls start_html itself). The header is delayed for all
    # other modes as mode switching may occur (e.g. cardid <-> add/renew).
    starts_html_early = opt.mode in cmds_noform or (
        opt.mode in cmds_custom and opt.action
    )
    if starts_html_early:
        html_start()

    udb = RBUserDB()
    udb.setopt(opt)

    # Open the database and call the function for the specific command only
    # if an action is required or the command needs no user input (i.e. no
    # blank form stage).
    if opt.mode in cmds_noform or opt.action:
        try:
            udb.connect()
        except ldap.LDAPError as exc:
            # NOTE(review): error() presumably does not return -- confirm.
            error(exc, 'Could not connect to user database')

        try:
            # opt.mode is either a member of cmds or 'card' at this point.
            # NOTE(review): the value still originates from the request; a
            # dispatch table of command functions would avoid eval entirely.
            eval(opt.mode + '()')
        except (ldap.LDAPError, RBError) as exc:
            # NOTE(review): error() presumably does not return -- confirm.
            error(exc)

    html_form()
    sys.exit(0)