python类set_option()的实例源码

cpldap.py 文件源码 项目:auth-tool 作者: luciddg 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def _ldap_connection(self):
        """
        Context manager for ldap connections.

        Yields an LDAPv3 connection to ``self.uri`` with referral chasing
        disabled.  STARTTLS is negotiated when ``self.tls`` is set and the
        URI is not already an ``ldaps`` one.  The connection is unbound when
        the generator is closed, whether the caller's block finished
        normally or raised.

        NOTE(review): this generator is presumably wrapped by
        ``contextlib.contextmanager`` outside the visible span -- confirm.
        """
        if self.no_verify:
            # This is a module-level (process-wide) option: it affects every
            # python-ldap connection created afterwards, not only this one.
            ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT,
                            ldap.OPT_X_TLS_NEVER)

        ldap_cxn = ldap.initialize('{0}'.format(self.uri))
        try:
            ldap_cxn.protocol_version = 3
            ldap_cxn.set_option(ldap.OPT_REFERRALS, 0)

            if self.tls and not self.uri.startswith('ldaps'):
                ldap_cxn.start_tls_s()

            yield ldap_cxn
        finally:
            # Best-effort cleanup so the socket is not leaked.  Errors are
            # ignored on purpose: the server may already be gone, or the
            # caller may have unbound the connection itself.
            try:
                ldap_cxn.unbind_s()
            except (ldap.LDAPError, AttributeError):
                pass
ldap_attr.py 文件源码 项目:isam-ansible-roles 作者: IBM-Security 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _connect_to_ldap(self):
        """
        Create, optionally secure, and bind a connection to ``self.server_uri``.

        When ``self.start_tls`` is truthy STARTTLS is attempted first.  The
        bind is a simple bind with ``self.bind_dn`` / ``self.bind_pw`` when a
        bind DN is configured, otherwise a SASL EXTERNAL bind.  Any
        ``ldap.LDAPError`` in either step is reported through
        ``self.module.fail_json``.

        Returns the connection object.
        """
        # Certificate verification is switched off via the module-level
        # (process-wide) python-ldap option before the connection is created.
        ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER)
        conn = ldap.initialize(self.server_uri)

        if self.start_tls:
            try:
                conn.start_tls_s()
            except ldap.LDAPError:
                tls_error = get_exception()
                self.module.fail_json(
                    msg="Cannot start TLS.", details=str(tls_error))

        try:
            if self.bind_dn is None:
                # No explicit DN: authenticate with SASL EXTERNAL.
                conn.sasl_interactive_bind_s('', ldap.sasl.external())
            else:
                conn.simple_bind_s(self.bind_dn, self.bind_pw)
        except ldap.LDAPError:
            bind_error = get_exception()
            self.module.fail_json(
                msg="Cannot bind to the server.", details=str(bind_error))

        return conn
sync_ldap_accounts.py 文件源码 项目:rucio 作者: rucio01 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def initiate_ldap():
    """
    Contact the LDAP server and return an LDAP connection object.

    The module-level ``server_url`` is tried first as ``ldap://`` and then
    as ``ldaps://``.  STARTTLS is attempted on each; a "TLS already started"
    operations error is tolerated.  If the server is down the next scheme is
    tried, and the error is re-raised once the last scheme has failed.

    When the module-level ``login_dn`` is not ``'DEFAULT'`` a simple bind
    with ``login_dn`` / ``password`` is issued.  Otherwise the anonymous
    session is probed with ``whoami_s()``.

    Raises:
        ldap.OPERATIONS_ERROR: STARTTLS failed for any reason other than
            TLS being active already.
        ldap.SERVER_DOWN: no scheme could reach the server.
        SystemExit: the server refuses anonymous operations.
    """
    ldap_schemes = ['ldap://', 'ldaps://']

    # Module-level (process-wide) TLS configuration taken from the config file.
    ldap.set_option(ldap.OPT_DEBUG_LEVEL, 0)
    ldap.set_option(ldap.OPT_X_TLS_CACERTDIR, config.get('ldap', 'cacertdir'))
    ldap.set_option(ldap.OPT_X_TLS_CERTFILE, config.get('ldap', 'certfile'))
    ldap.set_option(ldap.OPT_X_TLS_KEYFILE, config.get('ldap', 'keyfile'))
    # OPT_X_TLS_DEMAND is a *value* for OPT_X_TLS_REQUIRE_CERT, not an option
    # name, so the former set_option(OPT_X_TLS_DEMAND, True) call was dropped:
    # its integer value is the same as OPT_DEREF, so it changed alias
    # dereferencing instead of anything TLS related.
    ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_DEMAND)  # TRY, NEVER, DEMAND
    # Must stay last: creates the TLS context from the options set above.
    ldap.set_option(ldap.OPT_X_TLS_NEWCTX, 0)

    for scheme in ldap_schemes:
        ldap_obj = ldap.initialize(scheme + server_url)
        try:
            ldap_obj.start_tls_s()
        except ldap.OPERATIONS_ERROR as e:
            # ldaps:// connections are encrypted already; anything else is fatal.
            if e.args[0]['info'] != 'TLS already started':
                raise
        except ldap.SERVER_DOWN:
            if scheme != ldap_schemes[-1]:
                continue
            raise

        if login_dn != 'DEFAULT':  # Use anonymous bind if login_dn is set as DEFAULT
            # NOTE(review): bind() is the asynchronous variant, so its result
            # is never awaited here and a rejected bind goes unnoticed.
            # Consider simple_bind_s().
            ldap_obj.bind(login_dn, password, ldap.AUTH_SIMPLE)
        else:
            try:
                ldap_obj.whoami_s()
            except ldap.UNWILLING_TO_PERFORM:
                print('Anonymous binding is disabled by server')
                raise SystemExit
        return ldap_obj
