python类unquote()的实例源码

repository.py 文件源码 项目:solaris-ips 作者: oracle 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __fmri_from_path(pkgpath, ver):
                """Build a package FMRI from a package directory path and a
                version string.

                'pkgpath' is a path whose final component is the URL-quoted
                package name.

                'ver' is the URL-quoted version string.

                Returns the PkgFmri for the unquoted name, with its
                'version' attribute set to the parsed version."""

                # The version is parsed first so that a bad version string
                # is reported before the package name is examined.
                parsed_version = pkg.version.Version(unquote(ver), None)
                quoted_name = os.path.basename(pkgpath)
                result = fmri.PkgFmri(unquote(quoted_name))
                result.version = parsed_version
                return result
api.py 文件源码 项目:solaris-ips 作者: oracle 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __parse_v_1(line, pub, v):
                """Parse one result line returned by a version 1 search
                server.

                'line' is the raw line; it must consist of a query number,
                a return type and a remainder, separated by whitespace.

                'pub' and 'v' are passed through unchanged into the result.

                Returns a tuple of (pfmri, (query_number, publisher,
                (version, return_type, results))), where 'results' is
                (pfmri, unquoted match, action text) for action results and
                just pfmri for package results.

                Raises ServerReturnError if the line cannot be parsed or the
                return type is not recognized."""

                fields = line.split(None, 2)
                if len(fields) != 3:
                        raise apx.ServerReturnError(line)
                try:
                        return_type = int(fields[1])
                        query_num = int(fields[0])
                except ValueError:
                        raise apx.ServerReturnError(line)
                if return_type == Query.RETURN_ACTIONS:
                        subfields = fields[2].split(None, 2)
                        if len(subfields) != 3:
                                # A truncated action result would otherwise
                                # surface as a bare IndexError rather than
                                # the documented ServerReturnError.
                                raise apx.ServerReturnError(line)
                        pfmri = fmri.PkgFmri(subfields[0])
                        return pfmri, (query_num, pub, (v, return_type,
                            (pfmri, unquote(subfields[1]),
                            subfields[2])))
                elif return_type == Query.RETURN_PACKAGES:
                        pfmri = fmri.PkgFmri(fields[2])
                        return pfmri, (query_num, pub, (v, return_type, pfmri))
                else:
                        raise apx.ServerReturnError(line)
search_storage.py 文件源码 项目:solaris-ips 作者: oracle 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def read_dict_file(self):
                """Rebuild self._dict from the lines of self._file_handle.

                Each line holds a flagged token and an integer, separated
                by a single space.  The first character of the token is a
                flag: "1" means the rest of the token is URL-quoted and is
                unquoted before use; any other flag means the rest is used
                as-is.
                """
                self._dict.clear()
                for entry in self._file_handle:
                        flagged, number = entry.split(" ")
                        name = flagged[1:]
                        if flagged[0] == "1":
                                name = unquote(name)
                        self._dict[name] = int(number)
                IndexStoreBase.read_dict_file(self)
transaction.py 文件源码 项目:solaris-ips 作者: oracle 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __init__(self, origin_url, create_repo=False, pkg_name=None,
            repo_props=EmptyDict, trans_id=None, xport=None, pub=None,
            progtrack=None):
                """Initialize the transaction state for 'origin_url'.

                The URL is parsed with "http" as the default scheme.  For
                "file" URLs the path is stored unquoted, the transaction is
                marked local and create_file_repo() is invoked.  For any
                other scheme, requesting 'create_repo' raises
                UnsupportedRepoTypeOperationError."""

                parsed = urlparse(origin_url, "http", allow_fragments=0)
                scheme = parsed.scheme
                is_file_repo = scheme == "file"

                self.pkg_name = pkg_name
                self.trans_id = trans_id
                self.scheme = scheme
                # Only file paths are stored in decoded form.
                self.path = unquote(parsed.path) if is_file_repo \
                    else parsed.path
                self.progtrack = progtrack
                self.transport = xport
                self.publisher = pub
                self.__local = is_file_repo
                self.__uploaded = 0
                self.__uploads = {}
                self.__transactions = {}
                self._tmpdir = None
                self._append_mode = False
                self._upload_mode = None

                if is_file_repo:
                        self.create_file_repo(repo_props=repo_props,
                            create_repo=create_repo)
                elif create_repo:
                        # Repository creation is only possible for "file".
                        raise UnsupportedRepoTypeOperationError("create_repo",
                            type=scheme)
