def do_GET(s):
    """Handle a GET request.

    Sends a 200 HTML response, stores the request's query parameters on
    ``s.server.query_params`` and writes a short page saying that the
    authentication flow has completed. Note that we can't detect if an
    error occurred.

    Args:
      s: the request handler instance (plays the role of ``self``).
    """
    s.send_response(200)
    s.send_header("Content-type", "text/html")
    s.end_headers()
    # Only the text after the first '?' is the query string. partition()
    # yields '' when the path has no '?', so a bare path such as '/a=b' is
    # not mis-parsed as a query parameter.
    raw_query = s.path.partition('?')[2]
    s.server.query_params = dict(parse_qsl(raw_query))
    s.wfile.write("<html><head><title>Authentication Status</title></head>")
    s.wfile.write("<body><p>The authentication flow has completed.</p>")
    s.wfile.write("</body></html>")
python类parse_qsl()的实例源码
def _add_query_parameter(url, name, value):
"""Adds a query parameter to a url.
Replaces the current value if it already exists in the URL.
Args:
url: string, url to add the query parameter to.
name: string, query parameter name.
value: string, query parameter value.
Returns:
Updated query parameter. Does not update the url if value is None.
"""
if value is None:
return url
else:
parsed = list(urlparse.urlparse(url))
q = dict(parse_qsl(parsed[4]))
q[name] = value
parsed[4] = urllib.urlencode(q)
return urlparse.urlunparse(parsed)
def handleRequest(self, env, start_response):
    """Build a UiRequest for one request and route it by its path.

    Args:
      env: request environment mapping; ``PATH_INFO`` and ``QUERY_STRING``
        are read from it.
      start_response: passed through unchanged to UiRequest.

    Returns:
      The result of ``ui_request.route(path)``, or of
      ``ui_request.error500(...)`` when routing raises outside debug mode.
    """
    path = env["PATH_INFO"]
    if env.get("QUERY_STRING"):
        get = dict(cgi.parse_qsl(env['QUERY_STRING']))
    else:
        get = {}
    ui_request = UiRequest(self, get, env, start_response)
    if config.debug:  # Let the exception be caught by werkzeug
        return ui_request.route(path)
    # Outside debug mode: catch the error, log it and show an error page.
    try:
        return ui_request.route(path)
    except Exception as err:
        # Format once and reuse for both the log line and the response.
        formatted = Debug.formatException(err)
        logging.debug("UiRequest error: %s" % formatted)
        return ui_request.error500("Err: %s" % formatted)
# Reload the UiRequest class to prevent restarts in debug mode
def _add_query_parameter(url, name, value):
"""Adds a query parameter to a url.
Replaces the current value if it already exists in the URL.
Args:
url: string, url to add the query parameter to.
name: string, query parameter name.
value: string, query parameter value.
Returns:
Updated query parameter. Does not update the url if value is None.
"""
if value is None:
return url
else:
parsed = list(urlparse.urlparse(url))
q = dict(parse_qsl(parsed[4]))
q[name] = value
parsed[4] = urllib.urlencode(q)
return urlparse.urlunparse(parsed)
def do_GET(s):
    """Handle a GET request.

    Sends a 200 HTML response, stores the request's query parameters on
    ``s.server.query_params`` and writes a short page saying that the
    authentication flow has completed. Note that we can't detect if an
    error occurred.

    Args:
      s: the request handler instance (plays the role of ``self``).
    """
    s.send_response(200)
    s.send_header("Content-type", "text/html")
    s.end_headers()
    # Only the text after the first '?' is the query string. partition()
    # yields '' when the path has no '?', so a bare path such as '/a=b' is
    # not mis-parsed as a query parameter.
    raw_query = s.path.partition('?')[2]
    s.server.query_params = dict(parse_qsl(raw_query))
    s.wfile.write("<html><head><title>Authentication Status</title></head>")
    s.wfile.write("<body><p>The authentication flow has completed.</p>")
    s.wfile.write("</body></html>")