ldap.py 文件源码 项目:iris 作者: linkedin 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def ldap_auth(self, username, password):
        """
        Check ``username`` / ``password`` with a simple bind against
        ``self.ldap_url``.

        The bind DN is ``username + self.user_suffix``.

        Returns:
            True if the bind succeeded; False if the password is empty or
            the credentials were rejected; None if the server is down.
        """
        # Module-level (process-wide) CA bundle used to verify the server.
        ldap.set_option(ldap.OPT_X_TLS_CACERTFILE, self.cert_path)

        # An empty password is rejected before any connection is created.
        if not password:
            return False

        connection = ldap.initialize(self.ldap_url)
        connection.set_option(ldap.OPT_REFERRALS, 0)

        try:
            connection.simple_bind_s(username + self.user_suffix, password)
        except ldap.INVALID_CREDENTIALS:
            return False
        except ldap.SERVER_DOWN:
            return None
        finally:
            # The connection only exists for this check; release it on every
            # path.  Cleanup is best effort because the server may be
            # unreachable.
            try:
                connection.unbind_s()
            except ldap.LDAPError:
                pass
        return True
session.py 文件源码 项目:pyldap_orm 作者: asyd 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(self, backend, mode=PLAIN,
                 cert=None,
                 key=None,
                 cacertdir='/etc/ssl/certs',
                 ):
        """
        Create the LDAP session object and configure TLS.

        :param backend: LDAP URI passed to ``ldap.initialize``. A URI starting
            with ``ldaps`` (case-insensitive) forces LDAPS mode.
        :param mode: connection mode; compared against ``self.STARTTLS`` and
            ``self.LDAPS``.
        :param cert: path of a client certificate file (optional).
        :param key: path of the client certificate key file (optional).
        :param cacertdir: directory of CA certificates used to validate the
            server. ``None`` disables certificate validation.
        :raises LDAPSessionException: if ``cert`` and ``key`` are both given
            and one of the two files does not exist.

        The TLS settings are applied with the module-level ``ldap.set_option``
        and are therefore process-wide.
        """

        self.backend = backend
        self._server = None
        self._schema = {}
        self._cert = cert
        self._key = key

        logger.debug("LDAP _session created, id: {}".format(id(self)))

        # Switch to LDAPS mode if the backend URI starts with 'ldaps'
        if 'ldaps' == backend[:5].lower():
            mode = self.LDAPS

        # Set CACERTDIR and REQUIRE_CERT to TLS_DEMAND (validation required) if needed
        if mode in (self.STARTTLS, self.LDAPS) and cacertdir is not None:
            ldap.set_option(ldap.OPT_X_TLS_CACERTDIR, cacertdir)
            ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_DEMAND)

        if cacertdir is None:
            # NOTE(review): ImportWarning is ignored by Python's default
            # warning filters, so this warning is normally invisible. The
            # category is kept in case callers filter on it.
            warnings.warn("You are in INSECURE mode", ImportWarning, stacklevel=2)
            ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER)

        # Set client certificate if both cert and key are provided
        if cert is not None and key is not None:
            if not os.path.isfile(cert):
                raise LDAPSessionException("Certificate file {} does not exist".format(cert))
            if not os.path.isfile(key):
                # Report the key path (the message used to show the cert path).
                raise LDAPSessionException("Certificate key file {} does not exist".format(key))
            ldap.set_option(ldap.OPT_X_TLS_CERTFILE, cert)
            ldap.set_option(ldap.OPT_X_TLS_KEYFILE, key)

        self._server = ldap.initialize(self.backend, bytes_mode=False)

        # Proceed with STARTTLS
        if mode == self.STARTTLS:
            self._server.start_tls_s()
