def _header_to_id(self, header):
"""Convert a Content-ID header value to an id.
Presumes the Content-ID header conforms to the format that _id_to_header()
returns.
Args:
header: string, Content-ID header value.
Returns:
The extracted id value.
Raises:
BatchError if the header is not in the expected format.
"""
if header[0] != '<' or header[-1] != '>':
raise BatchError("Invalid value for Content-ID: %s" % header)
if '+' not in header:
raise BatchError("Invalid value for Content-ID: %s" % header)
base, id_ = header[1:-1].rsplit('+', 1)
return unquote(id_)
python类unquote()的实例源码
def _header_to_id(self, header):
"""Convert a Content-ID header value to an id.
Presumes the Content-ID header conforms to the format that _id_to_header()
returns.
Args:
header: string, Content-ID header value.
Returns:
The extracted id value.
Raises:
BatchError if the header is not in the expected format.
"""
if header[0] != '<' or header[-1] != '>':
raise BatchError("Invalid value for Content-ID: %s" % header)
if '+' not in header:
raise BatchError("Invalid value for Content-ID: %s" % header)
base, id_ = header[1:-1].rsplit('+', 1)
return unquote(id_)
def match(self, request):
    """Match this route against the given request.

    The request path is URL-unquoted before it is tested against the
    route's regex.

    :returns:
        ``None`` when the path does not match, or when the route restricts
        schemes and the request scheme is not among them; otherwise a
        ``(route, args, kwargs)`` tuple.
    :raises:
        ``exc.HTTPMethodNotAllowed`` if the route defines :attr:`methods`
        and the request method isn't allowed.

    .. seealso:: :meth:`BaseRoute.match`.
    """
    found = self.regex.match(unquote(request.path))
    if not found:
        return None
    if self.schemes and request.scheme not in self.schemes:
        return None
    if self.methods and request.method not in self.methods:
        # The router catches this so that routes with other methods can
        # still be tried.
        raise exc.HTTPMethodNotAllowed()
    args, kwargs = _get_route_variables(found, self.defaults.copy())
    return self, args, kwargs
def decode(data_url):
    """Decode the payload of a ``data:`` URL.

    The URL is split at the first comma into a metadata part and a data
    part. The payload is base64-decoded when the last metadata parameter is
    ``base64`` and URL-unquoted otherwise. If a ``charset=`` parameter is
    present and the payload is still a byte string, it is decoded with that
    charset.

    :param data_url: string of the form ``data:<metadata>,<data>``.
    :return: the decoded payload.
    :raises ValueError: if the URL has no comma or no ``data:`` prefix.
    """
    # Everything after the FIRST comma is payload; URL-quoted payloads may
    # legitimately contain further commas.
    metadata, data = data_url.split(',', 1)
    _, metadata = metadata.split('data:', 1)
    parts = metadata.split(';')
    if parts[-1] == 'base64':
        data = b64decode(data)
    else:
        data = unquote(data)
    for part in parts:
        # Only byte strings can be charset-decoded; on Python 3 unquote()
        # already returns text, which has no .decode() method.
        if part.startswith("charset=") and isinstance(data, bytes):
            data = data.decode(part[8:])
    return data
def _header_to_id(self, header):
"""Convert a Content-ID header value to an id.
Presumes the Content-ID header conforms to the format that _id_to_header()
returns.
Args:
header: string, Content-ID header value.
Returns:
The extracted id value.
Raises:
BatchError if the header is not in the expected format.
"""
if header[0] != '<' or header[-1] != '>':
raise BatchError("Invalid value for Content-ID: %s" % header)
if '+' not in header:
raise BatchError("Invalid value for Content-ID: %s" % header)
base, id_ = header[1:-1].rsplit('+', 1)
return unquote(id_)
def _header_to_id(self, header):
"""Convert a Content-ID header value to an id.
Presumes the Content-ID header conforms to the format that _id_to_header()
returns.
Args:
header: string, Content-ID header value.
Returns:
The extracted id value.
Raises:
BatchError if the header is not in the expected format.
"""
if header[0] != '<' or header[-1] != '>':
raise BatchError("Invalid value for Content-ID: %s" % header)
if '+' not in header:
raise BatchError("Invalid value for Content-ID: %s" % header)
base, id_ = header[1:-1].rsplit('+', 1)
return unquote(id_)
def verify_merge_update(self, updates, result):
    """Check that every requested update is reflected in ``result``.

    The ``private_token`` entry is ignored. A ``state_event`` entry is
    compared against the ``state`` field of ``result`` after being mapped
    through ``self.STATE_EVENT2MERGE_STATE``. A missing or falsy value in
    ``result`` is compared as the empty string.

    Raises:
        ValueError: on the first field whose value in ``result`` differs
            from the requested one; the message includes the merge request
            URL built from ``self.gitlab``.
    """
    gitlab = self.gitlab
    for field, expected in six.iteritems(updates):
        if field == 'private_token':
            continue
        if field == 'state_event':
            field = 'state'
            expected = self.STATE_EVENT2MERGE_STATE[updates['state_event']]
        actual = result.get(field) or ''
        if expected == actual:
            continue
        url = "/".join((gitlab['host'],
                        parse.unquote(gitlab['repo']),
                        "merge_requests",
                        str(result['iid'])))
        message = ("{url}: {key} value expected to be {value}"
                   " but is {result}")
        raise ValueError(message.format(url=url,
                                        key=field,
                                        value=expected,
                                        result=actual))