def do_GET(s):
    """Handle a GET request.

    Sends a 200 HTML response, stores the request's query parameters on
    ``s.server.query_params`` and writes a short page saying that the
    authentication flow has completed. Note that we can't detect if an
    error occurred.

    Args:
      s: the request handler instance (plays the role of ``self``).
    """
    s.send_response(200)
    s.send_header("Content-type", "text/html")
    s.end_headers()
    # Only the text after the first '?' is the query string. partition()
    # yields '' when the path has no '?', so a bare path such as '/a=b' is
    # not mis-parsed as a query parameter.
    raw_query = s.path.partition('?')[2]
    s.server.query_params = dict(parse_qsl(raw_query))
    s.wfile.write("<html><head><title>Authentication Status</title></head>")
    s.wfile.write("<body><p>The authentication flow has completed.</p>")
    s.wfile.write("</body></html>")
def _add_query_parameter(url, name, value):
"""Adds a query parameter to a url.
Replaces the current value if it already exists in the URL.
Args:
url: string, url to add the query parameter to.
name: string, query parameter name.
value: string, query parameter value.
Returns:
Updated query parameter. Does not update the url if value is None.
"""
if value is None:
return url
else:
parsed = list(urlparse.urlparse(url))
q = dict(parse_qsl(parsed[4]))
q[name] = value
parsed[4] = urllib.urlencode(q)
return urlparse.urlunparse(parsed)
def do_GET(s):
    """Handle a GET request.

    Sends a 200 HTML response, stores the request's query parameters on
    ``s.server.query_params`` and writes a short page saying that the
    authentication flow has completed. Note that we can't detect if an
    error occurred.

    Args:
      s: the request handler instance (plays the role of ``self``).
    """
    s.send_response(200)
    s.send_header("Content-type", "text/html")
    s.end_headers()
    # Only the text after the first '?' is the query string. partition()
    # yields '' when the path has no '?', so a bare path such as '/a=b' is
    # not mis-parsed as a query parameter.
    raw_query = s.path.partition('?')[2]
    s.server.query_params = dict(parse_qsl(raw_query))
    s.wfile.write("<html><head><title>Authentication Status</title></head>")
    s.wfile.write("<body><p>The authentication flow has completed.</p>")
    s.wfile.write("</body></html>")
def _add_query_parameter(url, name, value):
"""Adds a query parameter to a url.
Replaces the current value if it already exists in the URL.
Args:
url: string, url to add the query parameter to.
name: string, query parameter name.
value: string, query parameter value.
Returns:
Updated query parameter. Does not update the url if value is None.
"""
if value is None:
return url
else:
parsed = list(urlparse.urlparse(url))
q = dict(parse_qsl(parsed[4]))
q[name] = value
parsed[4] = urllib.urlencode(q)
return urlparse.urlunparse(parsed)
def parse_backend_uri(backend_uri):
    """
    Split a cache "backend_uri" of the form ``scheme://host[/][?params]``.

    Returns a (scheme, host, params) tuple: the scheme (e.g. 'db',
    'memcached'), the host with one trailing '/' removed, and a dict of the
    extra parameters given after '?' (empty when there are none).

    Raises InvalidCacheBackendError when the URI does not start with
    ``scheme://``.
    """
    scheme, colon, rest = backend_uri.partition(':')
    # Both a missing ':' and a missing '//' after it are the same error.
    if not colon or not rest.startswith('//'):
        raise InvalidCacheBackendError("Backend URI must start with scheme://")

    # Everything after '//' up to the first '?' is the host; the remainder,
    # if any, is a url-encoded parameter string.
    host, question_mark, raw_params = rest[2:].partition('?')
    params = dict(parse_qsl(raw_params)) if question_mark else {}
    if host.endswith('/'):
        host = host[:-1]

    return scheme, host, params