rbuserdb.py 文件源码 项目:useradm 作者: redbrick 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def connect(self,
                uri=rbconfig.LDAP_URI,
                dn=rbconfig.LDAP_ROOT_DN,
                password=None,
                dcu_uri=rbconfig.LDAP_DCU_URI,
                dcu_dn=rbconfig.LDAP_DCU_RBDN,
                dcu_pw=None):
        """Connect to databases.

        Custom URI, DN and password may be given for RedBrick LDAP.
        Custom URI, DN and password may be given for DCU LDAP.
        A password that is not given (or is empty) is read from the first
        line of the corresponding secret file named in rbconfig.

        The resulting connections are stored in ``self.ldap`` and
        ``self.ldap_dcu``; both are bound with a simple bind.

        Raises RBFatalError if a needed password file cannot be read.
        """
        if not password:
            try:
                # 'with' closes the file even if reading it fails.
                with open(rbconfig.LDAP_ROOTPW_FILE, 'r') as pw_file:
                    password = pw_file.readline().rstrip()
            except IOError:
                raise RBFatalError("Unable to open LDAP root password file")

        if not dcu_pw:
            try:
                with open(rbconfig.LDAP_DCU_RBPW, 'r') as pw_file:
                    dcu_pw = pw_file.readline().rstrip()
            except IOError:
                raise RBFatalError("Unable to open DCU AD root password file")

        # Default protocol seems to be 2, set to 3.
        # This is a module-level (process-wide) python-ldap option.
        ldap.set_option(ldap.OPT_PROTOCOL_VERSION, 3)

        # Connect to RedBrick LDAP.
        self.ldap = ldap.initialize(uri)
        self.ldap.simple_bind_s(dn, password)

        # Connect to DCU LDAP (authenticated bind with dcu_dn / dcu_pw).
        self.ldap_dcu = ldap.initialize(dcu_uri)
        self.ldap_dcu.simple_bind_s(dcu_dn, dcu_pw)
