# These snippets come from larger modules and assume surrounding imports,
# e.g. (Python 3): from urllib.parse import unquote, urlparse, parse_qs
# plus project-specific modules such as os, pkg.fmri, and pkg.version.

def __fmri_from_path(pkgpath, ver):
    """Helper method that takes the full path to the package
    directory and the quoted version string, and returns an FMRI
    constructed from the information in those components."""

    v = pkg.version.Version(unquote(ver), None)
    f = fmri.PkgFmri(unquote(os.path.basename(pkgpath)))
    f.version = v
    return f
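
For context, a minimal standalone sketch of the same decode-and-build pattern (the pkg.fmri/pkg.version calls above belong to the pkg(5) codebase; the path and version below are made up for illustration):

from urllib.parse import quote, unquote
import os

# Hypothetical layout: the directory basename is a percent-encoded stem.
pkgpath = "/repo/pkg/" + quote("web/server", safe="")   # ".../web%2Fserver"
ver = quote("1.0,5.11-0.175")                           # "," becomes %2C

stem = unquote(os.path.basename(pkgpath))   # "web/server"
version = unquote(ver)                      # "1.0,5.11-0.175"
print("pkg:/{0}@{1}".format(stem, version))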
def __parse_v_1(line, pub, v):
    """This function parses the string returned by a version 1
    search server and puts it into the expected format of
    (query_number, publisher, (version, return_type, (results))).
    If it receives a line it can't parse, it raises a
    ServerReturnError."""

    fields = line.split(None, 2)
    if len(fields) != 3:
        raise apx.ServerReturnError(line)
    try:
        return_type = int(fields[1])
        query_num = int(fields[0])
    except ValueError:
        raise apx.ServerReturnError(line)
    if return_type == Query.RETURN_ACTIONS:
        subfields = fields[2].split(None, 2)
        pfmri = fmri.PkgFmri(subfields[0])
        return pfmri, (query_num, pub, (v, return_type,
            (pfmri, unquote(subfields[1]), subfields[2])))
    elif return_type == Query.RETURN_PACKAGES:
        pfmri = fmri.PkgFmri(fields[2])
        return pfmri, (query_num, pub, (v, return_type, pfmri))
    else:
        raise apx.ServerReturnError(line)
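
A hedged, self-contained sketch of the version-1 line format this parses: "query_num return_type payload". The constants mirror Query.RETURN_PACKAGES/RETURN_ACTIONS, but their values here are assumptions:

from urllib.parse import unquote

RETURN_PACKAGES, RETURN_ACTIONS = 0, 1   # assumed values of the Query.* constants

line = "0 1 pkg:/web/server@1.0 path%20to%20file dir path=usr/share"
query_num, return_type, payload = line.split(None, 2)
if int(return_type) == RETURN_ACTIONS:
    pfmri, match, action = payload.split(None, 2)
    print(query_num, pfmri, unquote(match), action)
    # 0 pkg:/web/server@1.0 path to file dir path=usr/share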
def read_dict_file(self):
    """Reads in a dictionary stored in a file with an entity
    and its number on each line.
    """
    self._dict.clear()
    for line in self._file_handle:
        token, offset = line.split(" ")
        # A leading "1" flags a percent-encoded token; any other
        # prefix (presumably "0") means the token is literal.
        if token[0] == "1":
            token = unquote(token[1:])
        else:
            token = token[1:]
        offset = int(offset)
        self._dict[token] = offset
    IndexStoreBase.read_dict_file(self)
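
The file format being read here is one "token offset" pair per line, with a one-character prefix on the token as described above. A round-trip sketch under that assumed writer convention:

from urllib.parse import quote, unquote

def write_entry(token, offset):
    # Prefix "1" plus percent-encoding if the token needs escaping,
    # "0" plus the raw token otherwise (assumed writer convention).
    if token != quote(token, safe=""):
        return "1{0} {1}\n".format(quote(token, safe=""), offset)
    return "0{0} {1}\n".format(token, offset)

line = write_entry("usr/bin/ls", 4096)   # "1usr%2Fbin%2Fls 4096\n"
token, offset = line.split(" ")
token = unquote(token[1:]) if token[0] == "1" else token[1:]
print(token, int(offset))                # usr/bin/ls 4096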
def __init__(self, origin_url, create_repo=False, pkg_name=None,
             repo_props=EmptyDict, trans_id=None, xport=None, pub=None,
             progtrack=None):
    scheme, netloc, path, params, query, fragment = \
        urlparse(origin_url, "http", allow_fragments=0)

    self.pkg_name = pkg_name
    self.trans_id = trans_id
    self.scheme = scheme
    if scheme == "file":
        path = unquote(path)
    self.path = path
    self.progtrack = progtrack
    self.transport = xport
    self.publisher = pub
    self.__local = False
    self.__uploaded = 0
    self.__uploads = {}
    self.__transactions = {}
    self._tmpdir = None
    self._append_mode = False
    self._upload_mode = None

    if scheme == "file":
        self.__local = True
        self.create_file_repo(repo_props=repo_props,
                              create_repo=create_repo)
    elif scheme != "file" and create_repo:
        raise UnsupportedRepoTypeOperationError("create_repo",
                                                type=scheme)
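
The "file" branch above is the standard way to recover a local path from a file:// URL; a minimal sketch:

from urllib.parse import urlparse, unquote

url = "file:///var/tmp/my%20repo"
parts = urlparse(url, "http", allow_fragments=False)
if parts.scheme == "file":
    path = unquote(parts.path)
    print(path)   # /var/tmp/my repo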
def _build_version(vers):
    """Private method for building versions from a string."""
    return pkg.version.Version(unquote(vers), None)
def count_manifest(mg, d):
    # The manifest_by_* dicts and the pkg_pat/pkg_agent_pat regexes are
    # module-level objects defined elsewhere in the source this snippet
    # comes from.
    try:
        manifest_by_date[d.date().isoformat()] += 1
    except KeyError:
        manifest_by_date[d.date().isoformat()] = 1
    try:
        manifest_by_ip[mg["ip"]] += 1
    except KeyError:
        manifest_by_ip[mg["ip"]] = 1

    pm = pkg_pat.search(mg["uri"])
    if pm is not None and mg["response"] == "200":
        pg = pm.groupdict()
        try:
            manifest_by_pkg[unquote(pg["stem"])] += 1
        except KeyError:
            manifest_by_pkg[unquote(pg["stem"])] = 1
        try:
            manifest_by_ver_pkg[unquote(pg["stem"] + "@" + pg["version"])] += 1
        except KeyError:
            manifest_by_ver_pkg[unquote(pg["stem"] + "@" + pg["version"])] = 1

    agent = pkg_agent_pat.search(mg["agent"])
    if agent is None:
        return
    ag = agent.groupdict()
    try:
        manifest_by_arch[ag["arch"]] += 1
    except KeyError:
        manifest_by_arch[ag["arch"]] = 1
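
The try/except KeyError counting above predates collections.Counter; the same tally can be sketched with Counter (the URI shape and field names below are hypothetical):

from collections import Counter
from urllib.parse import unquote

manifest_by_pkg = Counter()
entries = [{"uri": "/manifest/0/web%2Fserver@1.0", "response": "200"}]  # fake log rows
for mg in entries:
    stem = unquote(mg["uri"].split("/")[-1].split("@")[0])
    manifest_by_pkg[stem] += 1
print(manifest_by_pkg)   # Counter({'web/server': 1})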
def parse_fs_url(fs_url):
    """Parse a Filesystem URL and return a `ParseResult`.

    Arguments:
        fs_url (str): A filesystem URL.

    Returns:
        ~fs.opener.parse.ParseResult: a parse result instance.

    Raises:
        ~fs.errors.ParseError: if the FS URL is not valid.

    """
    match = _RE_FS_URL.match(fs_url)
    if match is None:
        raise ParseError('{!r} is not a fs2 url'.format(fs_url))

    fs_name, credentials, url1, url2, path = match.groups()
    if credentials:
        username, _, password = credentials.partition(':')
        username = unquote(username)
        password = unquote(password)
        url = url1
    else:
        username = None
        password = None
        url = url2

    url, has_qs, _params = url.partition('?')
    resource = unquote(url)
    if has_qs:
        params = parse_qs(_params, keep_blank_values=True)
        params = {k: v[0] for k, v in params.items()}
    else:
        params = {}

    return ParseResult(
        fs_name,
        username,
        password,
        resource,
        params,
        path
    )
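
Assuming this is the opener-URL parser from PyFilesystem2 (fs.opener.parse), a usage sketch; ParseResult is a namedtuple whose fields follow the tuple built above:

from fs.opener.parse import parse_fs_url

result = parse_fs_url("ftp://user:p%40ss@ftp.example.org!/pub")
print(result.username, result.password, result.resource, result.path)
# user p@ss ftp.example.org /pub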
def trusted_params(uri):
    """Walk through a URI, look up the set of DNSKEYs for the origin
    third-party provider domain, and validate the URI signature against
    the keys found. If valid, return the trustable URI; otherwise raise
    an exception.

    /!\ The caller MUST use the returned URI, as signature validation
    only covers everything *before* the signature parameter.
    """
    # Truncate the signature= part
    try:
        uri, sig = uri.split('&signature=')
        sig = parse.unquote(sig)
    except ValueError:
        raise exceptions.IncompleteURI

    pr = parse.urlparse(uri)
    if not pr.query:
        raise exceptions.IncompleteURI

    expires = _qsl_get_one(pr.query, 'expires')
    if (datetime.datetime.utcnow() >
            datetime.datetime.strptime(expires, '%Y%m%d%H%M%S')):
        raise exceptions.Expired

    source = _qsl_get_one(pr.query, 'source')
    txtl = Checker(source, dnssec=True).txt('_tpda')
    if not txtl:
        raise exceptions.NoTPDA

    keys = [RSA.importKey(base64.b64decode(txt.encode('ascii')))
            for txt in txtl.split('\n')]

    digest = SHA256.new()
    digest.update(uri.encode('ascii'))
    for key in keys:
        signer = PKCS1_v1_5.new(key)
        if signer.verify(digest, base64.b64decode(sig)):
            return params(uri)
    raise exceptions.NoSignatureMatch
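
A standalone sketch of just the PKCS#1 v1.5 verification step, using a key pair generated on the spot (in the function above the keys come from the provider's _tpda TXT record); pycryptodome API:

from Crypto.Hash import SHA256
from Crypto.PublicKey import RSA
from Crypto.Signature import PKCS1_v1_5

key = RSA.generate(2048)   # stand-in for the provider's key
uri = "https://dns.example.net/update?expires=20991231235959&source=example.org"

sig = PKCS1_v1_5.new(key).sign(SHA256.new(uri.encode("ascii")))

verifier = PKCS1_v1_5.new(key.publickey())
print(verifier.verify(SHA256.new(uri.encode("ascii")), sig))   # True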
def sync(self):
    pull_f2merge_f = {
        'state': 'state',
        'body': 'description',
        'title': 'title',
    }

    for number in sorted(self.pull_requests.keys()):
        pull = self.pull_requests[number]
        merge = None
        if number in self.pull2merge:
            merge = self.pull2merge[number]
        else:
            source_branch = 'pull/' + number + '/head'
            target_branch = pull['base']['ref']
            if (self.rev_parse(pull, source_branch) and
                    self.rev_parse(pull, target_branch)):
                data = {'title': pull['title'],
                        'source_branch': source_branch,
                        'target_branch': target_branch}
                if pull['body']:
                    data['description'] = pull['body'][:DESCRIPTION_MAX]
                merge = self.create_merge_request(data)
        if merge:
            updates = {}
            for (pull_field, merge_field) in six.iteritems(pull_f2merge_f):
                if not self.field_equal(pull,
                                        pull_field,
                                        pull[pull_field],
                                        merge,
                                        merge_field,
                                        merge[merge_field]):
                    (key, value) = self.field_update(pull,
                                                     pull_field,
                                                     pull[pull_field],
                                                     merge,
                                                     merge_field,
                                                     merge[merge_field])
                    updates[key] = value
            if updates:
                self.update_merge_request(merge, updates)
            else:
                log.debug("https://github.com/" +
                          self.github['repo'] + "/" +
                          "pull/" + number + " == " +
                          self.gitlab['host'] + "/" +
                          parse.unquote(self.gitlab['repo']) + "/" +
                          "merge_requests/" + str(merge['iid']))
def reopen(self, rstore, trans_dir):
    """The reopen() method is invoked by the repository as needed to
    load Transaction data."""

    self.rstore = rstore
    try:
        open_time_str, self.esc_pkg_name = \
            os.path.basename(trans_dir).split("_", 1)
    except ValueError:
        raise TransactionUnknownIDError(os.path.basename(
            trans_dir))
    self.open_time = \
        datetime.datetime.utcfromtimestamp(int(open_time_str))
    self.pkg_name = unquote(self.esc_pkg_name)

    # This conversion should always work, because we encoded the
    # client release on the initial open of the transaction.
    self.fmri = fmri.PkgFmri(self.pkg_name, None)

    self.dir = os.path.join(rstore.trans_root, self.get_basename())
    if not os.path.exists(self.dir):
        raise TransactionUnknownIDError(self.get_basename())

    tmode = "rb"
    if not rstore.read_only:
        # The mode is important, especially when dealing with NFS,
        # because of problems with opening a file as read/write or
        # readonly multiple times.
        tmode += "+"

    # Find out if the package is renamed or obsolete.
    try:
        tfpath = os.path.join(self.dir, "manifest")
        tfile = open(tfpath, tmode)
    except IOError as e:
        if e.errno == errno.ENOENT:
            return
        raise

    m = pkg.manifest.Manifest()
    # If tfile is a StreamingFileObj object, its read() method
    # returns bytes. We need str for the manifest, and this is
    # the earliest point at which we can convert it.
    m.set_content(content=misc.force_str(tfile.read()))
    tfile.close()

    if os.path.exists(os.path.join(self.dir, "append")):
        self.append_trans = True

    self.obsolete = m.getbool("pkg.obsolete", "false")
    self.renamed = m.getbool("pkg.renamed", "false")
    self.types_found = set(
        action.name for action in m.gen_actions()
    )
    self.has_reqdeps = any(
        a.attrs["type"] == "require"
        for a in m.gen_actions_by_type("depend")
    )
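
reopen() inverts the directory-naming scheme used when the transaction was opened: "<open-timestamp>_<quoted-package-name>". A round-trip sketch under that assumed convention:

import calendar
import datetime
from urllib.parse import quote, unquote

open_time = datetime.datetime(2024, 1, 2, 3, 4, 5)
basename = "{0:d}_{1}".format(calendar.timegm(open_time.timetuple()),
                              quote("web/server@1.0", safe=""))

# ...and parse it back, as reopen() does.
open_time_str, esc_pkg_name = basename.split("_", 1)
print(datetime.datetime.utcfromtimestamp(int(open_time_str)))   # 2024-01-02 03:04:05
print(unquote(esc_pkg_name))                                    # web/server@1.0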
def parse_main_dict_line(line):
    """Parses one line of a main dictionary file.
    Changes to this function must be paired with changes to
    write_main_dict_line below.

    This should produce the same data structure that
    _write_main_dict_line in indexer.py creates to write out each
    line.
    """
    split_chars = IndexStoreMainDict.sep_chars
    line = line.rstrip('\n')
    tmp = line.split(split_chars[0])
    tok = unquote(tmp[0])
    atl = tmp[1:]
    res = []
    for ati in atl:
        tmp = ati.split(split_chars[1])
        action_type = tmp[0]
        stl = tmp[1:]
        at_res = []
        for sti in stl:
            tmp = sti.split(split_chars[2])
            subtype = tmp[0]
            fvl = tmp[1:]
            st_res = []
            for fvi in fvl:
                tmp = fvi.split(split_chars[3])
                full_value = unquote(tmp[0])
                pfl = tmp[1:]
                fv_res = []
                for pfi in pfl:
                    tmp = pfi.split(split_chars[4])
                    pfmri_index = int(tmp[0])
                    offsets = [int(t) for t in tmp[1:]]
                    fv_res.append((pfmri_index, offsets))
                st_res.append((full_value, fv_res))
            at_res.append((subtype, st_res))
        res.append((action_type, at_res))
    return tok, res
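
A round-trip sketch of the nested line format this parses. The five separator characters below are stand-ins for IndexStoreMainDict.sep_chars, whose real values live in the pkg(5) source:

from urllib.parse import unquote

SEP = ["!", "@", "#", ",", "-"]   # hypothetical sep_chars

# token ! action_type @ subtype # full_value , pfmri_index - offset - offset
line = "ls!file@basename#ls%20utils,0-12-40"

tok, rest = line.split(SEP[0], 1)
action_type, rest = rest.split(SEP[1], 1)
subtype, rest = rest.split(SEP[2], 1)
full_value, rest = rest.split(SEP[3], 1)
index, *offsets = rest.split(SEP[4])
print(unquote(tok), action_type, subtype, unquote(full_value),
      int(index), [int(o) for o in offsets])
# ls file basename ls utils 0 [12, 40]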