def initiate_ldap():
    """
    Contact the LDAP server and return a bound LDAP object.

    The schemes ``ldap://`` and ``ldaps://`` are tried in that order against
    the module-level ``server_url``; the next scheme is only attempted when
    StartTLS raises ``ldap.SERVER_DOWN``. TLS file locations are read from
    the ``ldap`` section of the module-level ``config`` and applied as
    global python-ldap options.

    When the module-level ``login_dn`` is ``'DEFAULT'`` no bind is performed
    and the connection is probed with a "Who am I?" request instead;
    otherwise a synchronous simple bind with ``login_dn`` / ``password`` is
    performed so that rejected credentials surface here rather than being
    silently ignored.

    Returns:
        The object returned by ``ldap.initialize`` for the first scheme
        that worked.

    Raises:
        ldap.SERVER_DOWN: StartTLS reported the server down for the last
            scheme.
        ldap.OPERATIONS_ERROR: StartTLS failed for a reason other than
            'TLS already started'.
        ldap.LDAPError: the simple bind was rejected by the server.
        SystemExit: the server answered the anonymous probe with
            ``ldap.UNWILLING_TO_PERFORM``.
    """
    ldap_schemes = ['ldap://', 'ldaps://']

    ldap.set_option(ldap.OPT_DEBUG_LEVEL, 0)
    ldap.set_option(ldap.OPT_X_TLS_CACERTDIR, config.get('ldap', 'cacertdir'))
    ldap.set_option(ldap.OPT_X_TLS_CERTFILE, config.get('ldap', 'certfile'))
    ldap.set_option(ldap.OPT_X_TLS_KEYFILE, config.get('ldap', 'keyfile'))
    # NOTE(review): OPT_X_TLS_DEMAND is documented as a *value* for
    # OPT_X_TLS_REQUIRE_CERT, not as an option key -- confirm this call has
    # the intended effect before relying on it.
    ldap.set_option(ldap.OPT_X_TLS_DEMAND, True)
    ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_DEMAND)  # TRY, NEVER, DEMAND
    # Set last so the TLS options above are applied to a fresh TLS context.
    ldap.set_option(ldap.OPT_X_TLS_NEWCTX, 0)

    for scheme in ldap_schemes:
        ldap_url = scheme + server_url
        ldap_obj = ldap.initialize(ldap_url)
        try:
            ldap_obj.start_tls_s()
        except ldap.OPERATIONS_ERROR as e:
            # python-ldap carries the error details as a dict in args[0];
            # read it defensively so a missing 'info' key cannot mask the
            # original error with a KeyError.
            details = e.args[0] if e.args else None
            info = details.get('info') if isinstance(details, dict) else None
            if info != 'TLS already started':
                raise
        except ldap.SERVER_DOWN:
            # Fall through to the next scheme; give up after the last one.
            if scheme != ldap_schemes[-1]:
                continue
            raise

        if login_dn != 'DEFAULT':  # Use anonymous bind if login_dn is set as DEFAULT
            # Synchronous bind: the asynchronous bind() only returns a
            # message id and would leave a rejected bind unnoticed.
            ldap_obj.bind_s(login_dn, password, ldap.AUTH_SIMPLE)
        else:
            try:
                ldap_obj.whoami_s()
            except ldap.UNWILLING_TO_PERFORM:
                print('Anonymous binding is disabled by server')
                raise SystemExit
        return ldap_obj
# Python example source code for OPT_X_TLS_DEMAND
def __init__(self, backend, mode=PLAIN,
             cert=None,
             key=None,
             cacertdir='/etc/ssl/certs',
             ):
    """
    Create the LDAP session and connect it to ``backend``.

    Args:
        backend: URI handed to ``ldap.initialize``. When it starts with
            ``ldaps`` (case-insensitive) the mode is forced to
            ``self.LDAPS``.
        mode: connection mode; compared against ``self.STARTTLS`` and
            ``self.LDAPS``. With ``self.STARTTLS`` a StartTLS operation is
            issued right after the connection object is created.
        cert: path to a client certificate file. Only used when ``key`` is
            given as well.
        key: path to the client certificate key file. Only used when
            ``cert`` is given as well.
        cacertdir: directory of CA certificates used for validation in
            STARTTLS/LDAPS mode. ``None`` disables certificate validation
            and emits a warning.

    Raises:
        LDAPSessionException: ``cert`` or ``key`` was given (together with
            the other) but does not point to an existing file.

    Note:
        The TLS settings are applied with ``ldap.set_option``, i.e. as
        module-wide python-ldap options, not on this connection only.
    """
    self.backend = backend
    self._server = None
    self._schema = {}
    self._cert = cert
    self._key = key
    logger.debug("LDAP _session created, id: {}".format(id(self)))

    # Switch to LDAPS mode if the backend URI starts with 'ldaps'
    if backend[:5].lower() == 'ldaps':
        mode = self.LDAPS

    # Set CACERTDIR and REQUIRE_CERT to TLS_DEMAND (validation required) if needed
    if mode in (self.STARTTLS, self.LDAPS) and cacertdir is not None:
        ldap.set_option(ldap.OPT_X_TLS_CACERTDIR, cacertdir)
        ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_DEMAND)

    # Without a CA directory the server certificate is not validated at all.
    if cacertdir is None:
        warnings.warn("You are in INSECURE mode", ImportWarning, stacklevel=2)
        ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER)

    # Set client certificate if both cert and key are provided
    if cert is not None and key is not None:
        if not os.path.isfile(cert):
            raise LDAPSessionException("Certificate file {} does not exist".format(cert))
        if not os.path.isfile(key):
            # Report the key path here (the message previously showed the
            # certificate path, pointing at the wrong file).
            raise LDAPSessionException("Certificate key file {} does not exist".format(key))
        ldap.set_option(ldap.OPT_X_TLS_CERTFILE, cert)
        ldap.set_option(ldap.OPT_X_TLS_KEYFILE, key)

    self._server = ldap.initialize(self.backend, bytes_mode=False)

    # Proceed STARTTLS
    if mode == self.STARTTLS:
        self._server.start_tls_s()
