python类SCOPE_SUBTREE的实例源码

pyAuthenticationByLDAP.py 文件源码 项目:LinuxBashShellScriptForOps 作者: DingGuodong 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __init__(self, username, password):
        """Bind to the LDAP server as *username* and fetch that user's entry.

        On success ``is_active`` is set to True and ``user_data`` holds the
        result of a subtree search for ``userPrincipalName=<username>``
        under ``ldap_base_dn``.  The connection is always unbound before
        this method returns or raises after the handle has been created.

        :param username: bind identity; also the userPrincipalName value
            that is searched for.
        :param password: credential for the simple bind; must be non-empty.
        :raises Exception: "Invalid credentials" when the password is empty
            or the server rejects the bind; "Can't contact LDAP server"
            when the server cannot be reached.
        """
        # Local import: only the top-level ``ldap`` package is known to be
        # in scope here, not the ``ldap.filter`` submodule.
        from ldap.filter import escape_filter_chars

        ldap_host = "192.168.78.8"
        ldap_port = "389"
        ldaps_port = "636"
        ldap_enable_ldaps = False
        self.ldap_base_dn = "DC=example,DC=com,DC=cn"  # example.com.cn

        self.ldap_user = username
        self.ldap_password = password

        if ldap_enable_ldaps is True:
            self.uri = "ldaps://" + ldap_host + ":" + ldaps_port
        else:
            self.uri = "ldap://" + ldap_host + ":" + ldap_port

        self.is_active = False
        self.user_data = None

        # A simple bind with an empty password is an *unauthenticated* bind
        # (RFC 4513 section 5.1.2) which servers may accept; never treat
        # that as a successful login.
        if not password:
            raise Exception("Invalid credentials")

        self.conn = ldap.initialize(self.uri)

        try:
            self.conn.set_option(ldap.OPT_REFERRALS, 0)  # this option is required in Windows Server 2012
            try:
                self.conn.simple_bind_s(who=self.ldap_user, cred=self.ldap_password)
            except ldap.INVALID_CREDENTIALS:
                raise Exception("Invalid credentials")
            except ldap.SERVER_DOWN:
                raise Exception("Can't contact LDAP server")

            self.is_active = True
            # Escape the user-supplied value so characters such as '*', '('
            # and ')' cannot alter the structure of the search filter.
            self.user_data = self.conn.search_s(
                self.ldap_base_dn, ldap.SCOPE_SUBTREE,
                '(userPrincipalName=%s)' % escape_filter_chars(self.ldap_user))
        finally:
            # Release the connection on every path, including failed binds.
            self.conn.unbind()
ldapdns.py 文件源码 项目:Trusted-Platform-Module-nova 作者: BU-NU-CLOUD-SP16 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _get_tuple_for_domain(cls, lobj, domain):
        """Return the first search result for *domain*, or None if absent.

        Searches the subtree under ``CONF.ldap_dns_base_dn`` for entries
        whose ``associatedDomain`` equals *domain*.  When more than one
        entry matches, a warning is logged and the first match is returned.

        :param lobj: LDAP connection object providing ``search_s``.
        :param domain: domain name to look up.
        """
        entry = lobj.search_s(CONF.ldap_dns_base_dn, ldap.SCOPE_SUBTREE,
                              '(associatedDomain=%s)' % utils.utf8(domain))
        if not entry:
            return None
        if len(entry) > 1:
            # Named placeholders need a single mapping argument; passing
            # the values positionally makes the log record fail to format.
            LOG.warning(_LW("Found multiple matches for domain "
                            "%(domain)s.\n%(entry)s"),
                        {'domain': domain, 'entry': entry})
        return entry[0]
ldapdns.py 文件源码 项目:Trusted-Platform-Module-nova 作者: BU-NU-CLOUD-SP16 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _get_all_domains(cls, lobj):
        """Return the first associatedDomain value of every sOARecord entry.

        Searches the subtree under ``CONF.ldap_dns_base_dn`` for entries
        that carry an ``sOARecord`` attribute; entries with a missing or
        empty ``associatedDomain`` are skipped.
        """
        soa_entries = lobj.search_s(CONF.ldap_dns_base_dn,
                                    ldap.SCOPE_SUBTREE, '(sOARecord=*)')
        per_entry_values = (result[1].get('associatedDomain')
                            for result in soa_entries)
        return [values[0] for values in per_entry_values if values]
