def extract_params(self):
"""Returns the parameters for this task.
Returns:
A dictionary of strings mapping parameter names to their values as
strings. If the same name parameter has several values then the value will
be a list of strings. For POST and PULL requests then the parameters are
extracted from the task payload. For all other methods, the parameters are
extracted from the url query string. An empty dictionary is returned if
the task contains an empty payload or query string.
Raises:
ValueError: if the payload does not contain valid
'application/x-www-form-urlencoded' data (for POST and PULL) or the url
does not contain a valid query (all other methods).
"""
if self.__method in ('PULL', 'POST'):
query = self.__payload
else:
query = urlparse.urlparse(self.__relative_url).query
p = {}
if not query:
return p
for key, value in cgi.parse_qsl(
query, keep_blank_values=True, strict_parsing=True):
p.setdefault(key, []).append(value)
for key, value in p.items():
if len(value) == 1:
p[key] = value[0]
return p
python类parse_qsl()的实例源码
def extract_params(self):
"""Returns the parameters for this task.
Returns:
A dictionary of strings mapping parameter names to their values as
strings. If the same name parameter has several values then the value will
be a list of strings. For POST and PULL requests then the parameters are
extracted from the task payload. For all other methods, the parameters are
extracted from the url query string. An empty dictionary is returned if
the task contains an empty payload or query string.
Raises:
ValueError: if the payload does not contain valid
'application/x-www-form-urlencoded' data (for POST and PULL) or the url
does not contain a valid query (all other methods).
"""
if self.__method in ('PULL', 'POST'):
query = self.__payload
else:
query = urlparse.urlparse(self.__relative_url).query
p = {}
if not query:
return p
for key, value in cgi.parse_qsl(
query, keep_blank_values=True, strict_parsing=True):
p.setdefault(key, []).append(value)
for key, value in p.items():
if len(value) == 1:
p[key] = value[0]
return p
def _verifyReturnToArgs(query):
"""Verify that the arguments in the return_to URL are present in this
response.
"""
message = Message.fromPostArgs(query)
return_to = message.getArg(OPENID_NS, 'return_to')
if return_to is None:
raise ProtocolError('Response has no return_to')
parsed_url = urlparse(return_to)
rt_query = parsed_url[4]
parsed_args = cgi.parse_qsl(rt_query)
for rt_key, rt_value in parsed_args:
try:
value = query[rt_key]
if rt_value != value:
format = ("parameter %s value %r does not match "
"return_to's value %r")
raise ProtocolError(format % (rt_key, value, rt_value))
except KeyError:
format = "return_to parameter %s absent from query %r"
raise ProtocolError(format % (rt_key, query))
# Make sure all non-OpenID arguments in the response are also
# in the signed return_to.
bare_args = message.getArgs(BARE_NS)
for pair in bare_args.iteritems():
if pair not in parsed_args:
raise ProtocolError("Parameter %s not in return_to URL" % (pair[0],))
def test_deprecated_parse_qsl(self):
# this func is moved to urllib.parse, this is just a sanity check
with check_warnings(('cgi.parse_qsl is deprecated, use urllib.parse.'
'parse_qsl instead', DeprecationWarning)):
self.assertEqual([('a', 'A1'), ('b', 'B2'), ('B', 'B3')],
cgi.parse_qsl('a=A1&b=B2&B=B3'))
def test_deprecated_parse_qsl(self):
# this func is moved to urlparse, this is just a sanity check
with check_warnings(('cgi.parse_qsl is deprecated, use urlparse.'
'parse_qsl instead', PendingDeprecationWarning)):
self.assertEqual([('a', 'A1'), ('b', 'B2'), ('B', 'B3')],
cgi.parse_qsl('a=A1&b=B2&B=B3'))
def test_deprecated_parse_qsl(self):
# this func is moved to urlparse, this is just a sanity check
with check_warnings(('cgi.parse_qsl is deprecated, use urlparse.'
'parse_qsl instead', PendingDeprecationWarning)):
self.assertEqual([('a', 'A1'), ('b', 'B2'), ('B', 'B3')],
cgi.parse_qsl('a=A1&b=B2&B=B3'))
def test_deprecated_parse_qsl(self):
# this func is moved to urllib.parse, this is just a sanity check
with check_warnings(('cgi.parse_qsl is deprecated, use urllib.parse.'
'parse_qsl instead', DeprecationWarning)):
self.assertEqual([('a', 'A1'), ('b', 'B2'), ('B', 'B3')],
cgi.parse_qsl('a=A1&b=B2&B=B3'))
def extract_params(self):
"""Returns the parameters for this task.
Returns:
A dictionary of strings mapping parameter names to their values as
strings. If the same name parameter has several values then the value will
be a list of strings. For POST and PULL requests then the parameters are
extracted from the task payload. For all other methods, the parameters are
extracted from the url query string. An empty dictionary is returned if
the task contains an empty payload or query string.
Raises:
ValueError: if the payload does not contain valid
'application/x-www-form-urlencoded' data (for POST and PULL) or the url
does not contain a valid query (all other methods).
"""
if self.__method in ('PULL', 'POST'):
query = self.__payload
else:
query = urlparse.urlparse(self.__relative_url).query
p = {}
if not query:
return p
for key, value in cgi.parse_qsl(
query, keep_blank_values=True, strict_parsing=True):
p.setdefault(key, []).append(value)
for key, value in p.items():
if len(value) == 1:
p[key] = value[0]
return p
def request(self, uri, method="GET", body=None, headers=None,
redirections=httplib2.DEFAULT_MAX_REDIRECTS, connection_type=None,
force_auth_header=False):
if not isinstance(headers, dict):
headers = {}
if body and method == "POST":
parameters = dict(parse_qsl(body))
elif method == "GET":
parsed = urlparse.urlparse(uri)
parameters = parse_qs(parsed.query)
else:
parameters = None
req = Request.from_consumer_and_token(self.consumer, token=self.token,
http_method=method, http_url=uri, parameters=parameters)
req.sign_request(self.method, self.consumer, self.token)
if force_auth_header:
# ensure we always send Authorization
headers.update(req.to_header())
if method == "POST":
if not force_auth_header:
body = req.to_postdata()
else:
body = req.encode_postdata(req.get_nonoauth_parameters())
headers['Content-Type'] = 'application/x-www-form-urlencoded'
elif method == "GET":
if not force_auth_header:
uri = req.to_url()
else:
if not force_auth_header:
# don't call update twice.
headers.update(req.to_header())
return httplib2.Http.request(self, uri, method=method, body=body,
headers=headers, redirections=redirections,
connection_type=connection_type)
def generate_authorize_url(self, redirect_uri='urn:ietf:wg:oauth:2.0:oob',
response_type='code', access_type='offline',
approval_prompt='auto', **kwargs):
"""Returns a URI to redirect to the provider.
Args:
redirect_uri: Either the string 'urn:ietf:wg:oauth:2.0:oob' for a
non-web-based application, or a URI that handles the
callback from the authorization server.
response_type: Either the string 'code' for server-side or native
application, or the string 'token' for client-side
application.
access_type: Either the string 'offline' to request a refresh token or
'online'.
approval_prompt: Either the string 'auto' to let the OAuth mechanism
determine if the approval page is needed or 'force'
if the approval prompt is desired in all cases.
If redirect_uri is 'urn:ietf:wg:oauth:2.0:oob' then pass in the
generated verification code to get_access_token,
otherwise pass in the query parameters received
at the callback uri to get_access_token.
If the response_type is 'token', no need to call
get_access_token as the API will return it within
the query parameters received at the callback:
oauth2_token.access_token = YOUR_ACCESS_TOKEN
"""
self.redirect_uri = redirect_uri
query = {
'response_type': response_type,
'client_id': self.client_id,
'redirect_uri': redirect_uri,
'scope': self.scope,
'approval_prompt': approval_prompt,
'access_type': access_type
}
query.update(kwargs)
parts = list(urllib.parse.urlparse(self.auth_uri))
query.update(dict(parse_qsl(parts[4]))) # 4 is the index of the query part
parts[4] = urllib.parse.urlencode(query)
return urllib.parse.urlunparse(parts)
def test_deprecated_parse_qsl(self):
# this func is moved to urlparse, this is just a sanity check
with check_warnings(('cgi.parse_qsl is deprecated, use urlparse.'
'parse_qsl instead', PendingDeprecationWarning)):
self.assertEqual([('a', 'A1'), ('b', 'B2'), ('B', 'B3')],
cgi.parse_qsl('a=A1&b=B2&B=B3'))
def test_deprecated_parse_qsl(self):
# this func is moved to urllib.parse, this is just a sanity check
with check_warnings(('cgi.parse_qsl is deprecated, use urllib.parse.'
'parse_qsl instead', DeprecationWarning)):
self.assertEqual([('a', 'A1'), ('b', 'B2'), ('B', 'B3')],
cgi.parse_qsl('a=A1&b=B2&B=B3'))
def test_deprecated_parse_qsl(self):
# this func is moved to urlparse, this is just a sanity check
with check_warnings(('cgi.parse_qsl is deprecated, use urlparse.'
'parse_qsl instead', PendingDeprecationWarning)):
self.assertEqual([('a', 'A1'), ('b', 'B2'), ('B', 'B3')],
cgi.parse_qsl('a=A1&b=B2&B=B3'))
def _update_query_params(uri, params):
"""Updates a URI with new query parameters.
Args:
uri: string, A valid URI, with potential existing query parameters.
params: dict, A dictionary of query parameters.
Returns:
The same URI but with the new query parameters added.
"""
parts = list(urlparse.urlparse(uri))
query_params = dict(parse_qsl(parts[4])) # 4 is the index of the query part
query_params.update(params)
parts[4] = urllib.urlencode(query_params)
return urlparse.urlunparse(parts)
def _parse_exchange_token_response(content):
"""Parses response of an exchange token request.
Most providers return JSON but some (e.g. Facebook) return a
url-encoded string.
Args:
content: The body of a response
Returns:
Content as a dictionary object. Note that the dict could be empty,
i.e. {}. That basically indicates a failure.
"""
resp = {}
try:
resp = simplejson.loads(content)
except StandardError:
# different JSON libs raise different exceptions,
# so we just do a catch-all here
resp = dict(parse_qsl(content))
# some providers respond with 'expires', others with 'expires_in'
if resp and 'expires' in resp:
resp['expires_in'] = resp.pop('expires')
return resp
def _update_query_params(uri, params):
"""Updates a URI with new query parameters.
Args:
uri: string, A valid URI, with potential existing query parameters.
params: dict, A dictionary of query parameters.
Returns:
The same URI but with the new query parameters added.
"""
parts = list(urlparse.urlparse(uri))
query_params = dict(parse_qsl(parts[4])) # 4 is the index of the query part
query_params.update(params)
parts[4] = urllib.urlencode(query_params)
return urlparse.urlunparse(parts)
def _parse_exchange_token_response(content):
"""Parses response of an exchange token request.
Most providers return JSON but some (e.g. Facebook) return a
url-encoded string.
Args:
content: The body of a response
Returns:
Content as a dictionary object. Note that the dict could be empty,
i.e. {}. That basically indicates a failure.
"""
resp = {}
try:
resp = simplejson.loads(content)
except StandardError:
# different JSON libs raise different exceptions,
# so we just do a catch-all here
resp = dict(parse_qsl(content))
# some providers respond with 'expires', others with 'expires_in'
if resp and 'expires' in resp:
resp['expires_in'] = resp.pop('expires')
return resp
def extract_params(self):
"""Returns the parameters for this task.
Returns:
A dictionary of strings mapping parameter names to their values as
strings. If the same name parameter has several values then the value will
be a list of strings. For POST and PULL requests then the parameters are
extracted from the task payload. For all other methods, the parameters are
extracted from the url query string. An empty dictionary is returned if
the task contains an empty payload or query string.
Raises:
ValueError: if the payload does not contain valid
'application/x-www-form-urlencoded' data (for POST and PULL) or the url
does not contain a valid query (all other methods).
"""
if self.__method in ('PULL', 'POST'):
query = self.__payload
else:
query = urlparse.urlparse(self.__relative_url).query
p = {}
if not query:
return p
for key, value in cgi.parse_qsl(
query, keep_blank_values=True, strict_parsing=True):
p.setdefault(key, []).append(value)
for key, value in p.items():
if len(value) == 1:
p[key] = value[0]
return p
def test_deprecated_parse_qsl(self):
# this func is moved to urllib.parse, this is just a sanity check
with check_warnings(('cgi.parse_qsl is deprecated, use urllib.parse.'
'parse_qsl instead', DeprecationWarning)):
self.assertEqual([('a', 'A1'), ('b', 'B2'), ('B', 'B3')],
cgi.parse_qsl('a=A1&b=B2&B=B3'))
def _verifyReturnToArgs(query):
"""Verify that the arguments in the return_to URL are present in this
response.
"""
message = Message.fromPostArgs(query)
return_to = message.getArg(OPENID_NS, 'return_to')
if return_to is None:
raise ProtocolError('Response has no return_to')
parsed_url = urlparse(return_to)
rt_query = parsed_url[4]
parsed_args = cgi.parse_qsl(rt_query)
for rt_key, rt_value in parsed_args:
try:
value = query[rt_key]
if rt_value != value:
format = ("parameter %s value %r does not match "
"return_to's value %r")
raise ProtocolError(format % (rt_key, value, rt_value))
except KeyError:
format = "return_to parameter %s absent from query %r"
raise ProtocolError(format % (rt_key, query))
# Make sure all non-OpenID arguments in the response are also
# in the signed return_to.
bare_args = message.getArgs(BARE_NS)
for pair in bare_args.iteritems():
if pair not in parsed_args:
raise ProtocolError("Parameter %s not in return_to URL" % (pair[0],))