# Local Variables:
# compile-command: "cd .. ; virtualenv/bin/tox -e flake8"
# End:
def post_account(self, headers, query_string=None, data=None,
                 response_dict=None):
    """Handle an account POST; only ``bulk-delete`` does any work.

    For ``query_string == 'bulk-delete'`` each line of ``data`` is UTF-8
    decoded, URL-unquoted and split into ``/<container>/<object>``; the
    object is removed from ``self.kvs`` and counted as deleted, or counted
    as not found when the lookup raises ``KeyError``. The summary is
    returned as UTF-8 encoded JSON and ``response_dict['status']`` (when a
    dict is supplied) is set to 200.

    Any other request sets the status to 204 and returns no body.

    Returns:
        A ``(headers, body)`` tuple where headers is always an empty dict.
    """
    if query_string != 'bulk-delete':
        if response_dict is not None:
            response_dict['status'] = 204
        return {}, None

    if response_dict is not None:
        response_dict['status'] = 200

    deleted = 0
    missing = 0
    for raw_path in (data.splitlines() if data else ()):
        try:
            decoded_path = unquote(raw_path.decode('utf8'))
            __, container, obj = decoded_path.split('/', 2)
            del self.kvs[container][obj]
        except KeyError:
            missing += 1
        else:
            deleted += 1

    resp = {'Response Status': '200 OK',
            'Response Body': '',
            'Number Deleted': deleted,
            'Number Not Found': missing}
    return {}, json.dumps(resp).encode('utf-8')
def _header_to_id(self, header):
"""Convert a Content-ID header value to an id.
Presumes the Content-ID header conforms to the format that _id_to_header()
returns.
Args:
header: string, Content-ID header value.
Returns:
The extracted id value.
Raises:
BatchError if the header is not in the expected format.
"""
if header[0] != '<' or header[-1] != '>':
raise BatchError("Invalid value for Content-ID: %s" % header)
if '+' not in header:
raise BatchError("Invalid value for Content-ID: %s" % header)
base, id_ = header[1:-1].rsplit('+', 1)
return unquote(id_)
def _header_to_id(self, header):
"""Convert a Content-ID header value to an id.
Presumes the Content-ID header conforms to the format that _id_to_header()
returns.
Args:
header: string, Content-ID header value.
Returns:
The extracted id value.
Raises:
BatchError if the header is not in the expected format.
"""
if header[0] != '<' or header[-1] != '>':
raise BatchError("Invalid value for Content-ID: %s" % header)
if '+' not in header:
raise BatchError("Invalid value for Content-ID: %s" % header)
base, id_ = header[1:-1].rsplit('+', 1)
return unquote(id_)
def _header_to_id(self, header):
"""Convert a Content-ID header value to an id.
Presumes the Content-ID header conforms to the format that _id_to_header()
returns.
Args:
header: string, Content-ID header value.
Returns:
The extracted id value.
Raises:
BatchError if the header is not in the expected format.
"""
if header[0] != '<' or header[-1] != '>':
raise BatchError("Invalid value for Content-ID: %s" % header)
if '+' not in header:
raise BatchError("Invalid value for Content-ID: %s" % header)
base, id_ = header[1:-1].rsplit('+', 1)
return unquote(id_)
def reduce_url(url):
    """
    Get path to object on bucket from presigned url

    The input is returned unchanged when it is ``None``, when it consists
    of a path only, or when it has no scheme. Otherwise the URL-unquoted
    path component is returned.

    :param url: presigned url
    :type url: str
    :return: path to object on bucket
    :rtype str
    """
    if url is None:
        return url
    parsed = parse.urlparse(url)
    if parsed.path == url or not parsed.scheme:
        return url
    return parse.unquote(parsed.path)