ldapdns.py 文件源码 项目:Trusted-Platform-Module-nova 作者: BU-NU-CLOUD-SP16 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def delete(self):
        """Delete the domain that this entry refers to."""
        # Remove every entry below self.dn that has an aRecord first ...
        record_hits = self.lobj.search_s(self.dn, ldap.SCOPE_SUBTREE,
                                         '(aRecord=*)')
        for hit_dn in [hit[0] for hit in record_hits]:
            self.lobj.delete_s(hit_dn)

        # ... then the entry at self.dn itself.
        self.lobj.delete_s(self.dn)
ldapdns.py 文件源码 项目:Trusted-Platform-Module-nova 作者: BU-NU-CLOUD-SP16 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def subentry_with_name(self, name):
        """Return a HostEntry for ``<name>.<qualified_domain>``, or None.

        Searches the subtree under ``self.dn`` for an entry whose
        ``associatedDomain`` equals the combined name; only the first
        result is used.
        """
        name_filter = '(associatedDomain=%s.%s)' % (
            utils.utf8(name), utils.utf8(self.qualified_domain))
        found = self.lobj.search_s(self.dn, ldap.SCOPE_SUBTREE, name_filter)
        if not found:
            return None
        return HostEntry(self, found[0])
ldapdns.py 文件源码 项目:Trusted-Platform-Module-nova 作者: BU-NU-CLOUD-SP16 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def subentries_with_ip(self, ip):
        """Return HostEntry objects for entries whose aRecord equals *ip*.

        Searches the subtree under ``self.dn``; results that have no
        ``associatedDomain`` attribute are left out.
        """
        ip_filter = '(aRecord=%s)' % utils.utf8(ip)
        hits = self.lobj.search_s(self.dn, ldap.SCOPE_SUBTREE, ip_filter)
        return [HostEntry(self, hit) for hit in hits
                if 'associatedDomain' in hit[1]]
windapsearch.py 文件源码 项目:windapsearch 作者: ropnop 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def getAllUsers(self, attrs=''):
        """Query all objects with ``objectCategory=user`` under the domain base.

        :param attrs: attribute names to request; when empty, defaults to
            ``['cn', 'userPrincipalName']``.
        :returns: tuple ``(results, attrs)`` where *results* is the output
            of ``get_search_results`` on the raw query response and *attrs*
            is the attribute list that was actually requested.

        On ``LDAPError`` the error is printed and the process exits with
        status 1.
        """
        if not attrs:
            attrs = ['cn', 'userPrincipalName']

        objectFilter = '(objectCategory=user)'
        base_dn = self.domainBase
        try:
            rawUsers = self.do_ldap_query(base_dn, ldap.SCOPE_SUBTREE, objectFilter, attrs)
        except LDAPError as e:
            # ``except ... as`` and single-argument parenthesised print
            # parse identically on Python 2.6+ and Python 3.
            print("[!] Error retrieving users")
            print("[!] {}".format(e))
            sys.exit(1)

        return (self.get_search_results(rawUsers), attrs)
windapsearch.py 文件源码 项目:windapsearch 作者: ropnop 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def getAllGroups(self,attrs=''):
        """Query all objects with ``objectCategory=group`` under the domain base.

        :param attrs: attribute names to request; when empty, defaults to
            ``['distinguishedName', 'cn']``.
        :returns: tuple ``(results, attrs)`` where *results* is the output
            of ``get_search_results`` on the raw query response and *attrs*
            is the attribute list that was actually requested.

        On ``LDAPError`` the error is printed and the process exits with
        status 1.
        """
        if not attrs:
            attrs = ['distinguishedName', 'cn']

        objectFilter = '(objectCategory=group)'
        base_dn = self.domainBase
        try:
            rawGroups = self.do_ldap_query(base_dn, ldap.SCOPE_SUBTREE, objectFilter, attrs)
        except LDAPError as e:
            # ``except ... as`` and single-argument parenthesised print
            # parse identically on Python 2.6+ and Python 3.
            print("[!] Error retrieving groups")
            print("[!] {}".format(e))
            sys.exit(1)

        return (self.get_search_results(rawGroups), attrs)
