python类SCOPE_ONELEVEL的实例源码

FreeIPAServer.py 文件源码 项目:ipa_check_consistency 作者: peterpakos 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _replication_agreements(self):
        """Summarise the last-update status of each replication agreement.

        Searches (via ``self._search`` with ONELEVEL scope) under the replica
        entry of the mapping tree for ``self._base_dn`` and reads the
        ``nsDS5ReplicaHost`` and ``nsds5replicaLastUpdateStatus`` attributes
        of every entry found.

        Returns:
            A ``(report, healthy)`` tuple. ``report`` holds one
            ``"<host> <status>"`` line per entry, where host is the part of
            the replica host name before the first dot and status is the
            first token of the status string with a leading ``Error `` and
            surrounding parentheses removed. ``healthy`` is False if any
            status differs from ``'0'``.
        """
        self._log.debug('Checking for replication agreements...')
        # '=' and ',' in the base DN are written as \3D and \2C inside the
        # mapping-tree RDN.
        escaped_base = self._base_dn.replace('=', '\\3D').replace(',', '\\2C')
        entries = self._search(
            'cn=replica,cn=%s,cn=mapping tree,cn=config' % escaped_base,
            '(objectClass=*)',
            ['nsDS5ReplicaHost', 'nsds5replicaLastUpdateStatus'],
            scope=ldap.SCOPE_ONELEVEL
        )

        lines = []
        all_ok = True
        for _dn, attrs in entries:
            full_host = attrs['nsDS5ReplicaHost'][0].decode('utf-8')
            short_host = full_host.partition('.')[0]
            raw_status = attrs['nsds5replicaLastUpdateStatus'][0].decode('utf-8')
            code = raw_status.replace('Error ', '').partition(' ')[0].strip('()')
            all_ok = all_ok and code == '0'
            lines.append('%s %s' % (short_host, code))

        return '\n'.join(lines), all_ok
rbuserdb.py 文件源码 项目:useradm 作者: redbrick 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def check_userfree(self, uid):
        """Verify that ``uid`` may be used for a new account.

        Three one-level searches are made in turn: the accounts tree (by
        uid), the group tree (by cn) and the reserved tree (by uid).

        Raises:
            RBFatalError: if an account with this uid exists, or a group
                with this name exists.

        A hit in the reserved tree is not raised directly; an
        RBWarningError is handed to ``self.rberror`` instead.
        """
        accounts = self.ldap.search_s(rbconfig.ldap_accounts_tree,
                                      ldap.SCOPE_ONELEVEL, 'uid=%s' % uid)
        if accounts:
            account_attrs = accounts[0][1]
            raise RBFatalError(
                "Username '%s' is already taken by %s account (%s)" %
                (uid, account_attrs['objectClass'][0].decode(),
                 account_attrs['cn'][0].decode()))

        groups = self.ldap.search_s(rbconfig.ldap_group_tree,
                                    ldap.SCOPE_ONELEVEL, 'cn=%s' % uid)
        if groups:
            raise RBFatalError("Username '%s' is reserved (LDAP Group)" % uid)

        reserved = self.ldap.search_s(rbconfig.ldap_reserved_tree,
                                      ldap.SCOPE_ONELEVEL, 'uid=%s' % uid)
        if reserved:
            reason = reserved[0][1]['description'][0].decode()
            warning = RBWarningError(
                "Username '%s' is reserved (%s)" % (uid, reason))
            self.rberror(warning)