def choose_name(fp, mfst):
    """Pick the package name for a manifest.

    When the manifest defines "pkg.fmri" (or, failing that, "fmri"), that
    value is the name, and a PkgFmri object built from it is returned as
    well (None if the value is not a legal FMRI). Otherwise the name is the
    URL-unquoted basename of the manifest path and no PkgFmri is returned.

    'fp' is the path to the file for the manifest.

    'mfst' is the Manifest object, or None."""
    if mfst is not None:
        legacy_name = mfst.get("fmri", None)
        name = mfst.get("pkg.fmri", legacy_name)
        if name is not None:
            try:
                pfmri = fmri.PkgFmri(name)
            except fmri.IllegalFmri:
                pfmri = None
            return name, pfmri

    return unquote(os.path.basename(fp)), None
def _header_to_id(self, header):
"""Convert a Content-ID header value to an id.
Presumes the Content-ID header conforms to the format that _id_to_header()
returns.
Args:
header: string, Content-ID header value.
Returns:
The extracted id value.
Raises:
BatchError if the header is not in the expected format.
"""
if header[0] != '<' or header[-1] != '>':
raise BatchError("Invalid value for Content-ID: %s" % header)
if '+' not in header:
raise BatchError("Invalid value for Content-ID: %s" % header)
base, id_ = header[1:-1].rsplit('+', 1)
return unquote(id_)
def _parse_root_device_hints(node):
"""Convert string with hints to dict. """
root_device = node.properties.get('root_device')
if not root_device:
return {}
try:
parsed_hints = irlib_utils.parse_root_device_hints(root_device)
except ValueError as e:
raise exception.InvalidParameterValue(
_('Failed to validate the root device hints for node %(node)s. '
'Error: %(error)s') % {'node': node.uuid, 'error': e})
root_device_hints = {}
advanced = {}
for hint, value in parsed_hints.items():
if isinstance(value, six.string_types):
if value.startswith('== '):
root_device_hints[hint] = int(value[3:])
elif value.startswith('s== '):
root_device_hints[hint] = urlparse.unquote(value[4:])
else:
advanced[hint] = value
else:
root_device_hints[hint] = value
if advanced:
raise exception.InvalidParameterValue(
_('Ansible-deploy does not support advanced root device hints '
'based on oslo.utils operators. '
'Present advanced hints for node %(node)s are %(hints)s.') % {
'node': node.uuid, 'hints': advanced})
return root_device_hints
def build_path(operation, ns):
    """
    Build a path URI for an operation.

    If ``ns.url_for`` raises ``BuildError``, every argument it suggests is
    filled with a ``{name}`` placeholder and the URL is built again; that
    result is URL-unquoted before being returned.
    """
    try:
        path = ns.url_for(operation, _external=False)
    except BuildError as error:
        # Some URI path parameters are missing: substitute template
        # placeholders for each of them.
        placeholders = {}
        for argument in error.suggested.arguments:
            placeholders[argument] = "{{{}}}".format(argument)
        # flask will sometimes try to quote '{' and '}' characters
        templated = ns.url_for(operation, _external=False, **placeholders)
        path = unquote(templated)
    return path
def unquote_keys(data):
    """Restores initial view of 'quoted' keys in dictionary data

    Keys starting with ``%24`` are URL-unquoted in place; nested dictionary
    values are processed recursively. Anything that is not a dictionary is
    returned untouched.

    :param data: is a dictionary
    :return: data with restored keys if they were 'quoted'.
    """
    if isinstance(data, dict):
        # Iterate over a snapshot: keys are popped and re-inserted below, and
        # mutating a dict while iterating its live view is unsafe.
        for key, value in list(data.items()):
            if isinstance(value, dict):
                unquote_keys(value)
            if key.startswith('%24'):
                data[parse.unquote(key)] = data.pop(key)
    return data