windapsearch.py 文件源码 项目:windapsearch 作者: ropnop 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def doFuzzySearch(self, searchTerm, objectCategory=''):
        """Run an ``anr`` search for *searchTerm* under the domain base.

        :param searchTerm: value placed in the ``(anr=...)`` filter clause.
        :param objectCategory: when non-empty, the search is additionally
            restricted to ``(objectCategory=<objectCategory>)``.
        :returns: the output of ``get_search_results`` on the raw response;
            only the ``dn`` attribute is requested.

        On ``LDAPError`` the error is printed and the process exits with
        status 1.
        """
        # NOTE(review): both values are interpolated into the filter as-is,
        # so filter metacharacters in them are passed through to the server.
        if objectCategory:
            objectFilter = '(&(objectCategory={})(anr={}))'.format(objectCategory, searchTerm)
        else:
            objectFilter = '(anr={})'.format(searchTerm)
        attrs = ['dn']
        base_dn = self.domainBase
        try:
            rawResults = self.do_ldap_query(base_dn, ldap.SCOPE_SUBTREE, objectFilter, attrs)
        except LDAPError as e:
            # ``except ... as`` and single-argument parenthesised print
            # parse identically on Python 2.6+ and Python 3.
            print("[!] Error retrieving results")
            print("[!] {}".format(e))
            sys.exit(1)
        return self.get_search_results(rawResults)
windapsearch.py 文件源码 项目:windapsearch 作者: ropnop 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def doCustomSearch(self, base, objectFilter, attrs):
        """Run a caller-supplied subtree search.

        :param base: base DN to search from.
        :param objectFilter: LDAP filter string, passed through unchanged.
        :param attrs: attribute names to request.
        :returns: the output of ``get_search_results`` on the raw response.

        On ``LDAPError`` the error is printed and the process exits with
        status 1.
        """
        try:
            rawResults = self.do_ldap_query(base, ldap.SCOPE_SUBTREE, objectFilter, attrs)
        except LDAPError as e:
            # These were bare string expressions ('"print [!] ..."') that
            # printed nothing, so the process exited without any message.
            print("[!] Error doing search")
            print("[!] {}".format(e))
            sys.exit(1)

        return self.get_search_results(rawResults)
windapsearch.py 文件源码 项目:windapsearch 作者: ropnop 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def getAllComputers(self, attrs=''):
        """Query all objects with ``objectClass=Computer`` under the domain base.

        :param attrs: attribute names to request; when empty, defaults to
            cn, dNSHostName and the three operatingSystem* attributes
            listed below.
        :returns: tuple ``(results, attrs)`` where *results* is the output
            of ``get_search_results`` on the raw query response and *attrs*
            is the attribute list that was actually requested.

        On ``LDAPError`` the error is printed and the process exits with
        status 1.
        """
        if not attrs:
            attrs = ['cn', 'dNSHostName', 'operatingSystem', 'operatingSystemVersion', 'operatingSystemServicePack']

        objectFilter = '(objectClass=Computer)'
        base_dn = self.domainBase

        try:
            rawComputers = self.do_ldap_query(base_dn, ldap.SCOPE_SUBTREE, objectFilter, attrs)
        except LDAPError as e:
            # ``except ... as`` and single-argument parenthesised print
            # parse identically on Python 2.6+ and Python 3.
            print("[!] Error retrieving computers")
            print("[!] {}".format(e))
            sys.exit(1)

        return (self.get_search_results(rawComputers), attrs)
