def do_GET(self):
self.send_response(200)
self.send_header('Content-Type', 'text/plain')
self.end_headers()
parts = urlparse(self.path)
query = parse_qs(parts.query)
result = {}
for k, v in query.items():
if len(v) == 1:
result[k] = v[0]
elif len(v) == 0:
result[k] = None
else:
result[k] = v
self.server.callback_result = result
self.wfile.write(b'You can close this window now')
python类parse_qs()的实例源码
def _parse_connection_url(url):
"""Parse connection parameters from a database url.
.. note::
HBase Thrift does not support authentication and there is no
database name, so we are not looking for these in the url.
"""
opts = {}
result = netutils.urlsplit(url)
opts['table_prefix'] = urlparse.parse_qs(
result.query).get('table_prefix', [None])[0]
opts['table_prefix_separator'] = urlparse.parse_qs(
result.query).get('table_prefix_separator', ['_'])[0]
opts['dbtype'] = result.scheme
if ':' in result.netloc:
opts['host'], port = result.netloc.split(':')
else:
opts['host'] = result.netloc
port = 9090
opts['port'] = port and int(port) or 9090
return opts
def get_body(self, request):
params = urlparse.parse_qs(request.query_string)
if len(request.body) == 0:
LOG.debug("Empty body provided in request")
return None, params
try:
content_type = request.get_content_type()
except exception.InvalidContentType:
LOG.debug("Unrecognized Content-Type provided in request")
return None, ''
if not content_type:
LOG.debug("No Content-Type provided in request")
return None, ''
return content_type, request.body
def facebook_validation_function(url):
try:
url_parts = _get_url_parts(url)
# check if acceptable domain
domain = url_parts[1]
if not (domain == 'facebook.com' or domain.endswith('.facebook.com')):
return None
path = _get_initial_path(url_parts)
# old style numeric profiles
if path == "profile.php": # ex. https://www.facebook.com/profile.php?id=100010279981469
path = parse_qs(url_parts[3]).get('id')[0]
if path == 'people': # ex. https://www.facebook.com/people/John-Doe/100013326345115
path = url_parts[2].strip('/').split('/')[2].lower()
# TODO: validate against allowed username characteristics
# https://github.com/project-callisto/callisto-core/issues/181
if not path or path == "" or path.endswith(
'.php') or path in generic_fb_urls:
return None
else:
return path
except ValidationError:
return None
def test_authorize_view(self):
with self.app.test_client() as client:
response = client.get('/oauth2authorize')
location = response.headers['Location']
q = urlparse.parse_qs(location.split('?', 1)[1])
state = json.loads(q['state'][0])
self.assertIn(oauth2client.GOOGLE_AUTH_URI, location)
self.assertNotIn(self.oauth2.client_secret, location)
self.assertIn(self.oauth2.client_id, q['client_id'])
self.assertEqual(
flask.session['google_oauth2_csrf_token'], state['csrf_token'])
self.assertEqual(state['return_url'], '/')
with self.app.test_client() as client:
response = client.get('/oauth2authorize?return_url=/test')
location = response.headers['Location']
q = urlparse.parse_qs(location.split('?', 1)[1])
state = json.loads(q['state'][0])
self.assertEqual(state['return_url'], '/test')
with self.app.test_client() as client:
response = client.get('/oauth2authorize?extra_param=test')
location = response.headers['Location']
self.assertIn('extra_param=test', location)
def test_authorize_view(self):
with self.app.test_client() as client:
response = client.get('/oauth2authorize')
location = response.headers['Location']
q = urlparse.parse_qs(location.split('?', 1)[1])
state = json.loads(q['state'][0])
self.assertIn(oauth2client.GOOGLE_AUTH_URI, location)
self.assertNotIn(self.oauth2.client_secret, location)
self.assertIn(self.oauth2.client_id, q['client_id'])
self.assertEqual(
flask.session['google_oauth2_csrf_token'], state['csrf_token'])
self.assertEqual(state['return_url'], '/')
with self.app.test_client() as client:
response = client.get('/oauth2authorize?return_url=/test')
location = response.headers['Location']
q = urlparse.parse_qs(location.split('?', 1)[1])
state = json.loads(q['state'][0])
self.assertEqual(state['return_url'], '/test')
with self.app.test_client() as client:
response = client.get('/oauth2authorize?extra_param=test')
location = response.headers['Location']
self.assertIn('extra_param=test', location)
def _pagination(self, collection, path, **params):
if params.get('page_reverse', False):
linkrel = 'previous'
else:
linkrel = 'next'
next = True
while next:
res = self.get(path, params=params)
yield res
next = False
try:
for link in res['%s_links' % collection]:
if link['rel'] == linkrel:
query_str = urlparse.urlparse(link['href']).query
params = urlparse.parse_qs(query_str)
next = True
break
except KeyError:
break
def _pagination(self, collection, path, **params):
if params.get('page_reverse', False):
linkrel = 'previous'
else:
linkrel = 'next'
next = True
while next:
res = self.get(path, params=params)
yield res
next = False
try:
for link in res['%s_links' % collection]:
if link['rel'] == linkrel:
query_str = urlparse.urlparse(link['href']).query
params = urlparse.parse_qs(query_str)
next = True
break
except KeyError:
break
def _pagination(self, collection, path, **params):
if params.get('page_reverse', False):
linkrel = 'previous'
else:
linkrel = 'next'
next = True
while next:
res = self.get(path, params=params)
yield res
next = False
try:
for link in res['%s_links' % collection]:
if link['rel'] == linkrel:
query_str = urlparse.urlparse(link['href']).query
params = urlparse.parse_qs(query_str)
next = True
break
except KeyError:
break
def test_authorize_view(self):
with self.app.test_client() as client:
response = client.get('/oauth2authorize')
location = response.headers['Location']
q = urlparse.parse_qs(location.split('?', 1)[1])
state = json.loads(q['state'][0])
self.assertIn(GOOGLE_AUTH_URI, location)
self.assertNotIn(self.oauth2.client_secret, location)
self.assertIn(self.oauth2.client_id, q['client_id'])
self.assertEqual(
flask.session['google_oauth2_csrf_token'], state['csrf_token'])
self.assertEqual(state['return_url'], '/')
with self.app.test_client() as client:
response = client.get('/oauth2authorize?return_url=/test')
location = response.headers['Location']
q = urlparse.parse_qs(location.split('?', 1)[1])
state = json.loads(q['state'][0])
self.assertEqual(state['return_url'], '/test')
with self.app.test_client() as client:
response = client.get('/oauth2authorize?extra_param=test')
location = response.headers['Location']
self.assertIn('extra_param=test', location)
def __init__(self, raw, errors='replace'):
"""Takes a string of type application/x-www-form-urlencoded.
"""
# urllib needs bytestrings in py2 and unicode strings in py3
raw_str = raw.encode('ascii') if PY2 else raw
self.decoded = _decode(unquote_plus(raw_str), errors=errors)
self.raw = raw
common_kw = dict(keep_blank_values=True, strict_parsing=False)
if PY2:
# in python 2 parse_qs does its own unquote_plus'ing ...
as_dict = parse_qs(raw_str, **common_kw)
# ... but doesn't decode to unicode.
for k, vals in list(as_dict.items()):
as_dict[_decode(k, errors=errors)] = [
_decode(v, errors=errors) for v in vals
]
else:
# in python 3 parse_qs does the decoding
as_dict = parse_qs(raw_str, errors=errors, **common_kw)
Mapping.__init__(self, as_dict)
def update_query_parameters(url, query_parameters):
"""
Return url with updated query parameters.
Arguments:
url (str): Original url whose query parameters need to be updated.
query_parameters (dict): A dictionary containing query parameters to be added to course selection url.
Returns:
(slug): slug identifier for the identity provider that can be used for identity verification of
users associated the enterprise customer of the given user.
"""
scheme, netloc, path, query_string, fragment = urlsplit(url)
url_params = parse_qs(query_string)
# Update url query parameters
url_params.update(query_parameters)
return urlunsplit(
(scheme, netloc, path, urlencode(url_params, doseq=True), fragment),
)
def traverse_pagination(response, endpoint):
"""
Traverse a paginated API response.
Extracts and concatenates "results" (list of dict) returned by DRF-powered
APIs.
Arguments:
response (Dict): Current response dict from service API
endpoint (slumber Resource object): slumber Resource object from edx-rest-api-client
Returns:
list of dict.
"""
results = response.get('results', [])
next_page = response.get('next')
while next_page:
querystring = parse_qs(urlparse(next_page).query, keep_blank_values=True)
response = endpoint.get(**querystring)
results += response.get('results', [])
next_page = response.get('next')
return results
def get_msg(hinfo, binding):
if binding == BINDING_SOAP:
xmlstr = hinfo["data"]
elif binding == BINDING_HTTP_POST:
_inp = hinfo["data"][3]
i = _inp.find(TAG1)
i += len(TAG1) + 1
j = _inp.find('"', i)
xmlstr = _inp[i:j]
else: # BINDING_HTTP_REDIRECT
parts = urlparse(hinfo["headers"][0][1])
xmlstr = parse_qs(parts.query)["SAMLRequest"][0]
return xmlstr
# ------------------------------------------------------------------------
def get_msg(hinfo, binding, response=False):
if binding == BINDING_SOAP:
msg = hinfo["data"]
elif binding == BINDING_HTTP_POST:
_inp = hinfo["data"][3]
i = _inp.find(TAG1)
i += len(TAG1) + 1
j = _inp.find('"', i)
msg = _inp[i:j]
elif binding == BINDING_HTTP_ARTIFACT:
# either by POST or by redirect
if hinfo["data"]:
_inp = hinfo["data"][3]
i = _inp.find(TAG1)
i += len(TAG1) + 1
j = _inp.find('"', i)
msg = _inp[i:j]
else:
parts = urlparse(hinfo["url"])
msg = parse_qs(parts.query)["SAMLart"][0]
else: # BINDING_HTTP_REDIRECT
parts = urlparse(hinfo["headers"][0][1])
msg = parse_qs(parts.query)["SAMLRequest"][0]
return msg
def get_msg(hinfo, binding, response=False):
if binding == BINDING_SOAP:
msg = hinfo["data"]
elif binding == BINDING_HTTP_POST:
_inp = hinfo["data"][3]
i = _inp.find(TAG1)
i += len(TAG1) + 1
j = _inp.find('"', i)
msg = _inp[i:j]
elif binding == BINDING_URI:
if response:
msg = hinfo["data"]
else:
msg = ""
return parse_qs(hinfo["url"].split("?")[1])["ID"][0]
else: # BINDING_HTTP_REDIRECT
parts = urlparse(hinfo["headers"][0][1])
msg = parse_qs(parts.query)["SAMLRequest"][0]
return msg
def test_parse_faulty_request_to_err_status(self):
req_id, authn_request = self.client.create_authn_request(
destination="http://www.example.com")
binding = BINDING_HTTP_REDIRECT
htargs = self.client.apply_binding(binding, "%s" % authn_request,
"http://www.example.com", "abcd")
_dict = parse_qs(htargs["headers"][0][1].split('?')[1])
print(_dict)
try:
self.server.parse_authn_request(_dict["SAMLRequest"][0], binding)
status = None
except OtherError as oe:
print(oe.args)
status = s_utils.error_status_factory(oe)
assert status
print(status)
assert _eq(status.keyswv(), ["status_code", "status_message"])
assert status.status_message.text == 'Not destined for me!'
status_code = status.status_code
assert _eq(status_code.keyswv(), ["status_code", "value"])
assert status_code.value == samlp.STATUS_RESPONDER
assert status_code.status_code.value == samlp.STATUS_UNKNOWN_PRINCIPAL
def test_parse_ok_request(self):
req_id, authn_request = self.client.create_authn_request(
message_id="id1", destination="http://localhost:8088/sso")
print(authn_request)
binding = BINDING_HTTP_REDIRECT
htargs = self.client.apply_binding(binding, "%s" % authn_request,
"http://www.example.com", "abcd")
_dict = parse_qs(htargs["headers"][0][1].split('?')[1])
print(_dict)
req = self.server.parse_authn_request(_dict["SAMLRequest"][0], binding)
# returns a dictionary
print(req)
resp_args = self.server.response_args(req.message, [BINDING_HTTP_POST])
assert resp_args["destination"] == "http://lingon.catalogix.se:8087/"
assert resp_args["in_response_to"] == "id1"
name_id_policy = resp_args["name_id_policy"]
assert _eq(name_id_policy.keyswv(), ["format", "allow_create"])
assert name_id_policy.format == saml.NAMEID_FORMAT_TRANSIENT
assert resp_args[
"sp_entity_id"] == "urn:mace:example.com:saml:roland:sp"
def parse_discovery_service_response(url="", query="",
returnIDParam="entityID"):
"""
Deal with the response url from a Discovery Service
:param url: the url the user was redirected back to or
:param query: just the query part of the URL.
:param returnIDParam: This is where the identifier of the IdP is
place if it was specified in the query. Default is 'entityID'
:return: The IdP identifier or "" if none was given
"""
if url:
part = urlparse(url)
qsd = parse_qs(part[4])
elif query:
qsd = parse_qs(query)
else:
qsd = {}
try:
return qsd[returnIDParam][0]
except KeyError:
return ""
def disco(environ, start_response, _sp):
query = parse_qs(environ["QUERY_STRING"])
entity_id = query["entityID"][0]
_sid = query["sid"][0]
came_from = CACHE.outstanding_queries[_sid]
_sso = SSO(_sp, environ, start_response, cache=CACHE, **ARGS)
resp = _sso.redirect_to_auth(_sso.sp, entity_id, came_from)
# Add cookie
kaka = make_cookie("ve_disco", entity_id, "SEED_SAW")
resp.headers.append(kaka)
return resp(environ, start_response)
# ----------------------------------------------------------------------------
# noinspection PyUnusedLocal
def _post_servers(self, request):
# type: (requests.PreparedRequest) -> requests.Response
data = parse_qs(force_text(request.body))
self.server_id = self._public_id('srv')
self.auth_token = ''.join(random.choice(mixed_alphabet) for i in xrange(20))
self.api_version = force_text(request.headers['X-Cloak-API-Version'])
self.name = data['name'][0]
self.target_id = data['target'][0]
# Make sure these exist
data['email'][0]
data['password'][0]
result = {
'server_id': self.server_id,
'auth_token': self.auth_token,
'server': self._server_result(),
}
return self._response(request, 201, result)
def _post_server_csr(self, request):
# type: (requests.PreparedRequest) -> requests.Response
data = parse_qs(force_text(request.body))
if self._authenticate(request):
self.csr = data['csr'][0]
self.pki_tag = ''.join(random.choice(mixed_alphabet) for i in xrange(16))
response = self._response(request, 202)
else:
response = self._response(request, 401)
return response
#
# Utils
#
def forward_request(self, method, path, data, headers):
req_data = None
if method == 'POST' and path == '/':
req_data = urlparse.parse_qs(to_str(data))
action = req_data.get('Action')[0]
if req_data:
if action == 'CreateChangeSet':
return create_change_set(req_data)
elif action == 'DescribeChangeSet':
return describe_change_set(req_data)
elif action == 'ExecuteChangeSet':
return execute_change_set(req_data)
elif action == 'UpdateStack' and req_data.get('TemplateURL'):
# Temporary fix until the moto CF backend can handle TemplateURL (currently fails)
url = re.sub(r'https?://s3\.amazonaws\.com', aws_stack.get_local_service_url('s3'),
req_data.get('TemplateURL')[0])
req_data['TemplateBody'] = requests.get(url).content
modified_data = urlparse.urlencode(req_data, doseq=True)
return Request(data=modified_data, headers=headers, method=method)
elif action == 'ValidateTemplate':
return validate_template(req_data)
return True
def oauth_authorization_request(self, token):
"""
Generates the URL for the authorization link
"""
if not isinstance(token, dict):
token = parse_qs(token)
oauth_token = token.get(self.OAUTH_TOKEN_PARAMETER_NAME)[0]
state = self.get_or_create_state()
base_url = self.setting('MEDIAWIKI_URL')
return '{0}?{1}'.format(base_url, urlencode({
'title': 'Special:Oauth/authenticate',
self.OAUTH_TOKEN_PARAMETER_NAME: oauth_token,
self.REDIRECT_URI_PARAMETER_NAME: self.get_redirect_uri(state)
}))
def access_token(self, token):
"""
Fetches the Mediawiki access token.
"""
auth_token = self.oauth_auth(token)
response = requests.post(
url=self.setting('MEDIAWIKI_URL'),
params={'title': 'Special:Oauth/token'},
auth=auth_token
)
credentials = parse_qs(response.content)
oauth_token_key = credentials.get(b('oauth_token'))[0]
oauth_token_secret = credentials.get(b('oauth_token_secret'))[0]
oauth_token_key = oauth_token_key.decode()
oauth_token_secret = oauth_token_secret.decode()
return {
'oauth_token': oauth_token_key,
'oauth_token_secret': oauth_token_secret
}
def _process_url(page_from, page_to, url):
# Get url page
query = urlparse(url).query
query = parse_qs(query)
page = query.get('page')
# Preserve if match
if page:
page_from = int(page_from)
page_to = int(page_to)
page = int(page[0])
if page >= page_from and page <= page_to:
return url
return None
test_serversV21.py 文件源码
项目:Trusted-Platform-Module-nova
作者: BU-NU-CLOUD-SP16
项目源码
文件源码
阅读 26
收藏 0
点赞 0
评论 0
def test_get_servers_with_limit(self):
req = self.req('/fake/servers?limit=3')
res_dict = self.controller.index(req)
servers = res_dict['servers']
self.assertEqual([s['id'] for s in servers],
[fakes.get_fake_uuid(i) for i in range(len(servers))])
servers_links = res_dict['servers_links']
self.assertEqual(servers_links[0]['rel'], 'next')
href_parts = urlparse.urlparse(servers_links[0]['href'])
self.assertEqual('/v2/fake/servers', href_parts.path)
params = urlparse.parse_qs(href_parts.query)
expected_params = {'limit': ['3'],
'marker': [fakes.get_fake_uuid(2)]}
self.assertThat(params, matchers.DictMatches(expected_params))
test_serversV21.py 文件源码
项目:Trusted-Platform-Module-nova
作者: BU-NU-CLOUD-SP16
项目源码
文件源码
阅读 28
收藏 0
点赞 0
评论 0
def test_get_server_details_with_limit(self):
req = self.req('/fake/servers/detail?limit=3')
res = self.controller.detail(req)
servers = res['servers']
self.assertEqual([s['id'] for s in servers],
[fakes.get_fake_uuid(i) for i in range(len(servers))])
servers_links = res['servers_links']
self.assertEqual(servers_links[0]['rel'], 'next')
href_parts = urlparse.urlparse(servers_links[0]['href'])
self.assertEqual('/v2/fake/servers/detail', href_parts.path)
params = urlparse.parse_qs(href_parts.query)
expected = {'limit': ['3'], 'marker': [fakes.get_fake_uuid(2)]}
self.assertThat(params, matchers.DictMatches(expected))
test_serversV21.py 文件源码
项目:Trusted-Platform-Module-nova
作者: BU-NU-CLOUD-SP16
项目源码
文件源码
阅读 24
收藏 0
点赞 0
评论 0
def test_get_server_details_with_limit_and_other_params(self):
req = self.req('/fake/servers/detail'
'?limit=3&blah=2:t'
'&sort_key=id1&sort_dir=asc')
res = self.controller.detail(req)
servers = res['servers']
self.assertEqual([s['id'] for s in servers],
[fakes.get_fake_uuid(i) for i in range(len(servers))])
servers_links = res['servers_links']
self.assertEqual(servers_links[0]['rel'], 'next')
href_parts = urlparse.urlparse(servers_links[0]['href'])
self.assertEqual('/v2/fake/servers/detail', href_parts.path)
params = urlparse.parse_qs(href_parts.query)
expected = {'limit': ['3'], 'blah': ['2:t'],
'sort_key': ['id1'], 'sort_dir': ['asc'],
'marker': [fakes.get_fake_uuid(2)]}
self.assertThat(params, matchers.DictMatches(expected))
test_servers.py 文件源码
项目:Trusted-Platform-Module-nova
作者: BU-NU-CLOUD-SP16
项目源码
文件源码
阅读 22
收藏 0
点赞 0
评论 0
def test_get_servers_with_limit(self):
req = fakes.HTTPRequest.blank('/fake/servers?limit=3')
res_dict = self.controller.index(req)
servers = res_dict['servers']
self.assertEqual([fakes.get_fake_uuid(i) for i in range(len(servers))],
[s['id'] for s in servers])
servers_links = res_dict['servers_links']
self.assertEqual('next', servers_links[0]['rel'])
href_parts = urlparse.urlparse(servers_links[0]['href'])
self.assertEqual('/v2/fake/servers', href_parts.path)
params = urlparse.parse_qs(href_parts.query)
expected_params = {'limit': ['3'],
'marker': [fakes.get_fake_uuid(2)]}
self.assertThat(params, matchers.DictMatches(expected_params))