python类SCOPE_SUBTREE的实例源码

ldapinfo.py 文件源码 项目:deb-python-pysaml2 作者: openstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __call__(self, userid, base="", filter_pattern="", scope=SCOPE_SUBTREE,
                 tls=False, attr=None, attrsonly=False, **kwargs):
        """Look up ``userid`` in the directory and return the first entry's attributes.

        Every search parameter that is falsy falls back to the value stored
        on the instance.  ``tls`` and ``**kwargs`` are accepted but not used
        by this method.

        :param userid: value substituted into the filter pattern with ``%``
        :return: the second element (attribute mapping) of the first
            ``(dn, ava)`` tuple returned by the search
        """
        pattern = filter_pattern or self.filter_pattern
        search_filter = pattern % userid

        entries = self.ld.search_s(
            base or self.base,
            scope or self.scope,
            search_filter,
            attr or self.attr,
            attrsonly or self.attrsonly,
        )
        # Only one entry is expected; each entry is a (dn, ava) tuple.
        first_entry = entries[0]
        return first_entry[1]
lcmldap.py 文件源码 项目:umanager 作者: lcm-unimi 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def getusercredbyuid(self, username):
        """Return ``[givenName, sn]`` for the user whose uid is ``username``.

        Searches the ``ou=People`` subtree below ``self.dc``.  The values are
        returned in the order of the requested attributes (given name first,
        surname second); an attribute missing from the entry is skipped.

        :return: list of first attribute values, or ``None`` when the user
            does not exist or an LDAP error occurred (the error is printed).
        """
        baseDN         = "ou=People,"+self.dc
        searchScope    = ldap.SCOPE_SUBTREE
        searchFilter   = "uid="+username
        searchAttrList = ["givenName","sn"]

        try:
            # Result is a list of tuples (one per match) of dn and attrs.
            # Attrs is a dict, each value is a list of strings.
            result = self.conn.search_s(baseDN, searchScope, searchFilter, searchAttrList)
        except ldap.LDAPError as e:
            print(e)
            return None

        if not result:
            # The original passed username as a second print() argument, so
            # the "%s" placeholder was never filled in.
            print("User %s does not exist!" % username)
            return None

        # Attribute names are matched case-insensitively because the server
        # decides the case of the keys it sends back.
        attrs = dict((key.lower(), values) for key, values in result[0][1].items())
        # Walk the requested attribute list instead of dict.values() so the
        # name/surname order does not depend on dictionary ordering.
        return [attrs[name.lower()][0]
                for name in searchAttrList if name.lower() in attrs]
authentication.py 文件源码 项目:kinto-ldap 作者: Kinto 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def ldap_ping(request):
    """Heartbeat check: report whether a trivial LDAP search succeeds.

    Opens a connection through the registry's connection manager using the
    configured bind credentials and runs a throw-away search.  Any exception
    is logged and turned into ``False``.
    """
    registry = request.registry
    settings = registry.settings
    credentials = (settings.get('ldap.bind_dn'),
                   settings.get('ldap.bind_password'))
    search_base = settings['ldap.base_dn']
    manager = registry.ldap_cm
    try:
        with manager.connection(*credentials) as conn:
            # The query result is irrelevant; only its success matters.
            dummy_filter = settings['ldap.filters'].format(mail="demo")
            conn.search_s(search_base, SCOPE_SUBTREE, dummy_filter)
            healthy = True
    except Exception:
        logger.exception("Heartbeat Failure")
        healthy = False

    return healthy
ldap.py 文件源码 项目:matrix-bot 作者: psaavedra 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def get_custom_ldap_group_members(ldap_settings, group_name):
    """Return the uids of the entries matching a custom group filter.

    The filter string is read from ``ldap_settings[group_name]`` and applied
    to the subtree below ``ldap_settings["base"]``.

    :return: list with the first ``uid`` value of every matching entry;
        an empty list if anything fails (the error is logged).
    """
    logger = utils.get_logger()
    ldap_server = ldap_settings["server"]
    ldap_base = ldap_settings["base"]
    members = []
    try:
        conn = LDAP.initialize(ldap_server)
        g_ldap_filter = ldap_settings[group_name]
        logger.debug("Searching members for %s: %s" % (group_name,
                                                       g_ldap_filter))
        items = conn.search_s(ldap_base, LDAP.SCOPE_SUBTREE,
                              attrlist=['uid'],
                              filterstr=g_ldap_filter)
        # Build the list eagerly so a malformed entry is caught by the
        # handler below even where map() would be lazy (Python 3).
        members = [entry[1]["uid"][0] for entry in items]
    except Exception as e:  # "as" form works on Python 2.6+ and Python 3
        logger.error("Error getting custom group %s from LDAP: %s" % (group_name, e))
    return members
ldap.py 文件源码 项目:matrix-bot 作者: psaavedra 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def get_ldap_group_members(ldap_settings, group_name):
    """Return the member ids listed in a group's ``uniqueMember`` attribute.

    The group is looked up below ``ldap_settings["groups_base"]`` with the
    filter ``(&<groups_filter>(<groups_id>=<group_name>))``.  Each member DN
    is reduced to the value of its first RDN.

    :return: list of member ids; an empty list when the search fails (the
        error is logged), the group is not found, or it has no
        ``uniqueMember`` values.
    """
    # base:dc=example,dc=com
    # filter:(&(objectClass=posixGroup)(cn={group_name}))
    logger = utils.get_logger()
    ldap_server = ldap_settings["server"]
    ldap_base = ldap_settings["groups_base"]
    ldap_filter = "(&%s(%s={group_name}))" % (ldap_settings["groups_filter"], ldap_settings["groups_id"])
    get_uid = lambda x: x.split(",")[0].split("=")[1]
    # Pre-bind so a failed search no longer ends in an UnboundLocalError.
    res = []
    try:
        ad_filter = ldap_filter.replace('{group_name}', group_name)
        conn = LDAP.initialize(ldap_server)
        logger.debug("Searching members for %s: %s - %s - %s" % (group_name,
                                                                 ldap_server,
                                                                 ldap_base,
                                                                 ad_filter))
        res = conn.search_s(ldap_base, LDAP.SCOPE_SUBTREE, ad_filter)
    except Exception as e:  # "as" form works on Python 2.6+ and Python 3
        logger.error("Error getting group from LDAP: %s" % e)

    if not res:
        return []
    return [get_uid(member_dn)
            for member_dn in res[0][1].get('uniqueMember', [])]
ldap.py 文件源码 项目:matrix-bot 作者: psaavedra 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_ldap_groups(ldap_settings):
    '''Return the LDAP groups that are also listed in the settings.

    Searches the subtree below ``ldap_settings["groups_base"]`` with
    ``ldap_settings["groups_filter"]`` and keeps the first ``cn`` of every
    entry that appears in ``ldap_settings["groups"]``.

    :return: list of group names; an empty list if the lookup fails (the
        error is logged).
    '''
    # filter:(objectClass=posixGroup)
    # base:ou=Group,dc=example,dc=com
    logger = utils.get_logger()
    ldap_server = ldap_settings["server"]
    ldap_base = ldap_settings["groups_base"]
    ldap_filter = ldap_settings["groups_filter"]
    ldap_groups = ldap_settings["groups"]
    try:
        conn = LDAP.initialize(ldap_server)
        logger.debug("Searching groups: %s - %s - %s" % (ldap_server,
                                                         ldap_base,
                                                         ldap_filter))
        res = conn.search_s(ldap_base, LDAP.SCOPE_SUBTREE, ldap_filter)
        # Evaluated eagerly so malformed entries are reported by the
        # handler below rather than when the caller iterates.
        found = [entry[1]["cn"][0] for entry in res]
        return [name for name in found if name in ldap_groups]
    except Exception as e:  # "as" form works on Python 2.6+ and Python 3
        logger.error("Error getting groups from LDAP: %s" % e)
        # Keep the documented "list" contract instead of an implicit None.
        return []
authentication.py 文件源码 项目:isard 作者: isard-vdi 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def authentication_ldap(self,username,password,returnObject=True):
        """Authenticate ``username``/``password`` against the configured LDAP server.

        The user's DN is found with a subtree search for ``uid=<username>``
        and then verified by binding as that DN with the given password.

        :param returnObject: when true, wrap the new user data in ``User``
        :return: the user (``User`` instance or raw data from
            ``myLdapAuth.newUser``) on success, ``False`` on any failure.
        """
        from ldap.filter import escape_filter_chars

        # A simple bind with an empty password is an unauthenticated bind,
        # which many servers accept; it must never count as a valid login.
        if not password:
            return False

        cfg=r.table('config').get(1).run(db.conn)['auth']
        try:
            conn = ldap.initialize(cfg['ldap']['ldap_server'])
            # Escape the user-supplied name so characters such as "*" or ")"
            # cannot alter the search filter.
            user_filter = "uid=%s" % escape_filter_chars(username)
            id_conn = conn.search(cfg['ldap']['bind_dn'],ldap.SCOPE_SUBTREE,user_filter)
            tmp,info=conn.result(id_conn, 0)
            user_dn=info[0][0]
            if conn.simple_bind_s(who=user_dn,cred=password):
                # config/ldapauth.py has the function you can change to
                # adapt to your ldap
                au=myLdapAuth()
                newUser=au.newUser(username,info[0])
                return User(newUser) if returnObject else newUser
            else:
                return False
        except Exception as e:
            # The message needs a placeholder for the exception argument.
            log.error("LDAP ERROR: %s", e)
            return False
ldaptools.py 文件源码 项目:pizza-auth 作者: xxpizzaxx 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def updateuser(self, uid, modattrs):
        """Apply attribute changes to the LDAP entry of user ``uid``.

        Binds as the configured admin, looks the user up below
        ``self.config["memberdn"]``, builds a modlist from the stored and the
        merged attributes and sends it to the server.

        :param modattrs: mapping of attribute name to new value list
        :return: ``True`` if the entry was found and modified, ``False`` if
            no entry was found.
        """
        l = ldap.initialize(self.config["server"])
        l.simple_bind(self.config["admin"], self.config["password"])
        try:
            ldap_filter = "uid="+uid
            result_id = l.search(self.config["memberdn"], ldap.SCOPE_SUBTREE, ldap_filter, None)
            # Pre-bind so a falsy result id cannot lead to a NameError.
            data = None
            if result_id:
                _result_type, data = l.result(result_id, 0)
            if not data:
                return False

            dn, attrs = data[0]
            oldattrs = attrs
            newattrs = attrs.copy()
            newattrs.update(modattrs)
            # now change it
            # NOTE(review): re-applying oldattrs overwrites every key that
            # already existed, so only attributes new to the entry end up in
            # the modlist.  Kept as in the original; confirm this is intended.
            newattrs.update(oldattrs)
            ldif = modlist.modifyModlist(oldattrs, newattrs)
            print(ldif)
            l.modify_s(dn, ldif)
            return True
        finally:
            # Release the connection on every path, including "not found"
            # and errors raised by the server.
            l.unbind_s()
ldapobject.py 文件源码 项目:kekescan 作者: xiaoxiaoleo 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def find_unique_entry(self,base,scope=ldap.SCOPE_SUBTREE,filterstr='(objectClass=*)',attrlist=None,attrsonly=0,serverctrls=None,clientctrls=None,timeout=-1):
    """
    Search for exactly one entry and return it.

    The search is issued with a size limit of 2.  NO_UNIQUE_ENTRY is raised
    unless precisely one result comes back.
    """
    search_options = dict(
      attrlist=attrlist or ['*'],
      attrsonly=attrsonly,
      serverctrls=serverctrls,
      clientctrls=clientctrls,
      timeout=timeout,
      sizelimit=2,
    )
    matches = self.search_ext_s(base, scope, filterstr, **search_options)
    if len(matches)==1:
      return matches[0]
    raise NO_UNIQUE_ENTRY('No or non-unique search result for %s' % (repr(filterstr)))
session.py 文件源码 项目:pyldap_orm 作者: asyd 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def search(self, base, scope=ldap.SCOPE_SUBTREE, ldap_filter='(objectClass=*)', attributes=None,
               serverctrls=None):
        """
        Run a synchronous low level LDAP search.

        When server controls are supplied the extended search call is used,
        otherwise the plain one.

        :param base: Base DN of the search
        :param scope: Scope of the search, default is SCOPE_SUBTREE
        :param ldap_filter: ldap filter, default is '(objectClass=*)'
        :param attributes: An array of attributes to return
        :param serverctrls: An array of server extended controls
        :return: whatever the underlying search call returns
        """
        if serverctrls is not None:
            ext_message = "Performing ext LDAP search: base: {}, scope: {}, filter: {}, serverctrls={}".format(
                base, scope, ldap_filter, serverctrls)
            logger.debug(ext_message)
            return self._server.search_ext_s(base, scope, ldap_filter, attrlist=attributes,
                                             serverctrls=serverctrls)

        plain_message = "Performing LDAP search: base: {}, scope: {}, filter: {}".format(
            base, scope, ldap_filter)
        logger.debug(plain_message)
        return self._server.search_s(base, scope, ldap_filter, attributes)
ldap.py 文件源码 项目:adminset 作者: guohongze 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def ldap_search_dn(self,uid=None):
        """Return the DN of the first entry whose ``cn`` equals ``uid``.

        :return: the DN, or ``None`` when ``uid`` is not given, the first
            result is not a search entry, or an LDAP error occurred (the
            error is printed).
        """
        if uid is None:
            # "cn=" + None would raise TypeError; nothing to look up.
            return None

        obj = self.ldapconn
        # The original spelled this "protocal_version", which only created
        # an unused attribute and never selected LDAPv3.
        obj.protocol_version = ldap.VERSION3
        searchScope = ldap.SCOPE_SUBTREE
        retrieveAttributes = None
        searchFilter = "cn=" + uid

        try:
            ldap_result_id = obj.search(self.base_dn, searchScope, searchFilter, retrieveAttributes)
            result_type, result_data = obj.result(ldap_result_id, 0)
            # Example of one returned entry:
            # ('cn=django,ou=users,dc=gccmx,dc=cn',
            #    {  'objectClass': ['inetOrgPerson', 'top'],
            #        'userPassword': ['{MD5}lueSGJZetyySpUndWjMBEg=='],
            #        'cn': ['django'], 'sn': ['django']  }  )
            if result_type == ldap.RES_SEARCH_ENTRY:
                return result_data[0][0]
            return None
        except ldap.LDAPError as e:  # "as" form works on Python 2.6+ and 3
            print(e)
            return None

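# NOTE: the original non-ASCII comment here was destroyed by an encoding error; it introduced the ldap_get_user function below.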
ldap.py 文件源码 项目:adminset 作者: guohongze 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def ldap_get_user(self,uid=None):
        """Return basic account data for the entry whose ``cn`` equals ``uid``.

        :return: dict with ``username`` (cn), ``email`` (mail) and ``nick``
            (sn), each taken from the first value of the attribute; ``None``
            when ``uid`` is not given, the first result is not a search
            entry, or an LDAP error occurred (the error is printed).
        """
        if uid is None:
            # "cn=" + None would raise TypeError; nothing to look up.
            return None

        obj = self.ldapconn
        # The original spelled this "protocal_version", which only created
        # an unused attribute and never selected LDAPv3.
        obj.protocol_version = ldap.VERSION3
        searchScope = ldap.SCOPE_SUBTREE
        retrieveAttributes = None
        searchFilter = "cn=" + uid
        try:
            ldap_result_id = obj.search(self.base_dn, searchScope, searchFilter, retrieveAttributes)
            result_type, result_data = obj.result(ldap_result_id, 0)
            if result_type == ldap.RES_SEARCH_ENTRY:
                attrs = result_data[0][1]
                return {'username': attrs['cn'][0],
                        'email': attrs['mail'][0],
                        'nick': attrs['sn'][0]}
            return None
        except ldap.LDAPError as e:  # "as" form works on Python 2.6+ and 3
            print(e)
            return None

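# NOTE: the original non-ASCII comment here was garbled by an encoding error; the surviving fragments mention LDAP and a boolean result.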
rbuserdb.py 文件源码 项目:useradm 作者: redbrick 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def get_student_byid(self, usr, override=0):
        """Fill in ``usr`` from the student directory entry matching ``usr.id``.

        The student tree is searched for ``employeeNumber=<usr.id>`` and the
        first match is handed, together with ``override``, to
        ``set_user_dcu`` and ``set_user_dcu_student``.

        Raises RBWarningError when no entry matches."""
        matches = self.ldap_dcu.search_s(
            rbconfig.ldap_dcu_students_tree,
            ldap.SCOPE_SUBTREE,
            'employeeNumber=%s' % usr.id)
        if not matches:
            raise RBWarningError(
                "Student id '%s' does not exist in database" % usr.id)

        entry = matches[0]
        self.set_user_dcu(usr, entry, override)
        self.set_user_dcu_student(usr, entry, override)
rbuserdb.py 文件源码 项目:useradm 作者: redbrick 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def get_alumni_byid(self, usr, override=0):
        """Fill in ``usr`` from the alumni directory entry matching ``usr.id``.

        The alumni tree is searched for ``cn=<usr.id>`` and the first match
        is handed, together with ``override``, to ``set_user_dcu`` and
        ``set_user_dcu_alumni``.

        Raises RBWarningError when no entry matches."""
        matches = self.ldap_dcu.search_s(
            rbconfig.ldap_dcu_alumni_tree,
            ldap.SCOPE_SUBTREE,
            'cn=%s' % usr.id)
        if not matches:
            raise RBWarningError(
                "Alumni id '%s' does not exist in database" % usr.id)

        entry = matches[0]
        self.set_user_dcu(usr, entry, override)
        self.set_user_dcu_alumni(usr, entry, override)
rbuserdb.py 文件源码 项目:useradm 作者: redbrick 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_staff_byid(self, usr, override=0):
        """Fill in ``usr`` from the staff directory entry matching ``usr.id``.

        The first match is handed, together with ``override``, to
        ``set_user_dcu`` and ``set_user_dcu_staff``.

        Raises RBWarningError when no entry matches."""
        # The staff id is stored inconsistently: either as the cn or inside
        # the gecos field, so the filter accepts both.
        staff_filter = '(|(cn=%s)(gecos=*,*%s))' % (usr.id, usr.id)
        matches = self.ldap_dcu.search_s(
            rbconfig.ldap_dcu_staff_tree, ldap.SCOPE_SUBTREE, staff_filter)
        if not matches:
            raise RBWarningError(
                "Staff id '%s' does not exist in database" % usr.id)

        entry = matches[0]
        self.set_user_dcu(usr, entry, override)
        self.set_user_dcu_staff(usr, entry, override)
test_ldap.py 文件源码 项目:ldap2pg 作者: dalibo 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_logger(mocker):
    """LDAPLogger exposes the wrapped object's attributes and accepts search_s."""
    from ldap2pg.ldap import LDAPLogger
    from ldap import SCOPE_SUBTREE

    wrapped = mocker.Mock(name='toto', pouet='toto')
    ldap = LDAPLogger(wrapped)

    # Attribute access is expected to reach the wrapped mock.
    assert 'toto' == ldap.pouet

    search_kwargs = dict(scope=SCOPE_SUBTREE, filter='', attributes='cn')
    ldap.search_s('base', **search_kwargs)
LDAP2Memberships.py 文件源码 项目:mailman-ldap-memberadaptor 作者: rettenbs 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __ldap_load_members2(self, l, moderator=False):
        """Load list members (or moderators) from LDAP via connection ``l``.

        When a group attribute is configured, or moderators are requested,
        the group entries are searched first and every id found in the group
        attribute is resolved with a second search below the base DN.
        Otherwise the configured search is run directly below the base DN.
        Results are passed to ``self.__loadmembers``.

        :param l: LDAP connection object providing ``search_s``
        :param moderator: load the moderator group instead of the members
        """
        attr = ['mail']
        if not moderator and self.ldapfullname:
            attr.append(self.ldapfullname)
        if self.ldapgroupattr or moderator:
            # group attribute or moderator flag has been set. Let's get the uids.
            if moderator:
                assert self.ldapmodgroupdn
                groupdn = self.ldapmodgroupdn
            else:
                groupdn = self.ldapgroupdn
            members = l.search_s(groupdn, ldap.SCOPE_SUBTREE,
                self.ldapsearch, [self.ldapgroupattr])
            for (dn,attrs) in members:
                if self.ldapgroupattr in attrs:
                    memberids = attrs[self.ldapgroupattr]
                    if DEBUG:
                        # The original referenced an undefined "groupdns"
                        # here, raising NameError whenever DEBUG was on.
                        syslog('debug','regular groupdns = %s' % memberids)
                    for memberid in memberids:
                        try:
                            res2 = l.search_s(self.ldapbasedn,
                                              ldap.SCOPE_SUBTREE,
                                              '(&(objectClass=*)('+self.ldapmemberuid+'='+memberid+'))',
                                              attr)
                            self.__loadmembers(res2, moderator)
                        except ldap.NO_SUCH_OBJECT:
                            syslog('warn',"can't process %s: no such object (accountDisabled?)" % memberid)

        else:
            members = l.search_s(self.ldapbasedn,
                                ldap.SCOPE_SUBTREE,
                                self.ldapsearch,
                                attr)
            self.__loadmembers(members)
syncldap.py 文件源码 项目:django-adldap-sync 作者: marchete 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def paged_search_ext_s(self, base, scope, filterstr='(objectClass=*)', attrlist=None, attrsonly=0,
                           serverctrls=None, clientctrls=None, timeout=-1, sizelimit=0):
        """
        Search like LDAPObject.search_ext_s() but fetch the results in pages
        using the simple paged results control.

        The page size is ``self.conf_LDAP_SYNC_BIND_PAGESIZE``.  ``scope`` is
        honoured (it used to be ignored in favour of SCOPE_SUBTREE).
        ``attrsonly``, ``clientctrls``, ``timeout`` and ``sizelimit`` are
        accepted for signature compatibility but, as before, are not
        forwarded to the server.

        :return: list with the result data of all pages concatenated
        """
        req_ctrl = SimplePagedResultsControl(True, size=self.conf_LDAP_SYNC_BIND_PAGESIZE, cookie='')
        extra_ctrls = list(serverctrls or [])
        results = []

        while True:
            # req_ctrl carries the cookie of the previous page (empty for
            # the first request).
            msgid = self.search_ext(base, scope, filterstr, attrlist=attrlist,
                                    serverctrls=extra_ctrls + [req_ctrl])
            rtype, rdata, rmsgid, rctrls = self.result3(msgid)
            results.extend(rdata)
            # Extract the simple paged results response control
            pctrls = [c for c in rctrls if c.controlType == SimplePagedResultsControl.controlType]

            # Stop on the last page (empty cookie) and also when the server
            # sent no paging control at all; the latter used to loop forever.
            if not pctrls or not pctrls[0].cookie:
                break
            # Copy cookie from response control to request control
            req_ctrl.cookie = pctrls[0].cookie

        return results
lcmldap.py 文件源码 项目:umanager 作者: lcm-unimi 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def userexistsbyuid(self, username):
        """Tell whether an entry with uid ``username`` exists under ou=People.

        Returns False when the search yields an empty list or raises an
        LDAP error (which is printed), True otherwise.
        """
        people_base = "ou=People,"+self.dc
        uid_filter = "uid="+username

        try:
            matches = self.conn.search_s(people_base, ldap.SCOPE_SUBTREE, uid_filter, None)
        except ldap.LDAPError as err:
            print(err)
            return False
        return not (matches == [])
ldaptools.py 文件源码 项目:pizza-auth 作者: xxpizzaxx 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def getuser(self, id):
        """Return a ``User`` built from the LDAP entry with uid ``id``.

        :param id: the uid as a string, or a sequence whose first element is
            the uid
        :return: ``self.User`` instance for the first result, or ``None`` if
            the search produced no data.
        """
        try:
            string_types = basestring  # Python 2
        except NameError:
            string_types = str  # Python 3 has no basestring
        if not isinstance(id, string_types):
            id = id[0]
        l = ldap.initialize(self.config["server"])
        l.simple_bind(self.config["admin"], self.config["password"])
        try:
            ldap_filter = "uid="+id
            result_id = l.search(self.config["memberdn"], ldap.SCOPE_SUBTREE, ldap_filter, None)
            # Pre-bind so a falsy result id cannot lead to a NameError.
            data = None
            if result_id:
                _result_type, data = l.result(result_id, 0)
            if not data:
                return None
            _dn, attrs = data[0]
            return self.User(attrs, self.authconfig["auth"]["domain"])
        finally:
            # Release the connection on every path, including errors.
            l.unbind_s()
ldaptools.py 文件源码 项目:pizza-auth 作者: xxpizzaxx 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def getusers(self, searchfilter):
        """Return ``User`` objects for every entry matching ``searchfilter``.

        Binds as the configured admin, searches the subtree below
        ``self.config["memberdn"]`` and reads results one message at a time
        until an empty result is received.

        :return: list of ``self.User`` instances, one per search entry
        """
        l = ldap.initialize(self.config["server"])
        l.simple_bind(self.config["admin"], self.config["password"])
        results = []
        try:
            result_id = l.search(self.config["memberdn"], ldap.SCOPE_SUBTREE, searchfilter, None)
            while True:
                result_type, result_data = l.result(result_id, 0)
                if result_data == []:
                    break
                if result_type == ldap.RES_SEARCH_ENTRY:
                    results.append(result_data[0][1])
        finally:
            # The connection used to be left open; close it on every path,
            # as getuser does.
            l.unbind_s()
        # A real list (not a lazy map) so the result behaves the same on
        # Python 2 and Python 3.
        return [self.User(attrs, self.authconfig["auth"]["domain"])
                for attrs in results]
ldap.py 文件源码 项目:gallery 作者: liam-middlebrook 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def ldap_get_members():
    """Return name and UUID of every user in the ``member`` group.

    Each element is a dict with the keys ``name`` (first ``displayName``
    value) and ``uuid`` (first ``entryUUID`` value), both decoded as UTF-8.
    """
    connection = ldap.get_con()

    entries = connection.search_s(
            "ou=Users,dc=csh,dc=rit,dc=edu",
            pyldap.SCOPE_SUBTREE,
            "(memberof=cn=member,ou=groups,dc=csh,dc=rit,dc=edu)",
            ["entryUUID", "displayName"])

    members = []
    for entry in entries:
        attributes = entry[1]
        display_name = attributes['displayName'][0].decode('utf-8')
        entry_uuid = attributes['entryUUID'][0].decode('utf-8')
        members.append({"name": display_name, "uuid": entry_uuid})

    return members
sync_targets.py 文件源码 项目:iris 作者: linkedin 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def get_ldap_lists(l, search_strings, parent_list=None):
    """Collect ``(cn, name)`` pairs of mailing lists using paged searches.

    :param l: LDAP connection providing ``search_ext`` and ``result3``
    :param search_strings: mapping with the search base, filters and
        attribute names used below
    :param parent_list: when given, only sub-lists of this list are
        searched (the value is escaped before it enters the filter)
    :return: set of ``(cn, name)`` tuples, each the first value of the
        respective attribute
    """
    results = set()
    known_ldap_resp_ctrls = {
        SimplePagedResultsControl.controlType: SimplePagedResultsControl,
    }
    req_ctrl = SimplePagedResultsControl(True, size=ldap_pagination_size, cookie='')

    if parent_list:
        filterstr = search_strings['get_all_sub_lists_filter'] % escape_filter_chars(parent_list)
    else:
        filterstr = search_strings['get_all_lists_filter']

    cn_field = search_strings['list_cn_field']
    name_field = search_strings['list_name_field']

    while True:
        msgid = l.search_ext(search_strings['list_search_string'],
                             ldap.SCOPE_SUBTREE,
                             serverctrls=[req_ctrl],
                             attrlist=(cn_field, name_field),
                             filterstr=filterstr)
        rtype, rdata, rmsgid, serverctrls = l.result3(msgid, resp_ctrl_classes=known_ldap_resp_ctrls)

        results |= {(data[cn_field][0], data[name_field][0]) for (dn, data) in rdata}

        pctrls = [c for c in serverctrls
                  if c.controlType == SimplePagedResultsControl.controlType]

        # No paging control in the response means there is no next page;
        # indexing pctrls[0] unconditionally used to raise IndexError here.
        if not pctrls or not pctrls[0].cookie:
            break
        req_ctrl.cookie = pctrls[0].cookie

    return results
sync_targets.py 文件源码 项目:iris 作者: linkedin 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def get_ldap_list_membership(l, search_strings, list_name):
    """Collect the account names of a list's members using paged searches.

    :param l: LDAP connection providing ``search_ext`` and ``result3``
    :param search_strings: mapping with the search base, membership filter
        and account-name attribute used below
    :param list_name: list to look up (escaped before it enters the filter)
    :return: set with the first account-name value of every matching entry
    """
    results = set()
    known_ldap_resp_ctrls = {
        SimplePagedResultsControl.controlType: SimplePagedResultsControl,
    }
    req_ctrl = SimplePagedResultsControl(True, size=ldap_pagination_size, cookie='')

    account_field = search_strings['user_account_name_field']
    membership_filter = search_strings['user_membership_filter'] % escape_filter_chars(list_name)

    while True:
        msgid = l.search_ext(search_strings['user_search_string'],
                             ldap.SCOPE_SUBTREE,
                             serverctrls=[req_ctrl],
                             attrlist=[account_field],
                             filterstr=membership_filter
                             )
        rtype, rdata, rmsgid, serverctrls = l.result3(msgid, resp_ctrl_classes=known_ldap_resp_ctrls)

        results |= {data[1][account_field][0] for data in rdata}

        pctrls = [c for c in serverctrls
                  if c.controlType == SimplePagedResultsControl.controlType]

        # No paging control in the response means there is no next page;
        # indexing pctrls[0] unconditionally used to raise IndexError here.
        if not pctrls or not pctrls[0].cookie:
            break
        req_ctrl.cookie = pctrls[0].cookie

    return results
ldapauth.py 文件源码 项目:nav 作者: UNINETT 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def search_dn(self):
        """Find the Distinguished Name of ``self.username`` in the directory.

        If a manager DN is configured, an authenticated bind as the manager
        is performed before searching.

        :returns: A tuple of (dn, canonical_username)
        :raises UserNotFound: when the search yields no usable entry
        """
        uid_attr = escape_filter_chars(_config.get('ldap', 'uid_attr'))
        encoding = _config.get('ldap', 'encoding')
        manager = _config.get('ldap', 'manager').encode(encoding)
        manager_password = _config.get(
            'ldap', 'manager_password', raw=True).encode(encoding)
        if manager:
            _logger.debug("Attempting authenticated bind as manager to %s",
                          manager)
            self.ldap.simple_bind_s(manager, manager_password)

        escaped_name = escape_filter_chars(self.username.encode(encoding))
        filter_ = "(%s=%s)" % (uid_attr, escaped_name)
        base_dn = _config.get('ldap', 'basedn')
        entries = self.ldap.search_s(base_dn, ldap.SCOPE_SUBTREE, filter_)
        # Reject an empty result, an empty first entry, or a missing DN.
        if not (entries and entries[0] and entries[0][0]):
            raise UserNotFound(filter_)

        user_dn, attrs = entries[0]
        # Use the directory's spelling of the uid when it is returned,
        # otherwise fall back to the name that was supplied.
        uid = attrs[uid_attr][0].decode(encoding) if uid_attr in attrs \
            else self.username
        return user_dn.decode(encoding), uid
ldapinfo.py 文件源码 项目:deb-python-pysaml2 作者: openstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __init__(self, uri, base, filter_pattern, scope=SCOPE_SUBTREE,
                 tls=False, user="", passwd="", attr=None, attrsonly=False):
        """Store the search defaults and open a bound LDAPv3 connection.

        The connection to ``uri`` is kept in ``self.ld`` and bound
        synchronously with ``user`` / ``passwd``.  ``tls`` is only stored
        here.
        """
        UserInfo.__init__(self)
        self.ldapuri, self.base = uri, base
        self.filter_pattern, self.scope = filter_pattern, scope
        self.tls = tls
        self.attr, self.attrsonly = attr, attrsonly

        self.ld = connection = ldap.initialize(uri)
        connection.protocol_version = ldap.VERSION3
        connection.simple_bind_s(user, passwd)
authentication.py 文件源码 项目:dila 作者: socialwifi 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def get_user_records(username):
    """Return the LDAP search results for ``username``.

    The name is escaped before being substituted into the configured user
    filter; the search covers the subtree below the configured user base DN.
    """
    with initialize_connection() as connection:
        connection.simple_bind_s(config.LDAP_BIND_DN, config.LDAP_BIND_PASSWORD)
        filter_template = config.LDAP_USER_OBJECT_FILTER
        substitutions = {'user': ldap_filter.escape_filter_chars(username)}
        records = connection.search_s(
            config.LDAP_USER_BASE_DN,
            ldap.SCOPE_SUBTREE,
            filter_template % substitutions,
        )
    return records
authentication.py 文件源码 项目:dila 作者: socialwifi 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def check_group_membership(username, group=None):
    """Tell whether ``username`` appears in the ``memberUid`` values found.

    With ``group`` left as None the configured "all groups" filter is used;
    otherwise the group name is escaped and substituted into the configured
    group filter.
    """
    with initialize_connection() as connection:
        connection.simple_bind_s(config.LDAP_BIND_DN, config.LDAP_BIND_PASSWORD)
        if group is not None:
            query = config.LDAP_GROUP_OBJECT_FILTER % {'group': ldap_filter.escape_filter_chars(group)}
        else:
            query = config.LDAP_GROUP_ALL
        results = connection.search_s(config.LDAP_GROUP_BASE_DN, ldap.SCOPE_SUBTREE, query)

        # Gather the member lists of every returned group first, then
        # flatten and decode them.
        member_lists = [entry[1]['memberUid'] for entry in results]
        encoding = config.LDAP_ENCODING
        decoded_members = []
        for member_list in member_lists:
            for raw_member in member_list:
                decoded_members.append(raw_member.decode(encoding))
        return username in decoded_members
vio_ldap.py 文件源码 项目:ansible-modules-extras-gpl3 作者: vmware 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def ldap_search(module, conn, dn, search_filter, ldap_attrs):
    """Run a subtree search below ``dn`` and return the result.

    An LDAP error is reported through ``module.fail_json`` with a message
    built by ``ldap_errors``.
    """
    try:
        found = conn.search_s(dn, ldap.SCOPE_SUBTREE, search_filter, ldap_attrs)
    except ldap.LDAPError as err:
        module.fail_json(msg="LDAP Error Searching: {}".format(ldap_errors(err)))

    return found
FreeIPAServer.py 文件源码 项目:ipa_check_consistency 作者: peterpakos 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _search(self, base, fltr, attrs=None, scope=ldap.SCOPE_SUBTREE):
        """Run a synchronous search on the stored connection and return its result."""
        return self._conn.search_s(base, scope, fltr, attrs)


问题


面经


文章

微信
公众号

扫码关注公众号