syncldap.py 文件源码 项目:django-adldap-sync 作者: marchete 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def ldap_search(self, filter, attributes, incremental, incremental_filter):
        """
        Query the configured LDAP server with the provided search filter and
        attribute list.

        Each URI in ``conf_LDAP_SYNC_BIND_URI`` is tried in order; the first
        one that accepts the bind is searched and its result returned.

        :param filter: search filter used for a full search.
        :param attributes: attribute list passed as ``attrlist``.
        :param incremental: when true and the URI's sync record has
            ``syncs_to_full > 0``, *incremental_filter* is used instead.
        :param incremental_filter: filter template in which every ``?`` is
            replaced by ``self.whenchanged`` formatted with
            ``conf_LDAP_SYNC_INCREMENTAL_TIMESTAMPFORMAT``.
        :returns: tuple ``(uri, results)``.
        :raises ldap.LDAPError: the last bind error when no URI could be
            bound, or a new LDAPError when no URI is configured at all.
        """
        last_error = None
        for uri in self.conf_LDAP_SYNC_BIND_URI:
            # Read record of this uri
            if (self.working_uri == uri):
                adldap_sync = self.working_adldap_sync
                created = False
            else:
                adldap_sync, created = ADldap_Sync.objects.get_or_create(ldap_sync_uri=uri)

            if ((adldap_sync.syncs_to_full > 0) and incremental):
                filter_to_use = incremental_filter.replace('?', self.whenchanged.strftime(self.conf_LDAP_SYNC_INCREMENTAL_TIMESTAMPFORMAT))
                logger.debug("Using an incremental search. Filter is:'%s'" % filter_to_use)
            else:
                filter_to_use = filter

            ldap.set_option(ldap.OPT_REFERRALS, 0)
            #ldap.set_option(ldap.OPT_NETWORK_TIMEOUT, 10)
            l = PagedLDAPObject(uri)
            l.protocol_version = 3

            # NOTE(review): OPT_X_TLS_DEMAND is passed below as an option
            # *name*; confirm against the python-ldap TLS option list that
            # this is intended.
            if (uri.startswith('ldaps:')):
                l.set_option(ldap.OPT_X_TLS, ldap.OPT_X_TLS_DEMAND)
                l.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_DEMAND)
                l.set_option(ldap.OPT_X_TLS_DEMAND, True)
            else:
                l.set_option(ldap.OPT_X_TLS, ldap.OPT_X_TLS_NEVER)
                l.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER)
                l.set_option(ldap.OPT_X_TLS_DEMAND, False)
            try:
                l.simple_bind_s(self.conf_LDAP_SYNC_BIND_DN, self.conf_LDAP_SYNC_BIND_PASS)
            except ldap.LDAPError as e:
                logger.error("Error connecting to LDAP server %s : %s" % (uri, e))
                # Remember the failure so it can be raised if every URI fails.
                last_error = e
                continue

            try:
                results = l.paged_search_ext_s(self.conf_LDAP_SYNC_BIND_SEARCH, ldap.SCOPE_SUBTREE, filter_to_use, attrlist=attributes, serverctrls=None)
            finally:
                # Release the connection even when the search raises.
                l.unbind_s()
            if (self.working_uri is None):
                self.working_uri = uri
                self.conf_LDAP_SYNC_BIND_URI.insert(0, uri)
                self.working_adldap_sync = adldap_sync

            return (uri, results)  # Return both the LDAP server URI used and the request. This is for incremental sync purposes
        # No URI could be bound. A bare ``raise`` here runs outside any
        # ``except`` block (RuntimeError on Python 3), so raise explicitly.
        if last_error is not None:
            raise last_error
        raise ldap.LDAPError("No LDAP server URI is configured")