indexer.py 文件源码 项目:solaris-ips 作者: oracle 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def _build_version(vers):
                """Return the Version object for the URL-quoted version
                string 'vers'."""

                decoded = unquote(vers)
                return pkg.version.Version(decoded, None)
an_manifest.py 文件源码 项目:solaris-ips 作者: oracle 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def count_manifest(mg, d):
        """Tally one manifest request into the module-level counters.

        'mg' is a mapping describing the request; the keys "ip", "uri",
        "response" and "agent" are read.

        'd' is an object whose date() result supplies the ISO date key.

        Counts are kept by date and by IP for every request.  Counts by
        package stem and by stem@version are kept only when the URI
        matches pkg_pat and the response is "200".  The count by
        architecture is kept only when the agent string matches
        pkg_agent_pat."""

        day = d.date().isoformat()
        manifest_by_date[day] = manifest_by_date.get(day, 0) + 1

        ip = mg["ip"]
        manifest_by_ip[ip] = manifest_by_ip.get(ip, 0) + 1

        pm = pkg_pat.search(mg["uri"])
        if pm is not None and mg["response"] == "200":
                pg = pm.groupdict()

                # Unquote each key once instead of once per lookup.
                stem = unquote(pg["stem"])
                manifest_by_pkg[stem] = manifest_by_pkg.get(stem, 0) + 1

                ver_pkg = unquote(pg["stem"] + "@" + pg["version"])
                manifest_by_ver_pkg[ver_pkg] = \
                    manifest_by_ver_pkg.get(ver_pkg, 0) + 1

        agent = pkg_agent_pat.search(mg["agent"])
        if agent is None:
                return

        arch = agent.groupdict()["arch"]
        manifest_by_arch[arch] = manifest_by_arch.get(arch, 0) + 1
parse.py 文件源码 项目:pyfilesystem2 作者: PyFilesystem 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def parse_fs_url(fs_url):
    """Split a Filesystem URL into its components.

    Arguments:
        fs_url (str): A filesystem URL.

    Returns:
        ~fs.opener.parse.ParseResult: the protocol name, the unquoted
        username and password (both `None` when the URL carries no
        credentials), the unquoted resource, a dict holding the first
        value of each query-string parameter, and the path.

    Raises:
        ~fs.errors.ParseError: if the FS URL is not valid.

    """
    matched = _RE_FS_URL.match(fs_url)
    if matched is None:
        raise ParseError('{!r} is not a fs2 url'.format(fs_url))

    fs_name, credentials, url_with_creds, url_plain, path = matched.groups()

    # Without credentials the location comes from the second URL group.
    username = password = None
    location = url_plain
    if credentials:
        raw_user, _, raw_password = credentials.partition(':')
        username = unquote(raw_user)
        password = unquote(raw_password)
        location = url_with_creds

    location, has_qs, query = location.partition('?')
    resource = unquote(location)

    params = {}
    if has_qs:
        # Only the first value of a repeated parameter is kept.
        for key, values in parse_qs(query, keep_blank_values=True).items():
            params[key] = values[0]

    return ParseResult(fs_name, username, password, resource, params, path)