rbuserdb.py 文件源码 项目:useradm 作者: redbrick 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def list_pre_sync(self):
        """Return dictionary of all users for useradm pre_sync() dump.

        Every posixAccount entry one level below the accounts tree is
        mapped from its first ``uid`` value to a dict with two keys:
        ``'homeDirectory'`` (first homeDirectory value) and ``'usertype'``
        (the first of the entry's objectClass values, decoded, that appears
        in ``rbconfig.usertypes``).

        Raises:
            RBFatalError: if none of an entry's objectClass values is a
                known usertype.
        """

        res = self.ldap.search_s(
            rbconfig.ldap_accounts_tree, ldap.SCOPE_ONELEVEL,
            'objectClass=posixAccount', ('uid', 'homeDirectory',
                                         'objectClass'))
        tmp = {}
        # search_s returns (dn, attrs) tuples; the attributes are in the
        # second element, so unpack rather than indexing the tuple by name.
        for _dn, data in res:
            for object_class in data['objectClass']:
                usertype = object_class.decode()
                if usertype in rbconfig.usertypes:
                    break
            else:
                raise RBFatalError(
                    "Unknown usertype for user '%s'" %
                    data['uid'][0].decode())

            tmp[data['uid'][0]] = {
                'homeDirectory': data['homeDirectory'][0],
                # Record the matched objectClass, not the uid again.
                'usertype': usertype
            }
        return tmp
rbuserdb.py 文件源码 项目:useradm 作者: redbrick 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def uidNumber_findmax(self):
        """Return the highest uidNumber of any posixAccount in the accounts tree.

        The result is never lower than -1, which is also what is returned
        when the search finds no entries. Intended only for creating the
        uidNumber file; use uidNumber_readnext() to obtain the next
        available uidNumber.
        """

        entries = self.ldap.search_s(rbconfig.ldap_accounts_tree,
                                     ldap.SCOPE_ONELEVEL,
                                     'objectClass=posixAccount', ('uidNumber', ))

        uid_numbers = [int(entry[1]['uidNumber'][0]) for entry in entries]
        # Seed with -1 so an empty result still yields the sentinel value.
        return max([-1] + uid_numbers)
cpldap.py 文件源码 项目:auth-tool 作者: luciddg 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _search(self, filterstr, attrlist=None):
        """
        Run a one-level ``search_s`` below ``self.base_dn``.

        A connection is obtained from ``self._ldap_connection()`` for the
        duration of the call.

        ``filterstr`` is the LDAP filter in string form.  ``attrlist`` is
        passed straight through to ``search_s``; the default of None asks
        for all attributes of each entry.

        Returns whatever ``search_s`` returns.
        """
        with self._ldap_connection() as connection:
            found = connection.search_s(
                self.base_dn, ldap.SCOPE_ONELEVEL, filterstr, attrlist)
        return found
FreeIPAServer.py 文件源码 项目:ipa_check_consistency 作者: peterpakos 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _count_hbac_rules(self):
        """Return how many entries with an ipaUniqueID the search under
        ``cn=hbac,<base DN>`` yields (ONELEVEL scope)."""
        self._log.debug('Counting HBAC rules...')
        search_base = 'cn=hbac,%s' % self._base_dn
        rules = self._search(search_base, '(ipaUniqueID=*)',
                             scope=ldap.SCOPE_ONELEVEL)
        return len(rules)
FreeIPAServer.py 文件源码 项目:ipa_check_consistency 作者: peterpakos 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _count_sudo_rules(self):
        """Return how many entries with an ipaUniqueID the search under
        ``cn=sudorules,cn=sudo,<base DN>`` yields (ONELEVEL scope)."""
        self._log.debug('Counting SUDO rules...')
        search_base = 'cn=sudorules,cn=sudo,%s' % self._base_dn
        rules = self._search(search_base, '(ipaUniqueID=*)',
                             scope=ldap.SCOPE_ONELEVEL)
        return len(rules)
FreeIPAServer.py 文件源码 项目:ipa_check_consistency 作者: peterpakos 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _count_dns_zones(self):
        """Return how many idnszone or idnsforwardzone entries the search
        under ``cn=dns,<base DN>`` yields (ONELEVEL scope)."""
        self._log.debug('Counting DNS zones...')
        search_base = 'cn=dns,%s' % self._base_dn
        zone_filter = '(|(objectClass=idnszone)(objectClass=idnsforwardzone))'
        zones = self._search(search_base, zone_filter,
                             scope=ldap.SCOPE_ONELEVEL)
        return len(zones)
