def _ldap_connection(self):
"""
Context manager for ldap connections
"""
if self.no_verify:
ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT,
ldap.OPT_X_TLS_NEVER)
ldap_cxn = ldap.initialize('{0}'.format(self.uri))
ldap_cxn.protocol_version = 3
ldap_cxn.set_option(ldap.OPT_REFERRALS, 0)
if self.tls and not self.uri.startswith('ldaps'):
ldap_cxn.start_tls_s()
yield ldap_cxn
python类OPT_X_TLS_NEVER的实例源码
def _connect_to_ldap(self):
ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER)
connection = ldap.initialize(self.server_uri)
if self.start_tls:
try:
connection.start_tls_s()
except ldap.LDAPError:
e = get_exception()
self.module.fail_json(msg="Cannot start TLS.", details=str(e))
try:
if self.bind_dn is not None:
connection.simple_bind_s(self.bind_dn, self.bind_pw)
else:
connection.sasl_interactive_bind_s('', ldap.sasl.external())
except ldap.LDAPError:
e = get_exception()
self.module.fail_json(
msg="Cannot bind to the server.", details=str(e))
return connection
def __init__(self, backend, mode=PLAIN,
cert=None,
key=None,
cacertdir='/etc/ssl/certs',
):
self.backend = backend
self._server = None
self._schema = {}
self._cert = cert
self._key = key
logger.debug("LDAP _session created, id: {}".format(id(self)))
# Switch to LDAPS mode if ldaps is backend start with 'ldaps'
if 'ldaps' == backend[:5].lower():
mode = self.LDAPS
# Set CACERTDIR and REQUIRED_CERT to TLS_DEMAND (validation required) if needed
if mode in (self.STARTTLS, self.LDAPS) and cacertdir is not None:
ldap.set_option(ldap.OPT_X_TLS_CACERTDIR, cacertdir)
ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_DEMAND)
if cacertdir is None:
warnings.warn("You are in INSECURE mode", ImportWarning, stacklevel=2)
ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER)
# Set client certificate if both cert and key are provided
if cert is not None and key is not None:
if not os.path.isfile(cert):
raise LDAPSessionException("Certificate file {} does not exist".format(cert))
if not os.path.isfile(key):
raise LDAPSessionException("Certificate key file {} does not exist".format(cert))
ldap.set_option(ldap.OPT_X_TLS_CERTFILE, cert)
ldap.set_option(ldap.OPT_X_TLS_KEYFILE, key)
self._server = ldap.initialize(self.backend, bytes_mode=False)
# Proceed STARTTLS
if mode == self.STARTTLS:
self._server.start_tls_s()
def ldap_search(self, filter, attributes, incremental, incremental_filter):
"""
Query the configured LDAP server with the provided search filter and
attribute list.
"""
for uri in self.conf_LDAP_SYNC_BIND_URI:
#Read record of this uri
if (self.working_uri == uri):
adldap_sync = self.working_adldap_sync
created = False
else:
adldap_sync, created = ADldap_Sync.objects.get_or_create(ldap_sync_uri=uri)
if ((adldap_sync.syncs_to_full > 0) and incremental):
filter_to_use = incremental_filter.replace('?', self.whenchanged.strftime(self.conf_LDAP_SYNC_INCREMENTAL_TIMESTAMPFORMAT))
logger.debug("Using an incremental search. Filter is:'%s'" % filter_to_use)
else:
filter_to_use = filter
ldap.set_option(ldap.OPT_REFERRALS, 0)
#ldap.set_option(ldap.OPT_NETWORK_TIMEOUT, 10)
l = PagedLDAPObject(uri)
l.protocol_version = 3
if (uri.startswith('ldaps:')):
l.set_option(ldap.OPT_X_TLS, ldap.OPT_X_TLS_DEMAND)
l.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_DEMAND)
l.set_option(ldap.OPT_X_TLS_DEMAND, True)
else:
l.set_option(ldap.OPT_X_TLS, ldap.OPT_X_TLS_NEVER)
l.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER)
l.set_option(ldap.OPT_X_TLS_DEMAND, False)
try:
l.simple_bind_s(self.conf_LDAP_SYNC_BIND_DN, self.conf_LDAP_SYNC_BIND_PASS)
except ldap.LDAPError as e:
logger.error("Error connecting to LDAP server %s : %s" % (uri, e))
continue
results = l.paged_search_ext_s(self.conf_LDAP_SYNC_BIND_SEARCH, ldap.SCOPE_SUBTREE, filter_to_use, attrlist=attributes, serverctrls=None)
l.unbind_s()
if (self.working_uri is None):
self.working_uri = uri
self.conf_LDAP_SYNC_BIND_URI.insert(0, uri)
self.working_adldap_sync = adldap_sync
return (uri, results) # Return both the LDAP server URI used and the request. This is for incremental sync purposes
#if not connected correctly, raise error
raise