def match(self, request):
    """Match this route against the current request.

    The URL-unquoted request path is tested against the route's regex.
    Returns ``(route, groups, {})`` on a match and ``None`` otherwise.

    .. seealso:: :meth:`BaseRoute.match`.
    """
    found = self.regex.match(unquote(request.path))
    if not found:
        return None
    return self, found.groups(), {}
def unquote(value, *args, **kwargs):
    """URL-unquote a value and pass the result to :func:`decode`.

    The value is first decoded with urllib.unquote or
    urllib.parse.unquote (PY3). Extra positional and keyword arguments are
    forwarded to :func:`decode`, whose return value is returned as-is.
    """
    url_decoded = parse.unquote(value)
    return decode(url_decoded, *args, **kwargs)
def match(self, request):
    """Delegate matching to child routes when the path matches the regex.

    Returns ``None`` if the URL-unquoted request path does not match
    ``self.regex``; otherwise returns whatever ``_match_routes`` returns
    for ``self.get_match_children`` and the request.
    """
    decoded_path = parse.unquote(request.path)
    if self.regex.match(decoded_path):
        return _match_routes(self.get_match_children, request)
    return None
def reconnect(self):
    """Reconnect to rabbitmq server

    Opens a new connection and channel from ``self.amqp_url`` and declares
    the queue named ``self.name``. The port defaults to 5672, the
    credentials to guest/guest, and an empty URL path to the ``%2F``
    virtual host (URL-unquoted before use). A ``PreconditionFailed`` error
    from the queue declaration is ignored.
    """
    parsed = urlparse.urlparse(self.amqp_url)
    host = "%s:%s" % (parsed.hostname, parsed.port or 5672)
    userid = parsed.username or 'guest'
    password = parsed.password or 'guest'
    virtual_host = unquote(parsed.path.lstrip('/') or '%2F')

    self.connection = amqp.Connection(host=host,
                                      userid=userid,
                                      password=password,
                                      virtual_host=virtual_host)
    self.channel = self.connection.channel()
    try:
        self.channel.queue_declare(self.name)
    except amqp.exceptions.PreconditionFailed:
        pass
    #self.channel.queue_purge(self.name)
def parse_arguments_from_fields(self, for_fields, relative_path):
    """Extract keyword arguments for ``for_fields`` from ``relative_path``.

    Returns an empty dict when ``for_fields`` is falsy. Otherwise the named
    groups matched in ``relative_path`` are converted to identifiers,
    URL-unquoted (a missing value counts as ''), fed to the field index as
    input, and the resulting kwargs are returned.
    """
    if not for_fields:
        return {}

    assert isinstance(relative_path, six.text_type)  # Scaffolding for Py3 port
    matched_arguments = self.match(relative_path).match.groupdict()
    fields = self.get_temp_url_argument_field_index(for_fields)

    raw_input_values = {}
    for key, value in matched_arguments.items():
        identifier = self.convert_str_to_identifier(key)
        raw_input_values[identifier] = urllib_parse.unquote(value or '')

    fields.accept_input(raw_input_values)
    return fields.as_kwargs()
def get_session_key(cls):
    """Return the URL-unquoted session cookie of the current request.

    The cookie name comes from ``config.web.session_key_name`` of the
    current execution context. Returns ``None`` when the lookup raises
    ``KeyError``.
    """
    context = ExecutionContext.get_context()
    try:
        cookies = context.request.cookies
        raw_cookie = cookies[context.config.web.session_key_name]
        session_key = urllib_parse.unquote(raw_cookie)
    except KeyError:
        session_key = None
    return session_key
def path_decode(bs):
    """URL-unquote ``bs`` and pass the result through ``_decode``.

    When ``PY2`` is true the input is ASCII-encoded before unquoting.
    """
    if PY2:
        quoted = bs.encode('ascii')
    else:
        quoted = bs
    return _decode(unquote(quoted))
def parse_data(self, data):
    """Parse URL-quoted ``Key: value`` text into ``self.headers``.

    ``data`` is URL-unquoted, stripped and split into lines. A line
    containing ``': '`` starts a new header; any other line is appended
    (after a newline) to the value of the most recent header. Keys and
    values are stored stripped of surrounding whitespace.

    Lines that appear before the first header have no header to attach to
    and are ignored.
    """
    headers = {}
    last_key = None
    value = ''
    for line in unquote(data).strip().splitlines():
        if ': ' in line:
            last_key, value = line.split(': ', 1)
        elif last_key is None:
            # Continuation text with no preceding header: nothing to attach
            # it to, so skip it instead of failing on a None key.
            continue
        else:
            value += '\n' + line
        headers[last_key.strip()] = value.strip()
    self.headers = headers