FreeIPAServer.py 文件源码 项目:ipa_check_consistency 作者: peterpakos 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def _count_certificates(self):
        """Return how many entries with a certStatus the search under
        ``ou=certificateRepository,ou=ca,o=ipaca`` yields (ONELEVEL scope).

        Returns the string ``'N/A'`` instead of a number when the search
        raises ``ldap.NO_SUCH_OBJECT``.
        """
        self._log.debug('Counting certificates...')
        try:
            certificates = self._search(
                'ou=certificateRepository,ou=ca,o=ipaca',
                '(certStatus=*)',
                scope=ldap.SCOPE_ONELEVEL
            )
        except ldap.NO_SUCH_OBJECT:
            return 'N/A'
        return len(certificates)
rbuserdb.py 文件源码 项目:useradm 作者: redbrick 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def check_user_byname(self, uid):
        """Ensure an account with the given uid exists.

        Raises:
            RBFatalError: if a one-level search of the accounts tree for
                ``uid=<uid>`` returns nothing.
        """
        matches = self.ldap.search_s(rbconfig.ldap_accounts_tree,
                                     ldap.SCOPE_ONELEVEL, 'uid=%s' % uid)
        if not matches:
            raise RBFatalError("User '%s' does not exist" % uid)
rbuserdb.py 文件源码 项目:useradm 作者: redbrick 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def check_user_byid(self, user_id):
        """Ensure an account with the given id exists.

        Raises:
            RBFatalError: if a one-level search of the accounts tree for
                ``id=<user_id>`` returns nothing.
        """
        matches = self.ldap.search_s(rbconfig.ldap_accounts_tree,
                                     ldap.SCOPE_ONELEVEL, 'id=%s' % user_id)
        if not matches:
            raise RBFatalError("User with id '%s' does not exist" % user_id)
rbuserdb.py 文件源码 项目:useradm 作者: redbrick 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def check_group_byname(self, group):
        """Ensure a group with the given name exists.

        Raises:
            RBFatalError: if a one-level search of the group tree for
                ``cn=<group>`` returns nothing.
        """
        matches = self.ldap.search_s(rbconfig.ldap_group_tree,
                                     ldap.SCOPE_ONELEVEL, 'cn=%s' % group)
        if not matches:
            raise RBFatalError("Group '%s' does not exist" % group)
rbuserdb.py 文件源码 项目:useradm 作者: redbrick 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def check_group_byid(self, gid):
        """Ensure a group with the given gidNumber exists.

        Raises:
            RBFatalError: if a one-level search of the group tree for
                ``gidNumber=<gid>`` returns nothing.
        """
        matches = self.ldap.search_s(rbconfig.ldap_group_tree,
                                     ldap.SCOPE_ONELEVEL,
                                     'gidNumber=%s' % gid)
        if not matches:
            raise RBFatalError("Group with id '%s' does not exist" % gid)

    # ------------------------------------------------------------------- #
    # INFORMATION RETRIEVAL METHODS                                       #
    # ------------------------------------------------------------------- #

    # FIXME: is get_usertype_byname() below still needed?

    # def get_usertype_byname(self, uid):
    #     """Return usertype for username in user database. Raise
    #     RBFatalError if user does not exist."""
    #     res = self.ldap.search_s(rbconfig.ldap_accounts_tree,
    #                              ldap.SCOPE_ONELEVEL, 'uid=%s' % usr.uid,
    #                              ('objectClass', ))
    #     if res:
    #         for i in res[0][1]['objectClass']:
    #             if i in rbconfig.usertypes:
    #                 return i
    #             else:
    #                raise RBFatalError("Unknown usertype for user '%s'" % uid)
    #         else:
    #             raise RBFatalError("User '%s' does not exist" % uid)