def do_GET(s):
    """Handle a GET request.

    Sends a 200 HTML response, stores the request's query parameters on
    ``s.server.query_params`` and writes a short page saying that the
    authentication flow has completed. Note that we can't detect if an
    error occurred.

    Args:
      s: the request handler instance (plays the role of ``self``).
    """
    s.send_response(200)
    s.send_header("Content-type", "text/html")
    s.end_headers()
    # Only the text after the first '?' is the query string. partition()
    # yields '' when the path has no '?', so a bare path such as '/a=b' is
    # not mis-parsed as a query parameter.
    raw_query = s.path.partition('?')[2]
    s.server.query_params = dict(parse_qsl(raw_query))
    s.wfile.write("<html><head><title>Authentication Status</title></head>")
    s.wfile.write("<body><p>The authentication flow has completed.</p>")
    s.wfile.write("</body></html>")
def _add_query_parameter(url, name, value):
"""Adds a query parameter to a url.
Replaces the current value if it already exists in the URL.
Args:
url: string, url to add the query parameter to.
name: string, query parameter name.
value: string, query parameter value.
Returns:
Updated query parameter. Does not update the url if value is None.
"""
if value is None:
return url
else:
parsed = list(urlparse.urlparse(url))
q = dict(parse_qsl(parsed[4]))
q[name] = value
parsed[4] = urllib.urlencode(q)
return urlparse.urlunparse(parsed)
def _update_query_params(uri, params):
"""Updates a URI with new query parameters.
Args:
uri: string, A valid URI, with potential existing query parameters.
params: dict, A dictionary of query parameters.
Returns:
The same URI but with the new query parameters added.
"""
parts = list(urlparse.urlparse(uri))
query_params = dict(parse_qsl(parts[4])) # 4 is the index of the query part
query_params.update(params)
parts[4] = urllib.urlencode(query_params)
return urlparse.urlunparse(parts)
def _parse_exchange_token_response(content):
"""Parses response of an exchange token request.
Most providers return JSON but some (e.g. Facebook) return a
url-encoded string.
Args:
content: The body of a response
Returns:
Content as a dictionary object. Note that the dict could be empty,
i.e. {}. That basically indicates a failure.
"""
resp = {}
try:
resp = simplejson.loads(content)
except StandardError:
# different JSON libs raise different exceptions,
# so we just do a catch-all here
resp = dict(parse_qsl(content))
# some providers respond with 'expires', others with 'expires_in'
if resp and 'expires' in resp:
resp['expires_in'] = resp.pop('expires')
return resp
def postReformat(self, postdata):
    """Normalise url-encoded POST data by decoding it into pairs and re-encoding them."""
    pairs = cgi.parse_qsl(postdata)
    return urllib.urlencode(pairs)
def querystringReformat(self, qsdata):
    """Re-encode the query string of ``qsdata``.

    Only input containing exactly one '?' is rewritten: the part after the
    '?' is decoded into pairs and url-encoded again. Any other input is
    returned unchanged.
    """
    pieces = qsdata.split("?")
    if len(pieces) != 2:
        return qsdata
    prefix, query = pieces
    return prefix + "?" + urllib.urlencode(cgi.parse_qsl(query))
test_yahoo_token.py 文件源码
项目:yahoo-fantasy-football-metrics
作者: uberfastman
项目源码
文件源码
阅读 40
收藏 0
点赞 0
评论 0
def test_y_token_to_string(self):
    # The string form of a key/secret token must parse back to both values.
    serialized = yql.YahooToken('test-key', 'test-secret').to_string()
    fields = dict(parse_qsl(serialized))
    self.assertEqual(fields.get('oauth_token'), 'test-key')
    self.assertEqual(fields.get('oauth_token_secret'), 'test-secret')
test_yahoo_token.py 文件源码
项目:yahoo-fantasy-football-metrics
作者: uberfastman
项目源码
文件源码
阅读 29
收藏 0
点赞 0
评论 0
def test_y_token_to_string2(self):
    # With timestamp, session handle and callback flag set, the string form
    # must parse back to all five fields.
    tok = yql.YahooToken('test-key', 'test-secret')
    tok.timestamp = '1111'
    tok.session_handle = 'poop'
    tok.callback_confirmed = 'basilfawlty'
    fields = dict(parse_qsl(tok.to_string()))
    self.assertEqual(fields.get('oauth_token'), 'test-key')
    self.assertEqual(fields.get('oauth_token_secret'), 'test-secret')
    self.assertEqual(fields.get('token_creation_timestamp'), '1111')
    self.assertEqual(fields.get('oauth_callback_confirmed'), 'basilfawlty')
    self.assertEqual(fields.get('oauth_session_handle'), 'poop')
