def __two_legged_request(self, parameters=None, method=None):
"""Sign a request for two-legged authentication"""
params = self.get_base_params()
if parameters:
params.update(parameters)
url = self.endpoint
yql_logger.debug("params: %s", params)
yql_logger.debug("endpoint_url: %s", url)
if not method:
method = "GET"
consumer = oauth.Consumer(self.api_key, self.secret)
request = oauth.Request(method=method, url=url, parameters=params)
sig = self.get_signature(url)
yql_logger.debug("signature: %s", sig)
request.sign_request(sig, consumer, None)
return request
python类Request()的实例源码
def get_token_and_auth_url(self, callback_url=None):
"""First step is to get the token and then send the request that
provides the auth URL
Returns a tuple of token and the authorisation URL.
"""
client = oauth.Client(self.consumer)
params = {}
params['oauth_callback'] = callback_url or 'oob'
request = oauth.Request(parameters=params)
url = REQUEST_TOKEN_URL
resp, content = client.request(url, "POST", request.to_postdata())
if resp.get('status') == '200':
token = oauth.Token.from_string(content)
yql_logger.debug("token: %s", token)
data = dict(parse_qsl(content))
yql_logger.debug("data: %s", data)
return token, data['xoauth_request_auth_url']
else:
raise YQLError(resp, content, url)
def __two_legged_request(self, resource_url, parameters=None, method=None):
"""Sign a request for two-legged authentication"""
params = self.get_base_params()
if parameters:
params.update(parameters)
yql_logger.debug("params: %s", params)
yql_logger.debug("resource_url: %s", resource_url)
if not method:
method = "GET"
consumer = oauth.Consumer(self.api_key, self.secret)
request = oauth.Request(method=method, url=resource_url,
parameters=params)
request.sign_request(self.hmac_sha1_signature, consumer, None)
return request
def get_token_and_auth_url(self, callback_url=None):
"""First step is to get the token and then send the request that
provides the auth URL
Returns a tuple of token and the authorisation URL.
"""
client = oauth.Client(self.consumer)
params = {}
params['oauth_callback'] = callback_url or 'oob'
request = oauth.Request(parameters=params)
url = REQUEST_TOKEN_URL
resp, content = client.request(url, "POST", request.to_postdata())
if resp.get('status') == '200':
token = oauth.Token.from_string(content)
yql_logger.debug("token: %s", token)
data = dict(parse_qsl(content))
yql_logger.debug("data: %s", data)
return token, data['xoauth_request_auth_url']
else:
raise YQLError, (resp, content, url)
def test_get_nonoauth_parameters(self):
oauth_params = {
'oauth_consumer': 'asdfasdfasdf'
}
other_params = {
u('foo'): u('baz'),
u('bar'): u('foo'),
u('multi'): [u('FOO'), u('BAR')],
u('uni_utf8'): u(b'\xae', 'latin1'),
u('uni_unicode'): _UGLYPH,
u('uni_unicode_2'):
u(b'\xc3\xa5\xc3\x85\xc3\xb8\xc3\x98', 'latin1'), # 'åÅøØ'
}
params = oauth_params
params.update(other_params)
req = oauth.Request("GET", "http://example.com", params)
self.assertEqual(other_params, req.get_nonoauth_parameters())
def test_to_postdata(self):
realm = "http://sp.example.com/"
params = {
'multi': ['FOO','BAR'],
'oauth_version': "1.0",
'oauth_nonce': "4572616e48616d6d65724c61686176",
'oauth_timestamp': "137131200",
'oauth_consumer_key': "0685bd9184jfhq22",
'oauth_signature_method': "HMAC-SHA1",
'oauth_token': "ad180jjd733klru7",
'oauth_signature': "wOJIO9A2W5mFwDgiDvZbTSMK%2FPY%3D",
}
req = oauth.Request("GET", realm, params)
flat = [('multi','FOO'),('multi','BAR')]
del params['multi']
flat.extend(params.items())
kf = lambda x: x[0]
self.assertEqual(
sorted(flat, key=kf),
sorted(parse_qsl(req.to_postdata()), key=kf))
def test_to_url(self):
url = "http://sp.example.com/"
params = {
'oauth_version': "1.0",
'oauth_nonce': "4572616e48616d6d65724c61686176",
'oauth_timestamp': "137131200",
'oauth_consumer_key': "0685bd9184jfhq22",
'oauth_signature_method': "HMAC-SHA1",
'oauth_token': "ad180jjd733klru7",
'oauth_signature': "wOJIO9A2W5mFwDgiDvZbTSMK%2FPY%3D",
}
req = oauth.Request("GET", url, params)
exp = urlparse("%s?%s" % (url, urlencode(params)))
res = urlparse(req.to_url())
self.assertEqual(exp.scheme, res.scheme)
self.assertEqual(exp.netloc, res.netloc)
self.assertEqual(exp.path, res.path)
exp_parsed = parse_qs(exp.query)
res_parsed = parse_qs(res.query)
self.assertEqual(exp_parsed, res_parsed)
def test_get_normalized_parameters_from_url(self):
# example copied from
# https://github.com/ciaranj/node-oauth/blob/master/tests/oauth.js
# which in turns says that it was copied from
# http://oauth.net/core/1.0/#sig_base_example .
url = ("http://photos.example.net/photos?file=vacation.jpg"
"&oauth_consumer_key=dpf43f3p2l4k3l03"
"&oauth_nonce=kllo9940pd9333jh&oauth_signature_method=HMAC-SHA1"
"&oauth_timestamp=1191242096&oauth_token=nnch734d00sl2jdk"
"&oauth_version=1.0&size=original")
req = oauth.Request("GET", url)
res = req.get_normalized_parameters()
expected = ('file=vacation.jpg&oauth_consumer_key=dpf43f3p2l4k3l03'
'&oauth_nonce=kllo9940pd9333jh'
'&oauth_signature_method=HMAC-SHA1'
'&oauth_timestamp=1191242096&oauth_token=nnch734d00sl2jdk'
'&oauth_version=1.0&size=original')
self.assertEqual(expected, res)
def test_get_normalized_parameters_ignores_auth_signature(self):
url = "http://sp.example.com/"
params = {
'oauth_version': "1.0",
'oauth_nonce': "4572616e48616d6d65724c61686176",
'oauth_timestamp': "137131200",
'oauth_consumer_key': "0685bd9184jfhq22",
'oauth_signature_method': "HMAC-SHA1",
'oauth_signature': "some-random-signature-%d" % random.randint(1000, 2000),
'oauth_token': "ad180jjd733klru7",
}
req = oauth.Request("GET", url, params)
res = req.get_normalized_parameters()
self.assertNotEqual(urlencode(sorted(params.items())), res)
foo = params.copy()
del foo["oauth_signature"]
self.assertEqual(urlencode(sorted(foo.items())), res)
def test_from_token_and_callback(self):
url = "http://sp.example.com/"
params = {
'oauth_version': "1.0",
'oauth_nonce': "4572616e48616d6d65724c61686176",
'oauth_timestamp': "137131200",
'oauth_consumer_key': "0685bd9184jfhq22",
'oauth_signature_method': "HMAC-SHA1",
'oauth_token': "ad180jjd733klru7",
'oauth_signature': "wOJIO9A2W5mFwDgiDvZbTSMK%2FPY%3D",
}
tok = oauth.Token(key="tok-test-key", secret="tok-test-secret")
req = oauth.Request.from_token_and_callback(tok)
self.assertFalse('oauth_callback' in req)
self.assertEqual(req['oauth_token'], tok.key)
req = oauth.Request.from_token_and_callback(tok, callback=url)
self.assertTrue('oauth_callback' in req)
self.assertEqual(req['oauth_callback'], url)
def setUp(self):
url = "http://sp.example.com/"
params = {
'oauth_version': "1.0",
'oauth_nonce': "4572616e48616d6d65724c61686176",
'oauth_timestamp': int(time.time()),
'bar': 'blerg',
'multi': ['FOO','BAR'],
'foo': 59
}
self.consumer = oauth.Consumer(key="consumer-key",
secret="consumer-secret")
self.token = oauth.Token(key="token-key", secret="token-secret")
params['oauth_token'] = self.token.key
params['oauth_consumer_key'] = self.consumer.key
self.request = oauth.Request(method="GET", url=url, parameters=params)
signature_method = oauth.SignatureMethod_HMAC_SHA1()
self.request.sign_request(signature_method, self.consumer, self.token)
def sign_request(self, url, url_params={}):
oauth_request = oauth2.Request(
method="GET",
url=url,
parameters=url_params
)
oauth_request.update(
{
'oauth_nonce': oauth2.generate_nonce(),
'oauth_timestamp': oauth2.generate_timestamp(),
'oauth_token': self.token.key,
'oauth_consumer_key': self.consumer.key
}
)
oauth_request.sign_request(
oauth2.SignatureMethod_HMAC_SHA1(),
self.consumer,
self.token
)
return oauth_request.to_url()
def test_get_nonoauth_parameters(self):
oauth_params = {
'oauth_consumer': 'asdfasdfasdf'
}
other_params = {
u('foo'): u('baz'),
u('bar'): u('foo'),
u('multi'): [u('FOO'), u('BAR')],
u('uni_utf8'): u(b'\xae', 'latin1'),
u('uni_unicode'): _UGLYPH,
u('uni_unicode_2'):
u(b'\xc3\xa5\xc3\x85\xc3\xb8\xc3\x98', 'latin1'), # 'åÅøØ'
}
params = oauth_params
params.update(other_params)
req = oauth.Request("GET", "http://example.com", params)
self.assertEqual(other_params, req.get_nonoauth_parameters())
def test_to_postdata(self):
realm = "http://sp.example.com/"
params = {
'multi': ['FOO','BAR'],
'oauth_version': "1.0",
'oauth_nonce': "4572616e48616d6d65724c61686176",
'oauth_timestamp': "137131200",
'oauth_consumer_key': "0685bd9184jfhq22",
'oauth_signature_method': "HMAC-SHA1",
'oauth_token': "ad180jjd733klru7",
'oauth_signature': "wOJIO9A2W5mFwDgiDvZbTSMK%2FPY%3D",
}
req = oauth.Request("GET", realm, params)
flat = [('multi','FOO'),('multi','BAR')]
del params['multi']
flat.extend(params.items())
kf = lambda x: x[0]
self.assertEqual(
sorted(flat, key=kf),
sorted(parse_qsl(req.to_postdata()), key=kf))
def test_to_url(self):
url = "http://sp.example.com/"
params = {
'oauth_version': "1.0",
'oauth_nonce': "4572616e48616d6d65724c61686176",
'oauth_timestamp': "137131200",
'oauth_consumer_key': "0685bd9184jfhq22",
'oauth_signature_method': "HMAC-SHA1",
'oauth_token': "ad180jjd733klru7",
'oauth_signature': "wOJIO9A2W5mFwDgiDvZbTSMK%2FPY%3D",
}
req = oauth.Request("GET", url, params)
exp = urlparse("%s?%s" % (url, urlencode(params)))
res = urlparse(req.to_url())
self.assertEqual(exp.scheme, res.scheme)
self.assertEqual(exp.netloc, res.netloc)
self.assertEqual(exp.path, res.path)
exp_parsed = parse_qs(exp.query)
res_parsed = parse_qs(res.query)
self.assertEqual(exp_parsed, res_parsed)
def test_get_normalized_parameters_from_url(self):
# example copied from
# https://github.com/ciaranj/node-oauth/blob/master/tests/oauth.js
# which in turns says that it was copied from
# http://oauth.net/core/1.0/#sig_base_example .
url = ("http://photos.example.net/photos?file=vacation.jpg"
"&oauth_consumer_key=dpf43f3p2l4k3l03"
"&oauth_nonce=kllo9940pd9333jh&oauth_signature_method=HMAC-SHA1"
"&oauth_timestamp=1191242096&oauth_token=nnch734d00sl2jdk"
"&oauth_version=1.0&size=original")
req = oauth.Request("GET", url)
res = req.get_normalized_parameters()
expected = ('file=vacation.jpg&oauth_consumer_key=dpf43f3p2l4k3l03'
'&oauth_nonce=kllo9940pd9333jh'
'&oauth_signature_method=HMAC-SHA1'
'&oauth_timestamp=1191242096&oauth_token=nnch734d00sl2jdk'
'&oauth_version=1.0&size=original')
self.assertEqual(expected, res)
def test_get_normalized_parameters_ignores_auth_signature(self):
url = "http://sp.example.com/"
params = {
'oauth_version': "1.0",
'oauth_nonce': "4572616e48616d6d65724c61686176",
'oauth_timestamp': "137131200",
'oauth_consumer_key': "0685bd9184jfhq22",
'oauth_signature_method': "HMAC-SHA1",
'oauth_signature': "some-random-signature-%d" % random.randint(1000, 2000),
'oauth_token': "ad180jjd733klru7",
}
req = oauth.Request("GET", url, params)
res = req.get_normalized_parameters()
self.assertNotEqual(urlencode(sorted(params.items())), res)
foo = params.copy()
del foo["oauth_signature"]
self.assertEqual(urlencode(sorted(foo.items())), res)
def test_from_token_and_callback(self):
url = "http://sp.example.com/"
params = {
'oauth_version': "1.0",
'oauth_nonce': "4572616e48616d6d65724c61686176",
'oauth_timestamp': "137131200",
'oauth_consumer_key': "0685bd9184jfhq22",
'oauth_signature_method': "HMAC-SHA1",
'oauth_token': "ad180jjd733klru7",
'oauth_signature': "wOJIO9A2W5mFwDgiDvZbTSMK%2FPY%3D",
}
tok = oauth.Token(key="tok-test-key", secret="tok-test-secret")
req = oauth.Request.from_token_and_callback(tok)
self.assertFalse('oauth_callback' in req)
self.assertEqual(req['oauth_token'], tok.key)
req = oauth.Request.from_token_and_callback(tok, callback=url)
self.assertTrue('oauth_callback' in req)
self.assertEqual(req['oauth_callback'], url)
def setUp(self):
url = "http://sp.example.com/"
params = {
'oauth_version': "1.0",
'oauth_nonce': "4572616e48616d6d65724c61686176",
'oauth_timestamp': int(time.time()),
'bar': 'blerg',
'multi': ['FOO','BAR'],
'foo': 59
}
self.consumer = oauth.Consumer(key="consumer-key",
secret="consumer-secret")
self.token = oauth.Token(key="token-key", secret="token-secret")
params['oauth_token'] = self.token.key
params['oauth_consumer_key'] = self.consumer.key
self.request = oauth.Request(method="GET", url=url, parameters=params)
signature_method = oauth.SignatureMethod_HMAC_SHA1()
self.request.sign_request(signature_method, self.consumer, self.token)
def refresh_token(self, token):
"""Access Tokens only last for one hour from the point of being issued.
When a token has expired it needs to be refreshed this method takes an
expired token and refreshes it.
token parameter can be either a token object or a token string.
"""
if not hasattr(token, "key"):
token = YahooToken.from_string(token)
params = self.get_base_params()
params['oauth_token'] = token.key
params['oauth_token_secret'] = token.secret
params['oauth_session_handle'] = token.session_handle
oauth_request = oauth.Request.from_consumer_and_token(
self.consumer, token=token,
http_url=ACCESS_TOKEN_URL,
http_method="POST",
parameters=params)
yql_logger.debug("oauth_request: %s", oauth_request)
oauth_request.sign_request(
self.hmac_sha1_signature, self.consumer, token)
url = oauth_request.to_url()
yql_logger.debug("oauth_url: %s", url)
postdata = oauth_request.to_postdata()
yql_logger.debug("oauth_postdata: %s", postdata)
resp, content = self.http.request(url, "POST", postdata)
if resp.get('status') == '200':
access_token = YahooToken.from_string(content)
yql_logger.debug("oauth_access_token: %s", access_token)
access_token.timestamp = oauth_request['oauth_timestamp']
return access_token
else:
raise YQLError(resp, content, url)
def get_uri(self, query, params=None, **kwargs):
"""Get the the request url"""
if isinstance(query, basestring):
query = YQLQuery(query)
query_params = self.get_query_params(query, params, **kwargs)
token = kwargs.get("token")
if hasattr(token, "yahoo_guid"):
query_params["oauth_yahoo_guid"] = getattr(token, "yahoo_guid")
if not token:
raise ValueError("Without a token three-legged-auth cannot be"
" carried out")
yql_logger.debug("query_params: %s", query_params)
http_method = query.get_http_method()
url = self.endpoint
oauth_request = oauth.Request.from_consumer_and_token(
self.consumer, http_url=url,
token=token, parameters=query_params,
http_method=http_method)
yql_logger.debug("oauth_request: %s", oauth_request)
# Sign request
sig = self.get_signature(url)
oauth_request.sign_request(sig, self.consumer, token)
yql_logger.debug("oauth_signed_request: %s", oauth_request)
url = oauth_request.to_url()
url = clean_url(url)
return url.replace('+', '%20').replace('%7E', '~')
def refresh_token(self, token):
"""Access Tokens only last for one hour from the point of being issued.
When a token has expired it needs to be refreshed this method takes an
expired token and refreshes it.
token parameter can be either a token object or a token string.
"""
if not hasattr(token, "key"):
token = YahooToken.from_string(token)
params = self.get_base_params()
params['oauth_token'] = token.key
params['oauth_token_secret'] = token.secret
params['oauth_session_handle'] = token.session_handle
oauth_request = oauth.Request.from_consumer_and_token(
self.consumer, token=token,
http_url=ACCESS_TOKEN_URL,
http_method="POST",
parameters=params)
yql_logger.debug("oauth_request: %s", oauth_request)
oauth_request.sign_request(
self.hmac_sha1_signature, self.consumer, token)
url = oauth_request.to_url()
yql_logger.debug("oauth_url: %s", url)
postdata = oauth_request.to_postdata()
yql_logger.debug("oauth_postdata: %s", postdata)
resp, content = self.http.request(url, "POST", postdata)
if resp.get('status') == '200':
access_token = YahooToken.from_string(content)
yql_logger.debug("oauth_access_token: %s", access_token)
access_token.timestamp = oauth_request['oauth_timestamp']
return access_token
else:
raise YQLError, (resp, content, url)
def get_uri(self, query, params=None, **kwargs):
"""Get the the request url"""
query_params = self.get_query_params(query, params, **kwargs)
token = kwargs.get("token")
if hasattr(token, "yahoo_guid"):
query_params["oauth_yahoo_guid"] = getattr(token, "yahoo_guid")
if not token:
raise ValueError, "Without a token three-legged-auth cannot be"\
" carried out"
yql_logger.debug("query_params: %s", query_params)
http_method = get_http_method(query)
oauth_request = oauth.Request.from_consumer_and_token(
self.consumer, http_url=self.uri,
token=token, parameters=query_params,
http_method=http_method)
yql_logger.debug("oauth_request: %s", oauth_request)
# Sign request
oauth_request.sign_request(
self.hmac_sha1_signature, self.consumer, token)
yql_logger.debug("oauth_signed_request: %s", oauth_request)
uri = "%s?%s" % (self.uri, oauth_request.to_postdata())
return uri.replace('+', '%20').replace('%7E', '~')
def test__init__(self):
method = "GET"
req = oauth.Request(method)
self.assertFalse('url' in req.__dict__)
self.assertFalse('normalized_url' in req.__dict__)
self.assertRaises(AttributeError, getattr, req, 'url')
self.assertRaises(AttributeError, getattr, req, 'normalized_url')
def test_setter(self):
url = "http://example.com"
method = "GET"
req = oauth.Request(method, url)
self.assertEqual(req.url, url)
self.assertEqual(req.normalized_url, url)
req.url = url + '/?foo=bar'
self.assertEqual(req.url, url + '/?foo=bar')
self.assertEqual(req.normalized_url, url + '/')
req.url = None
self.assertEqual(req.url, None)
self.assertEqual(req.normalized_url, None)
def test_deleter(self):
url = "http://example.com"
method = "GET"
req = oauth.Request(method, url)
del req.url
self.assertRaises(AttributeError, getattr, req, 'url')
def test_url(self):
url1 = "http://example.com:80/foo.php"
url2 = "https://example.com:443/foo.php"
exp1 = "http://example.com/foo.php"
exp2 = "https://example.com/foo.php"
method = "GET"
req = oauth.Request(method, url1)
self.assertEqual(req.normalized_url, exp1)
self.assertEqual(req.url, url1)
req = oauth.Request(method, url2)
self.assertEqual(req.normalized_url, exp2)
self.assertEqual(req.url, url2)
def test_bad_url(self):
request = oauth.Request()
try:
request.url = "ftp://example.com"
self.fail("Invalid URL scheme was accepted.")
except ValueError:
pass
def test_no_url_set(self):
consumer = oauth.Consumer('my_consumer_key', 'my_consumer_secret')
token = oauth.Token('my_key', 'my_secret')
request = oauth.Request()
self.assertRaises(ValueError,
request.sign_request,
oauth.SignatureMethod_HMAC_SHA1(), consumer, token)
def test_url_query(self):
url = ("https://www.google.com/m8/feeds/contacts/default/full/?alt=json&max-contacts=10")
normalized_url = urlunparse(urlparse(url)[:3] + (None, None, None))
method = "GET"
req = oauth.Request(method, url)
self.assertEqual(req.url, url)
self.assertEqual(req.normalized_url, normalized_url)