rbuserdb.py 文件源码 项目:useradm 作者: redbrick 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def get_user_byid(self, usr):
        """Fill in ``usr`` from the account whose id equals ``usr.id``.

        The first entry returned by a one-level search of the accounts
        tree for ``id=<usr.id>`` is passed to ``self.set_user``.

        Raises:
            RBFatalError: if the search returns nothing.
        """
        matches = self.ldap.search_s(rbconfig.ldap_accounts_tree,
                                     ldap.SCOPE_ONELEVEL, 'id=%s' % usr.id)
        if not matches:
            raise RBFatalError("User with id '%s' does not exist" % usr.id)
        self.set_user(usr, matches[0])
rbuserdb.py 文件源码 项目:useradm 作者: redbrick 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def get_dummyid(self, usr):
        """Set usr.id to unique 'dummy' DCU ID number.

        Not implemented yet: this always raises, and ``usr`` is left
        untouched.

        Raises:
            RBFatalError: unconditionally.
        """
        raise RBFatalError('NOT YET IMPLEMENTED')
        # Unreachable draft, kept for whoever implements this method.
        # The filter takes no arguments (the old '%' formatting with
        # usr.uid would have raised TypeError), and LDAP filters have no
        # strict '<' operator, so the upper bound is written with '<='.
        # NOTE(review): res[0] is simply the first entry returned, not
        # necessarily the one with the highest id -- confirm before
        # enabling this code.
        res = self.ldap.search_s(rbconfig.ldap_accounts_tree,
                                 ldap.SCOPE_ONELEVEL,
                                 '(&(id>=10000000)(id<=19999999))')
        if res:
            usr.id = int(res[0][1]['id'][0]) + 1
        else:
            usr.id = 10000000
rbuserdb.py 文件源码 项目:useradm 作者: redbrick 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_gid_byname(self, group):
        """Look up the numeric gid of the group called ``group``.

        Returns the first gidNumber value of the first entry found by a
        one-level search of the group tree for ``cn=<group>``, as an int.

        Raises:
            RBFatalError: if the search returns nothing.
        """
        matches = self.ldap.search_s(rbconfig.ldap_group_tree,
                                     ldap.SCOPE_ONELEVEL, 'cn=%s' % group)
        if not matches:
            raise RBFatalError("Group '%s' does not exist" % group)
        _dn, attrs = matches[0][0], matches[0][1]
        return int(attrs['gidNumber'][0])
rbuserdb.py 文件源码 项目:useradm 作者: redbrick 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_group_byid(self, gid):
        """Look up the name of the group whose gidNumber is ``gid``.

        Returns the first cn value of the first entry found by a one-level
        search of the group tree for ``gidNumber=<gid>``, exactly as the
        search returned it.

        Raises:
            RBFatalError: if the search returns nothing.
        """
        matches = self.ldap.search_s(rbconfig.ldap_group_tree,
                                     ldap.SCOPE_ONELEVEL,
                                     'gidNumber=%s' % gid)
        if not matches:
            raise RBFatalError("Group with id '%s' does not exist" % gid)
        return matches[0][1]['cn'][0]
rbuserdb.py 文件源码 项目:useradm 作者: redbrick 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def list_paid_newbies(self):
        """Return the first uid value of every account with yearsPaid >= 1
        and newbie=TRUE (paid newbies)."""
        entries = self.ldap.search_s(
            rbconfig.ldap_accounts_tree, ldap.SCOPE_ONELEVEL,
            '(&(yearsPaid>=1)(newbie=TRUE))', ('uid', ))
        return [attrs['uid'][0] for _dn, attrs in entries]
rbuserdb.py 文件源码 项目:useradm 作者: redbrick 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def list_paid_non_newbies(self):
        """Return the first uid value of every account with yearsPaid >= 1
        and newbie=FALSE (paid renewals)."""
        entries = self.ldap.search_s(
            rbconfig.ldap_accounts_tree, ldap.SCOPE_ONELEVEL,
            '(&(yearsPaid>=1)(newbie=FALSE))', ('uid', ))
        return [attrs['uid'][0] for _dn, attrs in entries]