test_requests_responses.py 文件源码
项目:yahoo-fantasy-football-metrics
作者: uberfastman
项目源码
文件源码
阅读 23
收藏 0
点赞 0
评论 0
def test_request_for_two_legged(self):
    # The URL returned by execute() must carry the query text as 'q' and
    # ask for the 'json' format.
    yql_text = 'SELECT * from foo'
    client = TestTwoLegged(
        'test-api-key', 'test-secret', httplib2_inst=httplib2.Http())
    url = client.execute(yql_text)
    sent = dict(parse_qsl(url.split('?')[1]))
    self.assertEqual(sent['q'], yql_text)
    self.assertEqual(sent['format'], 'json')
test_requests_responses.py 文件源码
项目:yahoo-fantasy-football-metrics
作者: uberfastman
项目源码
文件源码
阅读 20
收藏 0
点赞 0
评论 0
def test_request_for_three_legged(self):
    # The URL returned by execute() with a token must carry the query text
    # as 'q' and ask for the 'json' format.
    yql_text = 'SELECT * from foo'
    client = TestThreeLegged(
        'test-api-key', 'test-secret', httplib2_inst=httplib2.Http())
    access_token = oauth.Token.from_string(
        'oauth_token=foo&oauth_token_secret=bar')
    url = client.execute(yql_text, token=access_token)
    sent = dict(parse_qsl(url.split('?')[1]))
    self.assertEqual(sent['q'], yql_text)
    self.assertEqual(sent['format'], 'json')
def query_params(self):
    """The query parameters of the uri used to call the YQL API.

    Returns a dict parsed from the query part of ``self.uri``, or an empty
    dict when ``self.uri`` is unset or empty.
    """
    if not self.uri:
        return {}
    raw_query = urlparse(self.uri)[4]  # index 4 is the query component
    return dict(parse_qsl(raw_query))
def getPosted(self):
    """Return the url-encoded fields of a POST request as a dict.

    For a POST, one line is read from ``self.env['wsgi.input']``, decoded
    and parsed as a query string. Any other request method yields {}.
    """
    if self.env['REQUEST_METHOD'] != "POST":
        return {}
    body_line = self.env['wsgi.input'].readline().decode()
    return dict(cgi.parse_qsl(body_line))
# Return: <dict> Cookies parsed from the HTTP_COOKIE entry of self.env
def getCookies(self):
    header = self.env.get('HTTP_COOKIE')
    if not header:
        return {}
    # Cookie names are stripped of surrounding whitespace; values are kept
    # exactly as parsed.
    return dict(
        (name.strip(), value) for name, value in cgi.parse_qsl(header))