tpda.py 文件源码 项目:dnsknife 作者: Gandi 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def trusted_params(uri):
    """Validate the signature of a signed URI and return its parameters.

    The URI must end with a single ``&signature=`` component. The part
    before it is what gets verified: its ``expires`` query value must not
    be in the past (UTC, format ``%Y%m%d%H%M%S``), and its SHA-256 digest
    must verify against at least one of the RSA keys published in the
    ``_tpda`` TXT record of the domain named by its ``source`` query
    value (looked up with DNSSEC enabled).

    WARNING: callers MUST use the returned value rather than the URI they
    passed in, because only the part of the URI that precedes the
    signature is validated.

    Returns:
        The result of ``params()`` applied to the signature-stripped URI.

    Raises:
        exceptions.IncompleteURI: if the signature component is missing
            or repeated, or the URI has no query string.
        exceptions.Expired: if the ``expires`` timestamp has passed.
        exceptions.NoTPDA: if no ``_tpda`` TXT data is found.
        exceptions.NoSignatureMatch: if no published key verifies the
            signature.
    """
    # Split off the signature= part; anything but exactly one occurrence
    # makes the two-name unpacking fail with ValueError.
    try:
        uri, sig = uri.split('&signature=')
        sig = parse.unquote(sig)
    except ValueError:
        raise exceptions.IncompleteURI

    pr = parse.urlparse(uri)
    if not pr.query:
        raise exceptions.IncompleteURI

    expires = _qsl_get_one(pr.query, 'expires')
    if (datetime.datetime.utcnow() >
            datetime.datetime.strptime(expires, '%Y%m%d%H%M%S')):
        raise exceptions.Expired

    source = _qsl_get_one(pr.query, 'source')

    txtl = Checker(source, dnssec=True).txt('_tpda')
    if not txtl:
        raise exceptions.NoTPDA

    # One base64-encoded RSA key per line of the TXT data.
    keys = [RSA.importKey(base64.b64decode(txt.encode('ascii')))
            for txt in txtl.split('\n')]

    digest = SHA256.new()
    digest.update(uri.encode('ascii'))

    # The signature is the same for every key, so decode it only once.
    raw_sig = base64.b64decode(sig)

    for key in keys:
        signer = PKCS1_v1_5.new(key)
        if signer.verify(digest, raw_sig):
            return params(uri)

    raise exceptions.NoSignatureMatch
main.py 文件源码 项目:github2gitlab 作者: chauffer 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def sync(self):
        """Bring merge requests in line with the known pull requests.

        Pull requests are visited in sorted order of their number. A pull
        request already present in self.pull2merge uses that merge
        request; otherwise one is created, but only when both the source
        and the target branch resolve through rev_parse(). For each merge
        request the state, body/description and title fields are compared
        and differing ones are sent through update_merge_request(); when
        nothing differs a debug line pairing the two URLs is logged.
        """
        field_map = {
            'state': 'state',
            'body': 'description',
            'title': 'title',
        }
        for number in sorted(self.pull_requests):
            pull = self.pull_requests[number]
            if number in self.pull2merge:
                merge = self.pull2merge[number]
            else:
                merge = None
                source_branch = 'pull/' + number + '/head'
                target_branch = pull['base']['ref']
                branches_known = (self.rev_parse(pull, source_branch) and
                                  self.rev_parse(pull, target_branch))
                if branches_known:
                    data = {'title': pull['title'],
                            'source_branch': source_branch,
                            'target_branch': target_branch}
                    if pull['body']:
                        data['description'] = pull['body'][:DESCRIPTION_MAX]
                    merge = self.create_merge_request(data)

            if not merge:
                # Nothing to compare against for this pull request.
                continue

            updates = {}
            for pull_field, merge_field in six.iteritems(field_map):
                if self.field_equal(pull, pull_field, pull[pull_field],
                                    merge, merge_field, merge[merge_field]):
                    continue
                key, value = self.field_update(pull, pull_field,
                                               pull[pull_field],
                                               merge, merge_field,
                                               merge[merge_field])
                updates[key] = value

            if updates:
                self.update_merge_request(merge, updates)
                continue

            log.debug("".join([
                "https://github.com/",
                self.github['repo'], "/",
                "pull/", number, " == ",
                self.gitlab['host'], "/",
                parse.unquote(self.gitlab['repo']), "/",
                "merge_requests/", str(merge['iid']),
            ]))