rbuserdb.py 文件源码 项目:useradm 作者: redbrick 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def list_non_newbies(self):
        """Return the first uid value of every account with newbie=FALSE."""
        entries = self.ldap.search_s(
            rbconfig.ldap_accounts_tree, ldap.SCOPE_ONELEVEL,
            'newbie=FALSE', ('uid', ))
        return [attrs['uid'][0] for _dn, attrs in entries]
rbuserdb.py 文件源码 项目:useradm 作者: redbrick 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def list_newbies(self):
        """Return the first uid value of every account with newbie=TRUE."""
        entries = self.ldap.search_s(
            rbconfig.ldap_accounts_tree, ldap.SCOPE_ONELEVEL,
            'newbie=TRUE', ('uid', ))
        return [attrs['uid'][0] for _dn, attrs in entries]
rbuserdb.py 文件源码 项目:useradm 作者: redbrick 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def list_groups(self):
        """Return the first cn value of every posixGroup entry one level
        below the group tree."""
        entries = self.ldap.search_s(
            rbconfig.ldap_group_tree, ldap.SCOPE_ONELEVEL,
            'objectClass=posixGroup', ('cn', ))
        return [attrs['cn'][0] for _dn, attrs in entries]
rbuserdb.py 文件源码 项目:useradm 作者: redbrick 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def list_reserved_static(self):
        """Return the first uid value of every reserved entry whose flag
        is static."""
        entries = self.ldap.search_s(
            rbconfig.ldap_reserved_tree, ldap.SCOPE_ONELEVEL,
            '(&(objectClass=reserved)(flag=static))', ('uid', ))
        return [attrs['uid'][0] for _dn, attrs in entries]
rbuserdb.py 文件源码 项目:useradm 作者: redbrick 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def list_reserved_dynamic(self):
        """Return the first uid value of every reserved entry whose flag
        is not static."""
        entries = self.ldap.search_s(
            rbconfig.ldap_reserved_tree, ldap.SCOPE_ONELEVEL,
            '(&(objectClass=reserved)(!(flag=static)))', ('uid', ))
        return [attrs['uid'][0] for _dn, attrs in entries]
rbuserdb.py 文件源码 项目:useradm 作者: redbrick 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def list_unpaid(self):
        """Return the first uid value of every account with yearsPaid <= 0
        (all non-renewed users)."""
        entries = self.ldap.search_s(
            rbconfig.ldap_accounts_tree, ldap.SCOPE_ONELEVEL,
            'yearsPaid<=0', ('uid', ))
        return [attrs['uid'][0] for _dn, attrs in entries]
rbuserdb.py 文件源码 项目:useradm 作者: redbrick 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def list_unpaid_normal(self):
        """Return the first uid value of every account with yearsPaid
        exactly 0 (normal non-renewed users)."""
        entries = self.ldap.search_s(
            rbconfig.ldap_accounts_tree, ldap.SCOPE_ONELEVEL,
            'yearsPaid=0', ('uid', ))
        return [attrs['uid'][0] for _dn, attrs in entries]
rbuserdb.py 文件源码 项目:useradm 作者: redbrick 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def list_unpaid_grace(self):
        """Return the first uid value of every account with yearsPaid <= -1
        (grace non-renewed users)."""
        entries = self.ldap.search_s(
            rbconfig.ldap_accounts_tree, ldap.SCOPE_ONELEVEL,
            'yearsPaid<=-1', ('uid', ))
        return [attrs['uid'][0] for _dn, attrs in entries]
rbuserdb.py 文件源码 项目:useradm 作者: redbrick 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def dict_reserved_desc(self):
        """Return dictionary of all reserved entries with their
        description.

        Keys are the first ``uid`` value and values the first
        ``description`` value of each ``objectClass=reserved`` entry one
        level below the reserved tree.
        """
        res = self.ldap.search_s(rbconfig.ldap_reserved_tree,
                                 ldap.SCOPE_ONELEVEL, 'objectClass=reserved',
                                 ('uid', 'description'))
        # Each result is a (dn, attrs) tuple; the attributes must be read
        # from the second element, not by indexing the tuple with a name.
        return {
            data['uid'][0]: data['description'][0]
            for _dn, data in res
        }