def _parse_exchange_token_response(content):
"""Parses response of an exchange token request.
Most providers return JSON but some (e.g. Facebook) return a
url-encoded string.
Args:
content: The body of a response
Returns:
Content as a dictionary object. Note that the dict could be empty,
i.e. {}. That basically indicates a failure.
"""
resp = {}
try:
resp = simplejson.loads(content)
except StandardError:
# different JSON libs raise different exceptions,
# so we just do a catch-all here
resp = dict(parse_qsl(content))
# some providers respond with 'expires', others with 'expires_in'
if resp and 'expires' in resp:
resp['expires_in'] = resp.pop('expires')
return resp
def step1_get_authorize_url(self, redirect_uri=None):
    """Returns a URI to redirect to the provider.

    The URI is ``self.auth_uri`` with its query string rebuilt from the
    standard authorization parameters, then ``self.params``, then any
    parameters already present on ``self.auth_uri`` (later sources override
    earlier ones).

    Args:
      redirect_uri: string, Either the string 'urn:ietf:wg:oauth:2.0:oob' for
        a non-web-based application, or a URI that handles the callback from
        the authorization server. This parameter is deprecated, please move to
        passing the redirect_uri in via the constructor. When given, it is
        stored on ``self.redirect_uri``.

    Returns:
      A URI as a string to redirect the user to begin the authorization flow.

    Raises:
      ValueError: if no redirect_uri is available.
    """
    if redirect_uri is not None:
        # Each fragment ends with a space: adjacent string literals are
        # concatenated verbatim, so without it the words run together.
        logger.warning((
            'The redirect_uri parameter for '
            'OAuth2WebServerFlow.step1_get_authorize_url is deprecated. '
            'Please move to passing the redirect_uri in via the '
            'constructor.'))
        self.redirect_uri = redirect_uri

    if self.redirect_uri is None:
        raise ValueError('The value of redirect_uri must not be None.')

    query = {
        'response_type': 'code',
        'client_id': self.client_id,
        'redirect_uri': self.redirect_uri,
        'scope': self.scope,
    }
    query.update(self.params)
    parts = list(urlparse.urlparse(self.auth_uri))
    query.update(dict(parse_qsl(parts[4])))  # 4 is the index of the query part
    parts[4] = urllib.urlencode(query)
    return urlparse.urlunparse(parts)
def test_y_token_to_string(self):
    # The string form of a key/secret token must parse back to both values.
    serialized = yql.YahooToken('test-key', 'test-secret').to_string()
    fields = dict(parse_qsl(serialized))
    self.assertEqual(fields.get('oauth_token'), 'test-key')
    self.assertEqual(fields.get('oauth_token_secret'), 'test-secret')
def test_y_token_to_string2(self):
    # With timestamp, session handle and callback flag set, the string form
    # must parse back to all five fields.
    tok = yql.YahooToken('test-key', 'test-secret')
    tok.timestamp = '1111'
    tok.session_handle = 'poop'
    tok.callback_confirmed = 'basilfawlty'
    fields = dict(parse_qsl(tok.to_string()))
    self.assertEqual(fields.get('oauth_token'), 'test-key')
    self.assertEqual(fields.get('oauth_token_secret'), 'test-secret')
    self.assertEqual(fields.get('token_creation_timestamp'), '1111')
    self.assertEqual(fields.get('oauth_callback_confirmed'), 'basilfawlty')
    self.assertEqual(fields.get('oauth_session_handle'), 'poop')
def test_request_for_two_legged(self):
    # The URL returned by execute() must carry the query text as 'q' and
    # ask for the 'json' format.
    yql_text = 'SELECT * from foo'
    client = TestTwoLegged(
        'test-api-key', 'test-secret', httplib2_inst=httplib2.Http())
    url = client.execute(yql_text)
    sent = dict(parse_qsl(url.split('?')[1]))
    self.assertEqual(sent['q'], yql_text)
    self.assertEqual(sent['format'], 'json')
def test_request_for_three_legged(self):
    # The URL returned by execute() with a token must carry the query text
    # as 'q' and ask for the 'json' format.
    yql_text = 'SELECT * from foo'
    client = TestThreeLegged(
        'test-api-key', 'test-secret', httplib2_inst=httplib2.Http())
    access_token = oauth.Token.from_string(
        'oauth_token=foo&oauth_token_secret=bar')
    url = client.execute(yql_text, token=access_token)
    sent = dict(parse_qsl(url.split('?')[1]))
    self.assertEqual(sent['q'], yql_text)
    self.assertEqual(sent['format'], 'json')
def query_params(self):
    """The query parameters of the uri used to call the YQL API.

    Returns a dict parsed from the query part of ``self.uri``, or an empty
    dict when ``self.uri`` is unset or empty.
    """
    if not self.uri:
        return {}
    raw_query = urlparse(self.uri)[4]  # index 4 is the query component
    return dict(parse_qsl(raw_query))