def ldap_search(self, filter, attributes, incremental, incremental_filter):
    """
    Query the configured LDAP server with the provided search filter and
    attribute list.

    The URIs in ``self.conf_LDAP_SYNC_BIND_URI`` are tried in order. A URI
    whose bind fails is logged and skipped; the first URI that binds
    successfully is searched and its results are returned. An error raised
    by the search itself is not caught and propagates to the caller.

    Args:
        filter: search filter used for a full search.
        attributes: attribute list passed to the search as ``attrlist``.
        incremental: when true and the sync record of the URI has
            ``syncs_to_full > 0``, ``incremental_filter`` is used instead
            of ``filter``.
        incremental_filter: filter template; every ``'?'`` is replaced by
            ``self.whenchanged`` formatted with
            ``self.conf_LDAP_SYNC_INCREMENTAL_TIMESTAMPFORMAT``.

    Returns:
        A tuple ``(uri, results)``: the LDAP server URI that was used and
        the search results. The URI is returned for incremental sync
        purposes.

    Raises:
        ldap.LDAPError: the bind error of the last URI tried when no server
            could be bound, or a generic error when no URI is configured.
    """
    last_error = None

    for uri in self.conf_LDAP_SYNC_BIND_URI:
        # Read the sync record of this uri, reusing the cached one when the
        # uri is the one already known to work.
        if self.working_uri == uri:
            adldap_sync = self.working_adldap_sync
        else:
            adldap_sync, _ = ADldap_Sync.objects.get_or_create(ldap_sync_uri=uri)

        if adldap_sync.syncs_to_full > 0 and incremental:
            timestamp = self.whenchanged.strftime(self.conf_LDAP_SYNC_INCREMENTAL_TIMESTAMPFORMAT)
            filter_to_use = incremental_filter.replace('?', timestamp)
            logger.debug("Using an incremental search. Filter is:'%s'", filter_to_use)
        else:
            filter_to_use = filter

        ldap.set_option(ldap.OPT_REFERRALS, 0)
        conn = PagedLDAPObject(uri)
        conn.protocol_version = 3

        # NOTE(review): OPT_X_TLS_DEMAND is documented as a *value* for
        # OPT_X_TLS_REQUIRE_CERT, not as an option key, and the meaning of
        # OPT_X_TLS as a key is unclear from here -- confirm the first and
        # third call of each branch have the intended effect.
        if uri.startswith('ldaps:'):
            conn.set_option(ldap.OPT_X_TLS, ldap.OPT_X_TLS_DEMAND)
            conn.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_DEMAND)
            conn.set_option(ldap.OPT_X_TLS_DEMAND, True)
        else:
            conn.set_option(ldap.OPT_X_TLS, ldap.OPT_X_TLS_NEVER)
            conn.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER)
            conn.set_option(ldap.OPT_X_TLS_DEMAND, False)

        try:
            conn.simple_bind_s(self.conf_LDAP_SYNC_BIND_DN, self.conf_LDAP_SYNC_BIND_PASS)
        except ldap.LDAPError as e:
            logger.error("Error connecting to LDAP server %s : %s", uri, e)
            # Remember the failure so it can be re-raised if every URI fails.
            last_error = e
            continue

        try:
            results = conn.paged_search_ext_s(
                self.conf_LDAP_SYNC_BIND_SEARCH,
                ldap.SCOPE_SUBTREE,
                filter_to_use,
                attrlist=attributes,
                serverctrls=None,
            )
        finally:
            # Release the connection even when the search raises.
            conn.unbind_s()

        if self.working_uri is None:
            # Remember the first working server and move it to the front of
            # the list so it is tried first next time.
            self.working_uri = uri
            self.conf_LDAP_SYNC_BIND_URI.insert(0, uri)
            self.working_adldap_sync = adldap_sync

        # Return both the LDAP server URI used and the request results.
        # This is for incremental sync purposes.
        return (uri, results)

    # Not connected correctly: raise an explicit error. A bare ``raise`` here
    # would be outside any ``except`` block and has no active exception to
    # re-raise.
    if last_error is not None:
        raise last_error
    raise ldap.LDAPError("No LDAP server URI configured")