rbuserdb.py 文件源码 项目:useradm 作者: redbrick 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def dict_reserved_static(self):
        """Return dictionary of all static reserved entries with their
        description.

        Keys are the first ``uid`` value and values the first
        ``description`` value of each reserved entry with ``flag=static``
        one level below the reserved tree.
        """
        res = self.ldap.search_s(
            rbconfig.ldap_reserved_tree, ldap.SCOPE_ONELEVEL,
            '(&(objectClass=reserved)(flag=static))', ('uid', 'description'))
        # Each result is a (dn, attrs) tuple; the attributes must be read
        # from the second element, not by indexing the tuple with a name.
        return {
            data['uid'][0]: data['description'][0]
            for _dn, data in res
        }

    # -------------------------------- #
    # METHODS RETURNING SEARCH RESULTS #
    # -------------------------------- #
LdapChecker.py 文件源码 项目:nav 作者: UNINETT 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def execute(self):
        """Probe the configured LDAP service and report its state.

        Reads its settings from ``self.args``:

        * ``url``: used for the connection when it is a valid LDAP URL;
          otherwise the address from ``self.get_address()`` is used.
        * ``username`` / ``password``: bind credentials (default empty).
        * ``base``: search base (default ``dc=example,dc=org``). The
          special value ``cn=monitor`` triggers a base-scope search whose
          first ``description`` value is stored in ``self.version`` and
          returned as the status text.
        * ``scope``: ``BASE``, ``ONELEVEL`` or anything else for subtree
          (case-insensitive, default ``SUBTREE``).
        * ``filter``: search filter (default ``objectClass=*``).

        Returns:
            ``(Event.UP, text)`` on success, or ``(Event.DOWN, reason)``
            when the protocol version is rejected or the search fails.

        Once the bind request has been issued, the connection is unbound
        on every exit path, not only on success.
        """
        args = self.args
        # we can connect in 2 ways. By hostname/ip (and portnumber)
        # or by ldap-uri
        if "url" in args and ldapurl.isLDAPUrl(args["url"]):
            conn = ldap.initialize(args["url"])
        else:
            ip, port = self.get_address()
            conn = ldap.initialize("ldap://%s:%s" % (ip, port))
        username = args.get("username", "")
        password = args.get("password", "")
        # NOTE(review): the return value of simple_bind is not inspected
        # here -- confirm that is intended.
        conn.simple_bind(username, password)

        try:
            try:
                self._set_version(args, conn)
            except ValueError:
                return Event.DOWN, "unsupported protocol version"

            base = args.get("base", "dc=example,dc=org")
            if base == "cn=monitor":
                my_res = conn.search_st(base, ldap.SCOPE_BASE,
                                        timeout=self.timeout)
                versionstr = str(my_res[0][-1]['description'][0])
                self.version = versionstr
                return Event.UP, versionstr
            scope = args.get("scope", "SUBTREE").upper()
            if scope == "BASE":
                scope = ldap.SCOPE_BASE
            elif scope == "ONELEVEL":
                scope = ldap.SCOPE_ONELEVEL
            else:
                scope = ldap.SCOPE_SUBTREE
            filtr = args.get("filter", "objectClass=*")
            try:
                conn.search_ext_s(base, scope, filterstr=filtr,
                                  timeout=self.timeout)
                # pylint: disable=W0703
            except Exception as err:
                return (Event.DOWN,
                        "Failed ldapSearch on %s for %s: %s" % (
                            self.get_address(), filtr, str(err)))

            return Event.UP, "Ok"
        finally:
            # Release the connection on the early-return and error paths
            # too; previously only the success path reached unbind().
            conn.unbind()


问题


面经


文章

微信
公众号

扫码关注公众号