ldapobject.py 文件源码 项目:kekescan 作者: xiaoxiaoleo 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def search_ext(self,base,scope,filterstr='(objectClass=*)',attrlist=None,attrsonly=0,serverctrls=None,clientctrls=None,timeout=-1,sizelimit=0):
    """
    search(base, scope [,filterstr='(objectClass=*)' [,attrlist=None [,attrsonly=0]]]) -> int
    search_s(base, scope [,filterstr='(objectClass=*)' [,attrlist=None [,attrsonly=0]]])
    search_st(base, scope [,filterstr='(objectClass=*)' [,attrlist=None [,attrsonly=0 [,timeout=-1]]]])
    search_ext(base,scope,[,filterstr='(objectClass=*)' [,attrlist=None [,attrsonly=0 [,serverctrls=None [,clientctrls=None [,timeout=-1 [,sizelimit=0]]]]]]])
    search_ext_s(base,scope,[,filterstr='(objectClass=*)' [,attrlist=None [,attrsonly=0 [,serverctrls=None [,clientctrls=None [,timeout=-1 [,sizelimit=0]]]]]]])

        Start an LDAP search. base is the DN of the entry the search
        starts from. scope is one of SCOPE_BASE (the object itself),
        SCOPE_ONELEVEL (the object's immediate children) or
        SCOPE_SUBTREE (the object and all of its descendants).

        filterstr is the string representation of the search filter
        (see RFC 4515).

        Every result tuple has the form (dn, entry): dn is a string
        holding the entry's distinguished name, and entry is a
        dictionary keyed by attribute type (string) whose values are
        lists of attribute values.

        The dn is obtained through the underlying ldap_get_dn(), which
        may raise an exception if the DN is malformed.

        When attrsonly is non-zero the attribute values are meaningless
        (they are not transmitted in the result).

        attrlist restricts which attributes are retrieved; None means
        all attributes of each entry are returned.

        serverctrls=None

        clientctrls=None

        The synchronous forms that take a timeout, search_st() and
        search_ext_s(), block for at most timeout seconds (indefinitely
        when timeout is negative) and raise TIMEOUT if no result
        arrives in time.

        A non-zero sizelimit caps the number of search results
        retrieved.
    """
    base, filterstr = self._unbytesify_values(base, filterstr)
    requested_attrs = attrlist
    if requested_attrs is not None:
      requested_attrs = tuple(self._unbytesify_values(*requested_attrs))
    server_controls = RequestControlTuples(serverctrls)
    client_controls = RequestControlTuples(clientctrls)
    return self._ldap_call(
      self._l.search_ext,
      base, scope, filterstr,
      requested_attrs, attrsonly,
      server_controls, client_controls,
      timeout, sizelimit,
    )
LdapChecker.py 文件源码 项目:nav 作者: UNINETT 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def execute(self):
        """Bind to an LDAP server and run one search as a health check.

        Reads its settings from ``self.args`` (url, username, password,
        base, scope, filter).  Returns a ``(status, message)`` pair:
        ``Event.DOWN`` when the protocol version is rejected or the search
        raises, ``Event.UP`` otherwise.  For base ``cn=monitor`` the
        message is the first ``description`` value of the first result,
        which is also stored in ``self.version``.
        """
        args = self.args
        # Two ways to reach the server: an explicit LDAP URL, or the
        # checker's own address (host/ip plus port number).
        if "url" in args and ldapurl.isLDAPUrl(args["url"]):
            target = args["url"]
        else:
            host, port = self.get_address()
            target = "ldap://%s:%s" % (host, port)
        conn = ldap.initialize(target)
        conn.simple_bind(args.get("username", ""), args.get("password", ""))

        try:
            self._set_version(args, conn)
        except ValueError:
            return Event.DOWN, "unsupported protocol version"

        base = args.get("base", "dc=example,dc=org")
        if base == "cn=monitor":
            monitor_result = conn.search_st(base, ldap.SCOPE_BASE,
                                            timeout=self.timeout)
            self.version = str(monitor_result[0][-1]['description'][0])
            return Event.UP, self.version

        # Anything other than BASE / ONELEVEL falls back to a subtree search.
        known_scopes = {
            "BASE": ldap.SCOPE_BASE,
            "ONELEVEL": ldap.SCOPE_ONELEVEL,
        }
        scope = known_scopes.get(args.get("scope", "SUBTREE").upper(),
                                 ldap.SCOPE_SUBTREE)
        filtr = args.get("filter", "objectClass=*")
        try:
            conn.search_ext_s(base, scope, filterstr=filtr,
                              timeout=self.timeout)
            # pylint: disable=W0703
        except Exception as err:
            return (Event.DOWN,
                    "Failed ldapSearch on %s for %s: %s" % (
                        self.get_address(), filtr, str(err)))

        conn.unbind()

        return Event.UP, "Ok"


问题


面经


文章

微信
公众号

扫码关注公众号