transaction.py 文件源码 项目:solaris-ips 作者: oracle 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def reopen(self, rstore, trans_dir):
                """The reopen() method is invoked by the repository as needed to
                load Transaction data.

                'rstore' is the repository store; its 'trans_root' and
                'read_only' attributes are read.

                'trans_dir' is the transaction directory; its basename must
                have the form <open time>_<escaped package name>.

                Raises TransactionUnknownIDError if the basename does not
                have that form or the transaction directory does not exist.
                Returns early, leaving the manifest-derived attributes
                unset, if the transaction has no manifest file yet."""

                self.rstore = rstore
                try:
                        open_time_str, self.esc_pkg_name = \
                            os.path.basename(trans_dir).split("_", 1)
                except ValueError:
                        raise TransactionUnknownIDError(os.path.basename(
                            trans_dir))

                self.open_time = \
                    datetime.datetime.utcfromtimestamp(int(open_time_str))
                self.pkg_name = unquote(self.esc_pkg_name)

                # This conversion should always work, because we encoded the
                # client release on the initial open of the transaction.
                self.fmri = fmri.PkgFmri(self.pkg_name, None)

                self.dir = os.path.join(rstore.trans_root, self.get_basename())

                if not os.path.exists(self.dir):
                        raise TransactionUnknownIDError(self.get_basename())

                tmode = "rb"
                if not rstore.read_only:
                        # The mode is important especially when dealing with
                        # NFS because of problems with opening a file as
                        # read/write or readonly multiple times.
                        tmode += "+"

                # Find out if the package is renamed or obsolete.
                try:
                        tfpath = os.path.join(self.dir, "manifest")
                        tfile = open(tfpath, tmode)
                except IOError as e:
                        if e.errno == errno.ENOENT:
                                return
                        raise
                m = pkg.manifest.Manifest()
                try:
                        # If tfile is a StreamingFileObj obj, its read()
                        # methods will return bytes. We need str for
                        # manifest and here's the earliest point that
                        # we can convert it to str.
                        m.set_content(content=misc.force_str(tfile.read()))
                finally:
                        # Close the handle even if reading or parsing the
                        # manifest fails, so it is never leaked.
                        tfile.close()
                if os.path.exists(os.path.join(self.dir, "append")):
                        self.append_trans = True
                self.obsolete = m.getbool("pkg.obsolete", "false")
                self.renamed = m.getbool("pkg.renamed", "false")
                self.types_found = set((
                    action.name for action in m.gen_actions()
                ))
                self.has_reqdeps = any(
                    a.attrs["type"] == "require"
                    for a in m.gen_actions_by_type("depend")
                )
search_storage.py 文件源码 项目:solaris-ips 作者: oracle 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def parse_main_dict_line(line):
                """Parse one line of a main dictionary file.

                Changes to this function must be paired with changes to
                write_main_dict_line below, and the result should match the
                data structure that _write_main_dict_line in indexer.py
                writes out for each line.

                The line is split level by level using the five separators
                in IndexStoreMainDict.sep_chars.  Returns a tuple of
                (token, results) where the token is unquoted and results is
                a list of (action_type, [(subtype, [(full_value,
                [(pfmri_index, [offsets])])])]) with full_value unquoted
                and pfmri_index and the offsets converted to integers.
                """

                seps = IndexStoreMainDict.sep_chars
                top_parts = line.rstrip('\n').split(seps[0])
                token = unquote(top_parts[0])
                parsed = []
                for at_chunk in top_parts[1:]:
                        at_parts = at_chunk.split(seps[1])
                        subtypes = []
                        for st_chunk in at_parts[1:]:
                                st_parts = st_chunk.split(seps[2])
                                values = []
                                for fv_chunk in st_parts[1:]:
                                        fv_parts = fv_chunk.split(seps[3])
                                        value = unquote(fv_parts[0])
                                        hits = []
                                        for pf_chunk in fv_parts[1:]:
                                                # First number is the fmri
                                                # index, the rest offsets.
                                                nums = [
                                                    int(n) for n in
                                                    pf_chunk.split(seps[4])
                                                ]
                                                hits.append(
                                                    (nums[0], nums[1:]))
                                        values.append((value, hits))
                                subtypes.append((st_parts[0], values))
                        parsed.append((at_parts[0], subtypes))
                return token, parsed


问题


面经


文章

微信
公众号

扫码关注公众号