def _ldap_connection(self):
"""
Context manager for ldap connections
"""
if self.no_verify:
ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT,
ldap.OPT_X_TLS_NEVER)
ldap_cxn = ldap.initialize('{0}'.format(self.uri))
ldap_cxn.protocol_version = 3
ldap_cxn.set_option(ldap.OPT_REFERRALS, 0)
if self.tls and not self.uri.startswith('ldaps'):
ldap_cxn.start_tls_s()
yield ldap_cxn
python类set_option()的实例源码
def _connect_to_ldap(self):
ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER)
connection = ldap.initialize(self.server_uri)
if self.start_tls:
try:
connection.start_tls_s()
except ldap.LDAPError:
e = get_exception()
self.module.fail_json(msg="Cannot start TLS.", details=str(e))
try:
if self.bind_dn is not None:
connection.simple_bind_s(self.bind_dn, self.bind_pw)
else:
connection.sasl_interactive_bind_s('', ldap.sasl.external())
except ldap.LDAPError:
e = get_exception()
self.module.fail_json(
msg="Cannot bind to the server.", details=str(e))
return connection
def initiate_ldap():
"""
contact the LDAP server to return a LDAP object
"""
ldap_schemes = ['ldap://', 'ldaps://']
ldap.set_option(ldap.OPT_DEBUG_LEVEL, 0)
ldap.set_option(ldap.OPT_X_TLS_CACERTDIR, config.get('ldap', 'cacertdir'))
ldap.set_option(ldap.OPT_X_TLS_CERTFILE, config.get('ldap', 'certfile'))
ldap.set_option(ldap.OPT_X_TLS_KEYFILE, config.get('ldap', 'keyfile'))
ldap.set_option(ldap.OPT_X_TLS_DEMAND, True)
ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_DEMAND) # TRY, NEVER, DEMAND
ldap.set_option(ldap.OPT_X_TLS_NEWCTX, 0)
for scheme in ldap_schemes:
ldap_url = scheme + server_url
ldap_obj = ldap.initialize(ldap_url)
try:
ldap_obj.start_tls_s()
except ldap.OPERATIONS_ERROR as e:
e_msg = e[0]['info']
if e_msg == 'TLS already started':
pass
else:
raise
except ldap.SERVER_DOWN:
if scheme is not ldap_schemes[-1]:
continue
else:
raise
if login_dn != 'DEFAULT': # Use anonymous bind if login_dn is set as DEFAULT
ldap_obj.bind(login_dn, password, ldap.AUTH_SIMPLE)
else:
try:
ldap_obj.whoami_s()
except ldap.UNWILLING_TO_PERFORM:
print 'Anonymous binding is disabled by server'
raise SystemExit
return ldap_obj
break
def ldap_auth(self, username, password):
ldap.set_option(ldap.OPT_X_TLS_CACERTFILE, self.cert_path)
connection = ldap.initialize(self.ldap_url)
connection.set_option(ldap.OPT_REFERRALS, 0)
try:
if password:
connection.simple_bind_s(username + self.user_suffix, password)
else:
return False
except ldap.INVALID_CREDENTIALS:
return False
except ldap.SERVER_DOWN:
return None
return True
def __init__(self, backend, mode=PLAIN,
cert=None,
key=None,
cacertdir='/etc/ssl/certs',
):
self.backend = backend
self._server = None
self._schema = {}
self._cert = cert
self._key = key
logger.debug("LDAP _session created, id: {}".format(id(self)))
# Switch to LDAPS mode if ldaps is backend start with 'ldaps'
if 'ldaps' == backend[:5].lower():
mode = self.LDAPS
# Set CACERTDIR and REQUIRED_CERT to TLS_DEMAND (validation required) if needed
if mode in (self.STARTTLS, self.LDAPS) and cacertdir is not None:
ldap.set_option(ldap.OPT_X_TLS_CACERTDIR, cacertdir)
ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_DEMAND)
if cacertdir is None:
warnings.warn("You are in INSECURE mode", ImportWarning, stacklevel=2)
ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER)
# Set client certificate if both cert and key are provided
if cert is not None and key is not None:
if not os.path.isfile(cert):
raise LDAPSessionException("Certificate file {} does not exist".format(cert))
if not os.path.isfile(key):
raise LDAPSessionException("Certificate key file {} does not exist".format(cert))
ldap.set_option(ldap.OPT_X_TLS_CERTFILE, cert)
ldap.set_option(ldap.OPT_X_TLS_KEYFILE, key)
self._server = ldap.initialize(self.backend, bytes_mode=False)
# Proceed STARTTLS
if mode == self.STARTTLS:
self._server.start_tls_s()
def connect(self,
uri=rbconfig.LDAP_URI,
dn=rbconfig.LDAP_ROOT_DN,
password=None,
dcu_uri=rbconfig.LDAP_DCU_URI,
dcu_dn=rbconfig.LDAP_DCU_RBDN,
dcu_pw=None):
"""Connect to databases.
Custom URI, DN and password may be given for RedBrick LDAP.
Password if not given will be read from shared secret file set
in rbconfig.
Custom URI may be given for DCU LDAP. """
if not password:
try:
pw_file = open(rbconfig.LDAP_ROOTPW_FILE, 'r')
password = pw_file.readline().rstrip()
except IOError:
raise RBFatalError("Unable to open LDAP root password file")
pw_file.close()
if not dcu_pw:
try:
pw_file = open(rbconfig.LDAP_DCU_RBPW, 'r')
dcu_pw = pw_file.readline().rstrip()
except IOError:
raise RBFatalError("Unable to open DCU AD root password file")
pw_file.close()
# Default protocol seems to be 2, set to 3.
ldap.set_option(ldap.OPT_PROTOCOL_VERSION, 3)
# Connect to RedBrick LDAP.
self.ldap = ldap.initialize(uri)
self.ldap.simple_bind_s(dn, password)
# Connect to DCU LDAP (anonymous bind).
self.ldap_dcu = ldap.initialize(dcu_uri)
# self.ldap_dcu.simple_bind_s('', '')
self.ldap_dcu.simple_bind_s(dcu_dn, dcu_pw)
def ldap_search(self, filter, attributes, incremental, incremental_filter):
"""
Query the configured LDAP server with the provided search filter and
attribute list.
"""
for uri in self.conf_LDAP_SYNC_BIND_URI:
#Read record of this uri
if (self.working_uri == uri):
adldap_sync = self.working_adldap_sync
created = False
else:
adldap_sync, created = ADldap_Sync.objects.get_or_create(ldap_sync_uri=uri)
if ((adldap_sync.syncs_to_full > 0) and incremental):
filter_to_use = incremental_filter.replace('?', self.whenchanged.strftime(self.conf_LDAP_SYNC_INCREMENTAL_TIMESTAMPFORMAT))
logger.debug("Using an incremental search. Filter is:'%s'" % filter_to_use)
else:
filter_to_use = filter
ldap.set_option(ldap.OPT_REFERRALS, 0)
#ldap.set_option(ldap.OPT_NETWORK_TIMEOUT, 10)
l = PagedLDAPObject(uri)
l.protocol_version = 3
if (uri.startswith('ldaps:')):
l.set_option(ldap.OPT_X_TLS, ldap.OPT_X_TLS_DEMAND)
l.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_DEMAND)
l.set_option(ldap.OPT_X_TLS_DEMAND, True)
else:
l.set_option(ldap.OPT_X_TLS, ldap.OPT_X_TLS_NEVER)
l.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER)
l.set_option(ldap.OPT_X_TLS_DEMAND, False)
try:
l.simple_bind_s(self.conf_LDAP_SYNC_BIND_DN, self.conf_LDAP_SYNC_BIND_PASS)
except ldap.LDAPError as e:
logger.error("Error connecting to LDAP server %s : %s" % (uri, e))
continue
results = l.paged_search_ext_s(self.conf_LDAP_SYNC_BIND_SEARCH, ldap.SCOPE_SUBTREE, filter_to_use, attrlist=attributes, serverctrls=None)
l.unbind_s()
if (self.working_uri is None):
self.working_uri = uri
self.conf_LDAP_SYNC_BIND_URI.insert(0, uri)
self.working_adldap_sync = adldap_sync
return (uri, results) # Return both the LDAP server URI used and the request. This is for incremental sync purposes
#if not connected correctly, raise error
raise
def open_ldap():
"""
Returns a freshly made LDAP object, according to the settings
configured in webfront.conf.
"""
# Get config settings
server = _config.get('ldap', 'server')
port = _config.getint('ldap', 'port')
encryption = _config.get('ldap', 'encryption').lower()
timeout = _config.getfloat('ldap', 'timeout')
# Revert to no encryption if none of the valid settings are found
if encryption not in ('ssl', 'tls', 'none'):
_logger.warning('Unknown encryption setting %r in config file, '
'using no encryption instead',
_config.get('ldap', 'encryption'))
encryption = 'none'
# Debug tracing from python-ldap/openldap to stderr
if _config.getboolean('ldap', 'debug'):
ldap.set_option(ldap.OPT_DEBUG_LEVEL, 255)
# Use STARTTLS if enabled, then fail miserably if the server
# does not support it
if encryption == 'tls':
_logger.debug("Using STARTTLS for ldap connection")
lconn = ldap.open(server, port)
lconn.timeout = timeout
try:
lconn.start_tls_s()
except ldap.PROTOCOL_ERROR:
_logger.error('LDAP server %s does not support the STARTTLS '
'extension. Aborting.', server)
raise NoStartTlsError(server)
except (ldap.SERVER_DOWN, ldap.CONNECT_ERROR):
_logger.exception("LDAP server is down")
raise NoAnswerError(server)
else:
scheme = encryption == 'ssl' and 'ldaps' or 'ldap'
uri = '%s://%s:%s' % (scheme, server, port)
lconn = ldap.initialize(uri)
lconn.timeout = timeout
return lconn