def decode(txt):
    """Turns a coded string by code() into a NameID class instance.

    The string is a comma-separated list of ``<index>=<quoted value>``
    parts. For each part the attribute named ``ATTR[index]`` is set on a
    fresh NameID to the URL-unquoted value. Parts without ``=`` and parts
    that cannot be applied are skipped.

    :param txt: The coded string
    :return: The NameID instance
    """
    _nid = NameID()
    for part in txt.split(","):
        if "=" not in part:
            continue
        # Split on the first '=' only, so a value that itself contains '='
        # does not break the unpacking.
        i, val = part.split("=", 1)
        try:
            setattr(_nid, ATTR[int(i)], unquote(val))
        except Exception:
            # Best effort: ignore parts with an unusable index or attribute,
            # without also swallowing SystemExit/KeyboardInterrupt.
            pass
    return _nid
def qs_as_dict(url):
    """Return the query string of ``url`` as a ``{name: value}`` dict.

    Values are URL-unquoted; names are left as they appear. A parameter
    without ``=`` maps to the empty string, everything after the first
    ``=`` belongs to the value, and a URL without a query yields an empty
    dict. When a name repeats, the last occurrence wins.
    """
    qs_str = urlparse(url).query
    result = {}
    for pair in qs_str.split("&"):
        if not pair:
            # Empty query string or stray '&': nothing to record.
            continue
        name, _, value = pair.partition("=")
        result[name] = unquote(value)
    return result
def _process_response(self, response):
# Removed due to IronPython Bug
# https://github.com/IronLanguages/ironpython2/issues/242
# if response.status_code == 422:
# raise HTTPError('Unprocessable Entity for url(
# decoded): {}'.format(unquote(response.url)))
response.raise_for_status()
return response.json()
def extract(test=False):
    """Fetch ``URL`` and derive the image URL plus a dated file name.

    The image location is the text following ``g_img={url:`` up to the next
    comma, with quotes and backslashes removed, joined onto ``URL`` and
    stripped of spaces. The file name is the last path segment of the
    URL-unquoted final segment of that URL, prefixed with the value of
    ``get_date_from_year_to_day()``.

    :param test: when false, the image URL and file name are printed.
    :return: ``(img_url, save_name)`` tuple.
    """
    page = requests.get(URL).text
    after_marker = page.split('g_img={url:')[1]
    raw_url = after_marker.split(',')[0]
    for unwanted in ("'", '"', "\\"):
        raw_url = raw_url.replace(unwanted, "")

    img_url = urljoin(URL, raw_url).replace(" ", "")
    last_segment = img_url.split('/')[-1]
    fname = unquote(last_segment).split('/')[-1]

    if not test:
        print('# img_url:', img_url)
        print('# fname:', fname)

    today = get_date_from_year_to_day()
    save_name = '{date}-{fname}'.format(date=today, fname=fname)
    return (img_url, save_name)
def close(self, add_to_catalog=True):
    """Closes an open transaction, returning the published FMRI for
    the corresponding package, and its current state in the catalog.

    The package is accepted through accept_append() when
    ``self.append_trans`` is set and through accept_publish() otherwise;
    ``add_to_catalog`` is forwarded to whichever is used. The transaction
    directory ``self.dir`` is then removed; a failure to remove it is
    reported through misc.emsg() and does not prevent the result from
    being returned.
    """

    def split_trans_id(tid):
        # Raw string: "\d" in a plain literal is an invalid escape sequence.
        m = re.match(r"(\d+)_(.*)", tid)
        return m.group(1), unquote(m.group(2))

    # The parsed id is superseded by the accept_*() result below; the call is
    # retained so a basename that does not match "<digits>_<name>" still
    # fails at this point, as it did before.
    split_trans_id(self.get_basename())

    if self.append_trans:
        pkg_fmri, pkg_state = self.accept_append(add_to_catalog)
    else:
        pkg_fmri, pkg_state = self.accept_publish(add_to_catalog)

    # Discard the in-flight transaction data.
    try:
        shutil.rmtree(self.dir)
    except EnvironmentError as e:
        # Ensure that the error goes to stderr, and then drive
        # on as the actual package was published.
        misc.emsg(e)
    return (pkg_fmri, pkg_state)