syncldap.py 文件源码 项目:django-adldap-sync 作者: marchete 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def ldap_search(self, filter, attributes, incremental, incremental_filter):
        """
        Query the configured LDAP servers with the provided search filter and
        attribute list.

        Each URI in ``self.conf_LDAP_SYNC_BIND_URI`` is tried in order; the
        first one that accepts the bind is searched and its result returned.
        When ``incremental`` is truthy and the server's sync record has
        ``syncs_to_full > 0``, ``incremental_filter`` is used with every
        ``'?'`` replaced by the formatted ``self.whenchanged`` timestamp;
        otherwise ``filter`` is used.

        The first time a server answers, it is remembered in
        ``self.working_uri`` / ``self.working_adldap_sync`` and moved to the
        front of the URI list.

        Returns:
            A ``(uri, results)`` tuple: the server URI that answered and the
            paged search results.

        Raises:
            ldap.LDAPError: the last bind error, if no server accepted the
                bind; also any error raised by the search itself.
            RuntimeError: if no URI was available to try.
        """
        last_error = None

        # Module-level (process-wide) option; the same for every server.
        ldap.set_option(ldap.OPT_REFERRALS, 0)

        for uri in self.conf_LDAP_SYNC_BIND_URI:
            # Read the sync record of this uri
            if self.working_uri == uri:
                adldap_sync = self.working_adldap_sync
            else:
                adldap_sync, _ = ADldap_Sync.objects.get_or_create(ldap_sync_uri=uri)

            if adldap_sync.syncs_to_full > 0 and incremental:
                filter_to_use = incremental_filter.replace('?', self.whenchanged.strftime(self.conf_LDAP_SYNC_INCREMENTAL_TIMESTAMPFORMAT))
                logger.debug("Using an incremental search. Filter is:'%s'" % filter_to_use)
            else:
                filter_to_use = filter

            conn = PagedLDAPObject(uri)
            conn.protocol_version = 3

            # OPT_X_TLS_DEMAND is a value, not an option name.  The former
            # set_option(OPT_X_TLS_DEMAND, ...) calls were dropped: that
            # constant has the same integer value as OPT_DEREF, so they
            # changed alias dereferencing rather than TLS behaviour.
            if uri.startswith('ldaps:'):
                conn.set_option(ldap.OPT_X_TLS, ldap.OPT_X_TLS_DEMAND)
                conn.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_DEMAND)
            else:
                conn.set_option(ldap.OPT_X_TLS, ldap.OPT_X_TLS_NEVER)
                conn.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER)

            try:
                conn.simple_bind_s(self.conf_LDAP_SYNC_BIND_DN, self.conf_LDAP_SYNC_BIND_PASS)
            except ldap.LDAPError as e:
                logger.error("Error connecting to LDAP server %s : %s" % (uri, e))
                # Keep the error so it can be re-raised if every server fails.
                last_error = e
                continue

            try:
                results = conn.paged_search_ext_s(self.conf_LDAP_SYNC_BIND_SEARCH, ldap.SCOPE_SUBTREE, filter_to_use, attrlist=attributes, serverctrls=None)
            finally:
                # Release the connection even when the search fails.
                conn.unbind_s()

            if self.working_uri is None:
                self.working_uri = uri
                # Safe although the list is being iterated: we return below.
                self.conf_LDAP_SYNC_BIND_URI.insert(0, uri)
                self.working_adldap_sync = adldap_sync

            return (uri, results)  # Return both the LDAP server URI used and the request. This is for incremental sync purposes

        # No server could be used.  A bare ``raise`` here has no active
        # exception on Python 3, so re-raise the last bind error explicitly.
        if last_error is not None:
            raise last_error
        raise RuntimeError("No LDAP server URI available for the search")
ldapauth.py 文件源码 项目:nav 作者: UNINETT 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def open_ldap():
    """
    Returns a freshly made LDAP object, according to the settings
    configured in webfront.conf.

    The ``[ldap]`` options read are ``server``, ``port``, ``encryption``
    (``ssl``, ``tls`` or ``none``; anything else falls back to ``none``
    with a warning), ``timeout`` and ``debug``.

    With ``encryption = tls`` STARTTLS is negotiated before returning.

    Raises:
        NoStartTlsError: the server answered STARTTLS with a protocol error.
        NoAnswerError: the server was down or unreachable during STARTTLS.
    """
    # Get config settings
    server = _config.get('ldap', 'server')
    port = _config.getint('ldap', 'port')
    encryption = _config.get('ldap', 'encryption').lower()
    timeout = _config.getfloat('ldap', 'timeout')
    # Revert to no encryption if none of the valid settings are found
    if encryption not in ('ssl', 'tls', 'none'):
        _logger.warning('Unknown encryption setting %r in config file, '
                        'using no encryption instead',
                        _config.get('ldap', 'encryption'))
        encryption = 'none'

    # Debug tracing from python-ldap/openldap to stderr
    if _config.getboolean('ldap', 'debug'):
        ldap.set_option(ldap.OPT_DEBUG_LEVEL, 255)

    if encryption == 'tls':
        _logger.debug("Using STARTTLS for ldap connection")

    # Only 'ssl' uses ldaps://; STARTTLS upgrades a plain ldap:// connection.
    # ldap.initialize() replaces the deprecated ldap.open(), which built the
    # same 'ldap://host:port' URI and is gone from python-ldap 3.x.
    scheme = 'ldaps' if encryption == 'ssl' else 'ldap'
    uri = '%s://%s:%s' % (scheme, server, port)
    lconn = ldap.initialize(uri)
    lconn.timeout = timeout

    # Use STARTTLS if enabled, then fail miserably if the server
    # does not support it
    if encryption == 'tls':
        try:
            lconn.start_tls_s()
        except ldap.PROTOCOL_ERROR:
            _logger.error('LDAP server %s does not support the STARTTLS '
                          'extension.  Aborting.', server)
            raise NoStartTlsError(server)
        except (ldap.SERVER_DOWN, ldap.CONNECT_ERROR):
            _logger.exception("LDAP server is down")
            raise NoAnswerError(server)

    return lconn


问题


面经


文章

微信
公众号

扫码关注公众号