def sort_url_by_query_keys(url):
"""A helper function which sorts the keys of the query string of a url.
For example, an input of '/v2/tasks?sort_key=id&sort_dir=asc&limit=10'
returns '/v2/tasks?limit=10&sort_dir=asc&sort_key=id'. This is to
prevent non-deterministic ordering of the query string causing
problems with unit tests.
:param url: url which will be ordered by query keys
:returns url: url with ordered query keys
"""
parsed = urlparse.urlparse(url)
queries = urlparse.parse_qsl(parsed.query, True)
sorted_query = sorted(queries, key=lambda x: x[0])
encoded_sorted_query = urlparse.urlencode(sorted_query, True)
url_parts = (parsed.scheme, parsed.netloc, parsed.path,
parsed.params, encoded_sorted_query,
parsed.fragment)
return urlparse.urlunparse(url_parts)
python类parse_qsl()的实例源码
def params(arg):
"""
Parse `arg` and returns a dict describing the TPDA
arguments.
Resolution order:
- `arg` is a string is parsed as an URL
- `arg` has an items() method and is parsed
(works for multidicts, ..)
- `arg` is a iterable is parsed as a
((name, value), (name, value), ..)
"""
if isinstance(arg, str):
pr = parse.urlparse(arg)
if not pr.query:
raise exceptions.IncompleteURI
return Params(parse.parse_qsl(pr.query)).params()
elif hasattr(arg, 'items'):
return Params(arg.items()).params()
elif hasattr(arg, '__iter__'):
return Params(arg).params()
def test_logout(self):
end_session_endpoint = 'https://provider.example.com/end_session'
post_logout_uri = 'https://client.example.com/post_logout'
authn = OIDCAuthentication(self.app,
provider_configuration_info={'issuer': ISSUER,
'end_session_endpoint': end_session_endpoint},
client_registration_info={'client_id': 'foo',
'post_logout_redirect_uris': [post_logout_uri]})
id_token = IdToken(**{'sub': 'sub1', 'nonce': 'nonce'})
with self.app.test_request_context('/logout'):
flask.session['access_token'] = 'abcde'
flask.session['userinfo'] = {'foo': 'bar', 'abc': 'xyz'}
flask.session['id_token'] = id_token.to_dict()
flask.session['id_token_jwt'] = id_token.to_jwt()
end_session_redirect = authn._logout()
assert all(k not in flask.session for k in ['access_token', 'userinfo', 'id_token', 'id_token_jwt'])
assert end_session_redirect.status_code == 303
assert end_session_redirect.headers['Location'].startswith(end_session_endpoint)
parsed_request = dict(parse_qsl(urlparse(end_session_redirect.headers['Location']).query))
assert parsed_request['state'] == flask.session['end_session_state']
assert parsed_request['id_token_hint'] == id_token.to_jwt()
assert parsed_request['post_logout_redirect_uri'] == post_logout_uri
def params(self, collapse=True):
"""Extracts the query parameters from the split urls components.
This method will provide back as a dictionary the query parameter
names and values that were provided in the url.
:param collapse: Boolean, turn on or off collapsing of query values
with the same name. Since a url can contain the same query parameter
name with different values it may or may not be useful for users to
care that this has happened. This parameter when True uses the
last value that was given for a given name, while if False it will
retain all values provided by associating the query parameter name with
a list of values instead of a single (non-list) value.
"""
if self.query:
if collapse:
return dict(parse.parse_qsl(self.query))
else:
params = {}
for (key, value) in parse.parse_qsl(self.query):
if key in params:
if isinstance(params[key], list):
params[key].append(value)
else:
params[key] = [params[key], value]
else:
params[key] = value
return params
else:
return {}
def request_token(self):
resp = self.oauth_request(self.request_token_url, 'GET')
oauth_token = dict(parse.parse_qsl(resp.read().decode()))
self.authorize_url = self.authorize_url % (oauth_token['oauth_token'], self.callback)
self.oauth_token = {'key': oauth_token['oauth_token'], 'secret': oauth_token['oauth_token_secret']}
return self.oauth_token
def access_token(self, oauth_token=None, oauth_verifier=None):
self.oauth_token = oauth_token or self.oauth_token
args = {'oauth_verifier': oauth_verifier} if oauth_verifier else {} # if callback is oob
resp = self.oauth_request(self.access_token_url, 'GET', args)
oauth_token = dict(parse.parse_qsl(resp.read().decode()))
self.oauth_token = {'key': oauth_token['oauth_token'], 'secret': oauth_token['oauth_token_secret']}
return self.oauth_token
def xauth(self, username, password):
args = {
'x_auth_username': username,
'x_auth_password': password,
'x_auth_mode': 'client_auth'
}
resp = self.oauth_request(self.access_token_url, 'GET', args)
oauth_token = dict(parse.parse_qsl(resp.read().decode()))
return {'key': oauth_token['oauth_token'], 'secret': oauth_token['oauth_token_secret']}
def __init__(self, url):
parts = parse.urlparse(url)
_query = frozenset(parse.parse_qsl(parts.query))
_path = parse.unquote_plus(parts.path)
parts = parts._replace(query=_query, path=_path)
self.parts = parts
def __init__(self, url):
parts = parse.urlparse(url)
_query = frozenset(parse.parse_qsl(parts.query))
_path = parse.unquote_plus(parts.path)
parts = parts._replace(query=_query, path=_path)
self.parts = parts
def _cs_request_with_retries(self, url, method, **kwargs):
# Check that certain things are called correctly
if method in ['GET', 'DELETE']:
assert 'body' not in kwargs
elif method == 'PUT':
assert 'body' in kwargs
# Call the method
args = parse.parse_qsl(parse.urlparse(url)[4])
kwargs.update(args)
munged_url = url.rsplit('?', 1)[0]
munged_url = munged_url.strip('/').replace('/', '_').replace('.', '_')
munged_url = munged_url.replace('-', '_')
callback = "%s_%s" % (method.lower(), munged_url)
if not hasattr(self, callback):
raise AssertionError('Called unknown API method: %s %s, '
'expected fakes method name: %s' %
(method, url, callback))
# Note the call
self.callstack.append((method, url, kwargs.get('body', None)))
status, headers, body = getattr(self, callback)(**kwargs)
r = utils.TestResponse({
"status_code": status,
"text": body,
"headers": headers,
})
return r, body
#
# Quotas
#
def _qsl_get_one(qstring, param):
"""Return one unique param from query string"""
args = parse.parse_qsl(qstring)
sigvals = [arg[1] for arg in args if arg[0] == param]
if len(sigvals) != 1:
raise exceptions.InvalidArgument(param)
return sigvals[0]
# Uses RDAP
def get_s3_uri(self, uri):
parsed = urlparse(uri)
client = self.session_factory().client('s3')
params = dict(
Bucket=parsed.netloc,
Key=parsed.path[1:])
if parsed.query:
params.update(dict(parse_qsl(parsed.query)))
result = client.get_object(**params)
return result['Body'].read()
def get(self, url, query, cache):
payloads_file = (self.tmpdir + "/" +
hashlib.sha1(url.encode('utf-8')).hexdigest() +
".json")
if (not cache or not os.access(payloads_file, 0) or
time.time() - os.stat(payloads_file).st_mtime > 24 * 60 * 60):
payloads = []
next_query = query
while next_query:
log.debug(str(next_query))
result = requests.get(url, params=next_query)
payloads += result.json()
next_query = None
for link in result.headers.get('Link', '').split(','):
if 'rel="next"' in link:
m = re.search('<(.*)>', link)
if m:
parsed_url = parse.urlparse(m.group(1))
# append query in case it was not preserved
# (gitlab has that problem)
next_query = query
next_query.update(
dict(parse.parse_qsl(parsed_url.query))
)
if cache:
with open(payloads_file, 'w') as f:
json.dump(payloads, f)
else:
with open(payloads_file, 'r') as f:
payloads = json.load(f)
return payloads
def test_authenticatate_with_extra_request_parameters(self):
extra_params = {"foo": "bar", "abc": "xyz"}
authn = OIDCAuthentication(self.app, provider_configuration_info={'issuer': ISSUER},
client_registration_info={'client_id': 'foo'},
extra_request_args=extra_params)
with self.app.test_request_context('/'):
a = authn._authenticate()
request_params = dict(parse_qsl(urlparse(a.location).query))
assert set(extra_params.items()).issubset(set(request_params.items()))
def test_configure_for_plugin(self):
expected_url = "{}/consumers/joe/auth-key" . format (mock_kong_admin_url)
responses.add(responses.POST, expected_url, status=201)
data = { "key": "123" }
response = self.api.configure_for_plugin("joe", "auth-key", data)
assert response.status_code == 201
body = parse_qs(responses.calls[0].request.body)
body_exactly = parse_qsl(responses.calls[0].request.body)
assert body['key'][0] == "123", \
"Expect correct. data to be sent. Got: {}" . format (body_exactly)
def get_auth_params(self, request, action):
settings = self.get_settings()
ret = dict(settings.get('AUTH_PARAMS', {}))
dynamic_auth_params = request.GET.get('auth_params')
if dynamic_auth_params:
ret.update(dict(parse_qsl(dynamic_auth_params)))
return ret
def expand_redirect_url(starting_url, key, bucket):
""" Add key and bucket parameters to starting URL query string. """
parsed = urlparse.urlparse(starting_url)
query = collections.OrderedDict(urlparse.parse_qsl(parsed.query))
query.update([('key', key), ('bucket', bucket)])
redirect_url = urlparse.urlunparse((
parsed.scheme, parsed.netloc, parsed.path,
parsed.params, urlparse.urlencode(query), None))
return redirect_url
def request_access_token(self, *args, **kwargs):
response = self.request(*args, **kwargs)
return dict(parse_qsl(response.text))
def __call__(self, env, start_response):
status = '200 OK'
params = dict(parse_qsl(env.get('QUERY_STRING')))
ws = env.get('wsgi.websocket')
if ws and not params.get('ignore_ws'):
msg = 'WS Request Url: ' + env.get('REQUEST_URI', '')
msg += ' Echo: ' + ws.receive()
ws.send(msg)
return []
result = 'Requested Url: ' + env.get('REQUEST_URI', '')
if env['REQUEST_METHOD'] == 'POST':
result += ' Post Data: ' + env['wsgi.input'].read(int(env['CONTENT_LENGTH'])).decode('utf-8')
if params.get('addproxyhost') == 'true':
result += ' Proxy Host: ' + env.get('wsgiprox.proxy_host', '')
result = result.encode('iso-8859-1')
if params.get('chunked') == 'true':
headers = []
else:
headers = [('Content-Length', str(len(result)))]
write = start_response(status, headers)
if params.get('write') == 'true':
write(result)
return iter([])
else:
return iter([result])
# ============================================================================
def client_request(self, client, method, url, **kwargs):
# Check that certain things are called correctly
if method in ["GET", "DELETE"]:
assert "json" not in kwargs
# Note the call
self.callstack.append(
(method,
url,
kwargs.get("headers") or {},
kwargs.get("json") or kwargs.get("data")))
try:
fixture = self.fixtures[url][method]
except KeyError:
pass
else:
return TestResponse({"headers": fixture[0],
"text": fixture[1]})
# Call the method
args = parse.parse_qsl(parse.urlparse(url)[4])
kwargs.update(args)
munged_url = url.rsplit('?', 1)[0]
munged_url = munged_url.strip('/').replace('/', '_').replace('.', '_')
munged_url = munged_url.replace('-', '_')
callback = "%s_%s" % (method.lower(), munged_url)
if not hasattr(self, callback):
raise AssertionError('Called unknown API method: %s %s, '
'expected fakes method name: %s' %
(method, url, callback))
resp = getattr(self, callback)(**kwargs)
if len(resp) == 3:
status, headers, body = resp
else:
status, body = resp
headers = {}
self.last_request_id = headers.get('x-openstack-request-id',
'req-test')
return TestResponse({
"status_code": status,
"text": body,
"headers": headers,
})
def client_request(self, client, method, url, **kwargs):
# Check that certain things are called correctly
if method in ["GET", "DELETE"]:
assert "json" not in kwargs
# Note the call
self.callstack.append(
(method,
url,
kwargs.get("headers") or {},
kwargs.get("json") or kwargs.get("data")))
try:
fixture = self.fixtures[url][method]
except KeyError:
pass
else:
return TestResponse({"headers": fixture[0],
"text": fixture[1]})
# Call the method
args = parse.parse_qsl(parse.urlparse(url)[4])
kwargs.update(args)
munged_url = url.rsplit('?', 1)[0]
munged_url = munged_url.strip('/').replace('/', '_').replace('.', '_')
munged_url = munged_url.replace('-', '_')
callback = "%s_%s" % (method.lower(), munged_url)
if not hasattr(self, callback):
raise AssertionError('Called unknown API method: %s %s, '
'expected fakes method name: %s' %
(method, url, callback))
resp = getattr(self, callback)(**kwargs)
if len(resp) == 3:
status, headers, body = resp
else:
status, body = resp
headers = {}
return TestResponse({
"status_code": status,
"text": body,
"headers": headers,
})
def temp_url(self, lifetime=300, method='GET', inline=True, filename=None):
"""Obtains a temporary URL to an object.
Args:
lifetime (int): The time (in seconds) the temporary
URL will be valid
method (str): The HTTP method that can be used on
the temporary URL
inline (bool, default True): If False, URL will have a
Content-Disposition header that causes browser to download as
attachment.
filename (str, optional): A urlencoded filename to use for
attachment, otherwise defaults to object name
"""
global_options = settings.get()['swift']
auth_url = global_options.get('auth_url')
temp_url_key = global_options.get('temp_url_key')
if not self.resource:
raise ValueError('can only create temporary URL on object')
if not temp_url_key:
raise ValueError(
'a temporary url key must be set with settings.update '
'or by setting the OS_TEMP_URL_KEY environment variable')
if not auth_url:
raise ValueError(
'an auth url must be set with settings.update '
'or by setting the OS_AUTH_URL environment variable')
obj_path = '/v1/%s' % self[len(self.drive):]
# Generate the temp url using swifts helper. Note that this method is ONLY
# useful for obtaining the temp_url_sig and the temp_url_expires parameters.
# These parameters will be used to construct a properly-escaped temp url
obj_url = generate_temp_url(obj_path, lifetime, temp_url_key, method)
query_begin = obj_url.rfind('temp_url_sig', 0, len(obj_url))
obj_url_query = obj_url[query_begin:]
obj_url_query = dict(parse.parse_qsl(obj_url_query))
query = ['temp_url_sig=%s' % obj_url_query['temp_url_sig'],
'temp_url_expires=%s' % obj_url_query['temp_url_expires']]
if inline:
query.append('inline')
if filename:
query.append('filename=%s' % parse.quote(filename))
auth_url_parts = parse.urlparse(auth_url)
return parse.urlunparse((auth_url_parts.scheme,
auth_url_parts.netloc,
parse.quote(obj_path),
auth_url_parts.params,
'&'.join(query),
auth_url_parts.fragment))
def encode_uri(uri):
split = list(urlsplit(uri))
split[1] = split[1].encode('idna').decode('ascii')
split[2] = quote_plus(split[2].encode('utf-8'), '/').decode('ascii')
query = list((q, quote_plus(v.encode('utf-8')))
for (q, v) in parse_qsl(split[3]))
split[3] = urlencode(query).decode('ascii')
return urlunsplit(split)
def client_request(self, client, method, url, **kwargs):
# Check that certain things are called correctly
if method in ["GET", "DELETE"]:
assert "json" not in kwargs
# Note the call
self.callstack.append(
(method,
url,
kwargs.get("headers") or {},
kwargs.get("json") or kwargs.get("data")))
try:
fixture = self.fixtures[url][method]
except KeyError:
pass
else:
return TestResponse({"headers": fixture[0],
"text": fixture[1]})
# Call the method
args = parse.parse_qsl(parse.urlparse(url)[4])
kwargs.update(args)
munged_url = url.rsplit('?', 1)[0]
munged_url = munged_url.strip('/').replace('/', '_').replace('.', '_')
munged_url = munged_url.replace('-', '_')
callback = "%s_%s" % (method.lower(), munged_url)
if not hasattr(self, callback):
raise AssertionError('Called unknown API method: %s %s, '
'expected fakes method name: %s' %
(method, url, callback))
resp = getattr(self, callback)(**kwargs)
if len(resp) == 3:
status, headers, body = resp
else:
status, body = resp
headers = {}
return TestResponse({
"status_code": status,
"text": body,
"headers": headers,
})
def client_request(self, client, method, url, **kwargs):
# Check that certain things are called correctly
if method in ["GET", "DELETE"]:
assert "json" not in kwargs
# Note the call
self.callstack.append(
(method,
url,
kwargs.get("headers") or {},
kwargs.get("json") or kwargs.get("data")))
try:
fixture = self.fixtures[url][method]
except KeyError:
pass
else:
return TestResponse({"headers": fixture[0],
"text": fixture[1]})
# Call the method
args = parse.parse_qsl(parse.urlparse(url)[4])
kwargs.update(args)
munged_url = url.rsplit('?', 1)[0]
munged_url = munged_url.strip('/').replace('/', '_').replace('.', '_')
munged_url = munged_url.replace('-', '_')
callback = "%s_%s" % (method.lower(), munged_url)
if not hasattr(self, callback):
raise AssertionError('Called unknown API method: %s %s, '
'expected fakes method name: %s' %
(method, url, callback))
resp = getattr(self, callback)(**kwargs)
if len(resp) == 3:
status, headers, body = resp
else:
status, body = resp
headers = {}
return TestResponse({
"status_code": status,
"text": body,
"headers": headers,
})
def client_request(self, client, method, url, **kwargs):
# Check that certain things are called correctly
if method in ["GET", "DELETE"]:
assert "json" not in kwargs
# Note the call
self.callstack.append(
(method,
url,
kwargs.get("headers") or {},
kwargs.get("json") or kwargs.get("data")))
try:
fixture = self.fixtures[url][method]
except KeyError:
pass
else:
return TestResponse({"headers": fixture[0],
"text": fixture[1]})
# Call the method
args = parse.parse_qsl(parse.urlparse(url)[4])
kwargs.update(args)
munged_url = url.rsplit('?', 1)[0]
munged_url = munged_url.strip('/').replace('/', '_').replace('.', '_')
munged_url = munged_url.replace('-', '_')
callback = "%s_%s" % (method.lower(), munged_url)
if not hasattr(self, callback):
raise AssertionError('Called unknown API method: %s %s, '
'expected fakes method name: %s' %
(method, url, callback))
resp = getattr(self, callback)(**kwargs)
if len(resp) == 3:
status, headers, body = resp
else:
status, body = resp
headers = {}
return TestResponse({
"status_code": status,
"text": body,
"headers": headers,
})
def client_request(self, client, method, url, **kwargs):
# Check that certain things are called correctly
if method in ["GET", "DELETE"]:
assert "json" not in kwargs
# Note the call
self.callstack.append(
(method,
url,
kwargs.get("headers") or {},
kwargs.get("json") or kwargs.get("data")))
try:
fixture = self.fixtures[url][method]
except KeyError:
pass
else:
return TestResponse({"headers": fixture[0],
"text": fixture[1]})
# Call the method
args = parse.parse_qsl(parse.urlparse(url)[4])
kwargs.update(args)
munged_url = url.rsplit('?', 1)[0]
munged_url = munged_url.strip('/').replace('/', '_').replace('.', '_')
munged_url = munged_url.replace('-', '_')
callback = "%s_%s" % (method.lower(), munged_url)
if not hasattr(self, callback):
raise AssertionError('Called unknown API method: %s %s, '
'expected fakes method name: %s' %
(method, url, callback))
resp = getattr(self, callback)(**kwargs)
if len(resp) == 3:
status, headers, body = resp
else:
status, body = resp
headers = {}
self.last_request_id = headers.get('x-openstack-request-id',
'req-test')
return TestResponse({
"status_code": status,
"text": body,
"headers": headers,
})
def client_request(self, client, method, url, **kwargs):
# Check that certain things are called correctly
if method in ["GET", "DELETE"]:
assert "json" not in kwargs
# Note the call
self.callstack.append(
(method,
url,
kwargs.get("headers") or {},
kwargs.get("json") or kwargs.get("data")))
try:
fixture = self.fixtures[url][method]
except KeyError:
pass
else:
return TestResponse({"headers": fixture[0],
"text": fixture[1]})
# Call the method
args = parse.parse_qsl(parse.urlparse(url)[4])
kwargs.update(args)
munged_url = url.rsplit('?', 1)[0]
munged_url = munged_url.strip('/').replace('/', '_').replace('.', '_')
munged_url = munged_url.replace('-', '_')
callback = "%s_%s" % (method.lower(), munged_url)
if not hasattr(self, callback):
raise AssertionError('Called unknown API method: %s %s, '
'expected fakes method name: %s' %
(method, url, callback))
resp = getattr(self, callback)(**kwargs)
if len(resp) == 3:
status, headers, body = resp
else:
status, body = resp
headers = {}
return TestResponse({
"status_code": status,
"text": body,
"headers": headers,
})
def client_request(self, client, method, url, **kwargs):
# Check that certain things are called correctly
if method in ["GET", "DELETE"]:
assert "json" not in kwargs
# Note the call
self.callstack.append(
(method,
url,
kwargs.get("headers") or {},
kwargs.get("json") or kwargs.get("data")))
try:
fixture = self.fixtures[url][method]
except KeyError:
pass
else:
return TestResponse({"headers": fixture[0],
"text": fixture[1]})
# Call the method
args = parse.parse_qsl(parse.urlparse(url)[4])
kwargs.update(args)
munged_url = url.rsplit('?', 1)[0]
munged_url = munged_url.strip('/').replace('/', '_').replace('.', '_')
munged_url = munged_url.replace('-', '_')
callback = "%s_%s" % (method.lower(), munged_url)
if not hasattr(self, callback):
raise AssertionError('Called unknown API method: %s %s, '
'expected fakes method name: %s' %
(method, url, callback))
resp = getattr(self, callback)(**kwargs)
if len(resp) == 3:
status, headers, body = resp
else:
status, body = resp
headers = {}
return TestResponse({
"status_code": status,
"text": body,
"headers": headers,
})
def client_request(self, client, method, url, **kwargs):
# Check that certain things are called correctly
if method in ["GET", "DELETE"]:
assert "json" not in kwargs
# Note the call
self.callstack.append(
(method,
url,
kwargs.get("headers") or {},
kwargs.get("json") or kwargs.get("data")))
try:
fixture = self.fixtures[url][method]
except KeyError:
pass
else:
return TestResponse({"headers": fixture[0],
"text": fixture[1]})
# Call the method
args = parse.parse_qsl(parse.urlparse(url)[4])
kwargs.update(args)
munged_url = url.rsplit('?', 1)[0]
munged_url = munged_url.strip('/').replace('/', '_').replace('.', '_')
munged_url = munged_url.replace('-', '_')
callback = "%s_%s" % (method.lower(), munged_url)
if not hasattr(self, callback):
raise AssertionError('Called unknown API method: %s %s, '
'expected fakes method name: %s' %
(method, url, callback))
resp = getattr(self, callback)(**kwargs)
if len(resp) == 3:
status, headers, body = resp
else:
status, body = resp
headers = {}
return TestResponse({
"status_code": status,
"text": body,
"headers": headers,
})