def test_get_parameter(self):
    """Check get_parameter() for a stored name and for an unknown name."""
    request = oauth.Request(
        "GET", "http://example.com",
        parameters={'oauth_consumer' : 'asdf'})

    self.assertEqual(request.get_parameter('oauth_consumer'), 'asdf')

    # A name that was never supplied must raise oauth.Error.
    with self.assertRaises(oauth.Error):
        request.get_parameter('blah')
# Example source code for the Python class Request()
def test_to_url_nonascii(self):
    """Check that to_url() handles a non-ASCII parameter value.

    The URL produced by the request is compared against one built with
    urlencode() from the same parameters, after the non-ASCII value has
    been encoded as UTF-8.
    """
    url = "http://sp.example.com/"
    params = {
        'nonasciithing': u'q\xbfu\xe9 ,aasp u?..a.s',
        'oauth_version': "1.0",
        'oauth_nonce': "4572616e48616d6d65724c61686176",
        'oauth_timestamp': "137131200",
        'oauth_consumer_key': "0685bd9184jfhq22",
        'oauth_signature_method': "HMAC-SHA1",
        'oauth_token': "ad180jjd733klru7",
        'oauth_signature': "wOJIO9A2W5mFwDgiDvZbTSMK%2FPY%3D",
    }
    req = oauth.Request("GET", url, params)
    res = urlparse(req.to_url())

    # The expected URL is built from the UTF-8 bytes of the value.
    params['nonasciithing'] = params['nonasciithing'].encode('utf-8')
    exp = urlparse("%s?%s" % (url, urlencode(params)))

    # assertEqual replaces the deprecated assertEquals alias, which newer
    # Python versions no longer provide.
    self.assertEqual(exp.netloc, res.netloc)
    self.assertEqual(exp.path, res.path)

    # Compare parsed query strings so that parameter order is irrelevant.
    a = parse_qs(exp.query)
    b = parse_qs(res.query)
    self.assertEqual(a, b)
def test_to_header(self):
    """Check that to_header() emits the realm and every supplied parameter.

    The header value is split back into name/value pairs, which are then
    compared against the realm and the parameters the request was built
    from.
    """
    realm = "http://sp.example.com/"
    params = {
        'oauth_version': "1.0",
        'oauth_nonce': "4572616e48616d6d65724c61686176",
        'oauth_timestamp': "137131200",
        'oauth_consumer_key': "0685bd9184jfhq22",
        'oauth_signature_method': "HMAC-SHA1",
        'oauth_token': "ad180jjd733klru7",
        'oauth_signature': "wOJIO9A2W5mFwDgiDvZbTSMK%2FPY%3D",
    }
    req = oauth.Request("GET", realm, params)

    # Only the header value is inspected; the header name is ignored.
    _, value = list(req.to_header(realm).items())[0]
    parts = value.split('OAuth ')
    pairs = parts[1].split(', ')

    # One entry per parameter plus one for the realm.  assertEqual is
    # required here: assertTrue(x, y) treats y as the failure message and
    # passes for any non-zero x.
    self.assertEqual(len(pairs), len(params) + 1)

    res = {}
    for pair in pairs:
        var, val = pair.split('=')
        res[var] = unquote(val.strip('"'))

    self.assertEqual(realm, res['realm'])
    del res['realm']

    # With the realm removed, exactly the supplied parameters remain.
    self.assertEqual(len(res), len(params))
    for key, val in res.items():
        self.assertEqual(val, params.get(key))
def test_to_postdata_nonascii(self):
    """Check the to_postdata() output for a request with a non-ASCII value."""
    realm = "http://sp.example.com/"
    params = {
        'nonasciithing': u('q\xbfu\xe9 ,aasp u?..a.s', 'latin1'),
        'oauth_version': "1.0",
        'oauth_nonce': "4572616e48616d6d65724c61686176",
        'oauth_timestamp': "137131200",
        'oauth_consumer_key': "0685bd9184jfhq22",
        'oauth_signature_method': "HMAC-SHA1",
        'oauth_token': "ad180jjd733klru7",
        'oauth_signature': "wOJIO9A2W5mFwDgiDvZbTSMK%2FPY%3D",
    }
    request = oauth.Request("GET", realm, params)

    # Expected fields, in the order they must appear in the post data.
    expected_fields = [
        'nonasciithing=q%C2%BFu%C3%A9%20%2Caasp%20u%3F..a.s',
        'oauth_consumer_key=0685bd9184jfhq22',
        'oauth_nonce=4572616e48616d6d65724c61686176',
        'oauth_signature=wOJIO9A2W5mFwDgiDvZbTSMK%252FPY%253D',
        'oauth_signature_method=HMAC-SHA1',
        'oauth_timestamp=137131200',
        'oauth_token=ad180jjd733klru7',
        'oauth_version=1.0',
    ]
    self.assertReallyEqual(request.to_postdata(), '&'.join(expected_fields))
def test_to_url_with_query(self):
    """Check to_url() for a request URL that already has a query string."""
    url = ("https://www.google.com/m8/feeds/contacts/default/full/"
           "?alt=json&max-contacts=10")
    params = {
        'oauth_version': "1.0",
        'oauth_nonce': "4572616e48616d6d65724c61686176",
        'oauth_timestamp': "137131200",
        'oauth_consumer_key': "0685bd9184jfhq22",
        'oauth_signature_method': "HMAC-SHA1",
        'oauth_token': "ad180jjd733klru7",
        'oauth_signature': "wOJIO9A2W5mFwDgiDvZbTSMK%2FPY%3D",
    }
    request = oauth.Request("GET", url, params)

    # The URL above already has query parameters, so the expected URL
    # appends the new ones with "&" rather than "?".
    expected = urlparse("%s&%s" % (url, urlencode(params)))
    actual = urlparse(request.to_url())
    for component in ('scheme', 'netloc', 'path'):
        self.assertEqual(getattr(expected, component),
                         getattr(actual, component))

    expected_query = parse_qs(expected.query)
    actual_query = parse_qs(actual.query)

    # The original query parameters must still be present and unchanged.
    self.assertIn('alt', actual_query)
    self.assertIn('max-contacts', actual_query)
    self.assertEqual(actual_query['alt'], ['json'])
    self.assertEqual(actual_query['max-contacts'], ['10'])
    self.assertEqual(expected_query, actual_query)
def test_signature_base_string_bytes_nonascii_nonutf8(self):
    """Sign a request whose bytes URL embeds _B2766; check URL and signature."""
    url_head = (b'http://api.simplegeo.com:80/1.0/places/address.json'
                b'?q=monkeys&category=animal'
                b'&address=41+Decatur+St,+San+Francisc')
    url = url_head + _B2766 + b',+CA'
    consumer = oauth.Consumer('consumer_token', 'consumer_secret')
    request = oauth.Request("GET", url)

    # The normalized URL is expected without the port and the query string.
    expected_url = u('http://api.simplegeo.com/1.0/places/address.json')
    self.assertReallyEqual(request.normalized_url, expected_url)

    request.sign_request(oauth.SignatureMethod_HMAC_SHA1(), consumer, None)
    self.assertReallyEqual(  # XXX
        request['oauth_signature'], b'WhufgeZKyYpKsI70GZaiDaYwl6g=')
def test_signature_base_bytes_nonascii_nonutf8_urlencoded(self):
    """Sign a request built from a percent-encoded bytes URL."""
    url = (b'http://api.simplegeo.com:80/1.0/places/address.json'
           b'?q=monkeys&category=animal'
           b'&address=41+Decatur+St,+San+Francisc%E2%9D%A6,+CA')
    consumer = oauth.Consumer('consumer_token', 'consumer_secret')
    request = oauth.Request("GET", url)

    # The normalized URL is expected without the port and the query string.
    expected_url = u('http://api.simplegeo.com/1.0/places/address.json')
    self.assertReallyEqual(request.normalized_url, expected_url)

    signer = oauth.SignatureMethod_HMAC_SHA1()
    request.sign_request(signer, consumer, None)
    self.assertReallyEqual(
        request['oauth_signature'], b'WhufgeZKyYpKsI70GZaiDaYwl6g=')
def test_signature_base_unicode_nonascii_nonutf8_url_encoded(self):
    """Sign a request built from a percent-encoded URL passed through u()."""
    url = u('http://api.simplegeo.com:80/1.0/places/address.json'
            '?q=monkeys&category=animal'
            '&address=41+Decatur+St,+San+Francisc%E2%9D%A6,+CA')
    consumer = oauth.Consumer('consumer_token', 'consumer_secret')
    request = oauth.Request("GET", url)

    # The normalized URL is expected without the port and the query string.
    expected_url = u('http://api.simplegeo.com/1.0/places/address.json')
    self.assertReallyEqual(request.normalized_url, expected_url)

    signer = oauth.SignatureMethod_HMAC_SHA1()
    request.sign_request(signer, consumer, None)
    self.assertReallyEqual(
        request['oauth_signature'], b'WhufgeZKyYpKsI70GZaiDaYwl6g=')
def test_signature_base_string_with_query(self):
    """Check normalized URL and parameters for a URL with a query string.

    The normalized URL must drop the query string while req.url keeps it,
    and the normalized parameters must contain both the supplied
    parameters (other than oauth_signature) and the query parameters
    taken from the URL.
    """
    url = ("https://www.google.com/m8/feeds/contacts/default/full/"
           "?alt=json&max-contacts=10")
    params = {
        'oauth_version': "1.0",
        'oauth_nonce': "4572616e48616d6d65724c61686176",
        'oauth_timestamp': "137131200",
        'oauth_consumer_key': "0685bd9184jfhq22",
        'oauth_signature_method': "HMAC-SHA1",
        'oauth_token': "ad180jjd733klru7",
        'oauth_signature': "wOJIO9A2W5mFwDgiDvZbTSMK%2FPY%3D",
    }
    req = oauth.Request("GET", url, params)
    self.assertEqual(
        req.normalized_url,
        'https://www.google.com/m8/feeds/contacts/default/full/')
    self.assertEqual(req.url, url)

    normalized_params = parse_qsl(req.get_normalized_parameters())
    # Every supplied parameter except oauth_signature, plus the two query
    # parameters from the URL.  The previous assertTrue(len(...), n) form
    # treated n as a failure message and could never fail.
    self.assertEqual(len(normalized_params), len(params) - 1 + 2)

    normalized_params = dict(normalized_params)
    for key, value in params.items():
        # oauth_signature is not part of the normalized parameters.
        if key == 'oauth_signature':
            continue
        self.assertEqual(value, normalized_params[key])

    self.assertEqual(normalized_params['alt'], 'json')
    self.assertEqual(normalized_params['max-contacts'], '10')
def test_get_normalized_parameters_empty(self):
    """Check that a query parameter with an empty value is kept as 'empty='."""
    request = oauth.Request("GET", "http://sp.example.com/?empty=")
    normalized = request.get_normalized_parameters()
    self.assertEqual('empty=', normalized)
def test_get_normalized_parameters_multiple(self):
    """Check normalization of a URL whose query repeats the 'tag' parameter.

    The expected string lists the parameters sorted by name, keeps both
    'tag' values and does not contain oauth_signature.
    """
    url = ("http://example.com/v2/search/videos"
           "?oauth_nonce=79815175"
           "&oauth_timestamp=1295397962"
           "&oauth_consumer_key=mykey"
           "&oauth_signature_method=HMAC-SHA1"
           "&oauth_version=1.0"
           "&offset=10"
           "&oauth_signature=spWLI%2FGQjid7sQVd5%2FarahRxzJg%3D"
           "&tag=one&tag=two")
    expected = ('oauth_consumer_key=mykey'
                '&oauth_nonce=79815175'
                '&oauth_signature_method=HMAC-SHA1'
                '&oauth_timestamp=1295397962'
                '&oauth_version=1.0'
                '&offset=10'
                '&tag=one&tag=two')

    request = oauth.Request("GET", url)
    self.assertEqual(expected, request.get_normalized_parameters())
def test_signing_base(self):
    """Check the raw base string returned by HMAC-SHA1 signing_base()."""
    # The example was copied from
    # https://github.com/ciaranj/node-oauth/blob/master/tests/oauth.js
    # which in turn says that it was copied from
    # http://oauth.net/core/1.0/#sig_base_example .
    url = ("http://photos.example.net/photos?file=vacation.jpg"
           "&oauth_consumer_key=dpf43f3p2l4k3l03"
           "&oauth_nonce=kllo9940pd9333jh&oauth_signature_method=HMAC-SHA1"
           "&oauth_timestamp=1191242096&oauth_token=nnch734d00sl2jdk"
           "&oauth_version=1.0&size=original")
    request = oauth.Request("GET", url)
    method = oauth.SignatureMethod_HMAC_SHA1()
    consumer = oauth.Consumer('dpf43f3p2l4k3l03', 'foo')

    # signing_base() returns a (key, raw) pair; only raw is checked here.
    _key, raw = method.signing_base(request, consumer, None)

    expected = b('GET&http%3A%2F%2Fphotos.example.net%2Fphotos'
                 '&file%3Dvacation.jpg'
                 '%26oauth_consumer_key%3Ddpf43f3p2l4k3l03'
                 '%26oauth_nonce%3Dkllo9940pd9333jh'
                 '%26oauth_signature_method%3DHMAC-SHA1'
                 '%26oauth_timestamp%3D1191242096'
                 '%26oauth_token%3Dnnch734d00sl2jdk'
                 '%26oauth_version%3D1.0%26size%3Doriginal')
    self.assertEqual(expected, raw)
def test_get_normalized_parameters(self):
    """Check normalization of list-valued, bytes and _UGLYPH parameter values."""
    params = {
        'oauth_version': "1.0",
        'oauth_nonce': "4572616e48616d6d65724c61686176",
        'oauth_timestamp': "137131200",
        'oauth_consumer_key': "0685bd9184jfhq22",
        'oauth_signature_method': "HMAC-SHA1",
        'oauth_token': "ad180jjd733klru7",
        'multi': ['FOO','BAR', _UGLYPH, b'\xc2\xae'],
        'multi_same': ['FOO','FOO'],
        'uni_utf8_bytes': b'\xc2\xae',
        'uni_unicode_object': _UGLYPH
    }
    request = oauth.Request("GET", "http://sp.example.com/", params)

    # Expected name=value pairs, in the order they must be emitted.
    expected_pairs = [
        'multi=BAR', 'multi=FOO', 'multi=%C2%AE', 'multi=%C2%AE',
        'multi_same=FOO', 'multi_same=FOO',
        'oauth_consumer_key=0685bd9184jfhq22',
        'oauth_nonce=4572616e48616d6d65724c61686176',
        'oauth_signature_method=HMAC-SHA1',
        'oauth_timestamp=137131200',
        'oauth_token=ad180jjd733klru7',
        'oauth_version=1.0',
        'uni_unicode_object=%C2%AE', 'uni_utf8_bytes=%C2%AE',
    ]
    self.assertEqual('&'.join(expected_pairs),
                     request.get_normalized_parameters())
def test_get_normalized_string_escapes_spaces_properly(self):
    """Check that normalized parameters match urlencode() with '+' as '%20'."""
    # Draw the random values in the same order as the parameters are listed.
    number = random.randint(100, 1000)
    text = ("This data with a random number (%d) has spaces!"
            % random.randint(1000, 2000))
    params = {
        "some_random_data": number,
        "data": text,
    }
    request = oauth.Request("GET", "http://sp.example.com/", params)
    normalized = request.get_normalized_parameters()

    encoded = urlencode(sorted(params.items()))
    self.assertEqual(encoded.replace('+', '%20'), normalized)
def test_from_request_works_with_wsgi(self):
    """Make sure WSGI header HTTP_AUTHORIZATION is detected correctly."""
    url = "http://sp.example.com/"
    params = {
        'oauth_version': "1.0",
        'oauth_nonce': "4572616e48616d6d65724c61686176",
        'oauth_timestamp': "137131200",
        'oauth_consumer_key': "0685bd9184jfhq22",
        'oauth_signature_method': "HMAC-SHA1",
        'oauth_token': "ad180jjd733klru7",
        'oauth_signature': "wOJIO9A2W5mFwDgiDvZbTSMK%2FPY%3D",
    }
    headers = oauth.Request("GET", url, params).to_header()

    # Munge the headers: move the value to the WSGI-style key.
    headers['HTTP_AUTHORIZATION'] = headers.pop('Authorization')

    # Rebuild the request from the munged headers and compare.
    rebuilt = oauth.Request.from_request("GET", url, headers)
    self.assertEqual(rebuilt.method, "GET")
    self.assertEqual(rebuilt.url, url)
    self.assertEqual(params, rebuilt.copy())
def test_from_consumer_and_token(self):
    """Check the token key, consumer key and verifier set on the request."""
    token = oauth.Token(key="tok-test-key", secret="tok-test-secret")
    token.set_verifier('this_is_a_test_verifier')
    consumer = oauth.Consumer(key="con-test-key", secret="con-test-secret")

    request = oauth.Request.from_consumer_and_token(
        consumer,
        token=token,
        http_method="GET",
        http_url="http://sp.example.com/",
    )

    self.assertEqual(request['oauth_token'], token.key)
    self.assertEqual(request['oauth_consumer_key'], consumer.key)
    self.assertEqual(token.verifier, request['oauth_verifier'])
def test_no_version(self):
    """Sign and verify a request that carries no oauth_version parameter.

    There is no explicit assertion: the test passes as long as signing
    and verify_request() complete without raising.
    """
    params = {
        'oauth_nonce': "4572616e48616d6d65724c61686176",
        'oauth_timestamp': int(time.time()),
        'bar': 'blerg',
        'multi': ['FOO','BAR'],
        'foo': 59
    }

    # The consumer, token and request are stored on the test instance.
    self.consumer = oauth.Consumer(key="consumer-key",
                                   secret="consumer-secret")
    self.token = oauth.Token(key="token-key", secret="token-secret")
    params.update([
        ('oauth_token', self.token.key),
        ('oauth_consumer_key', self.consumer.key),
    ])
    self.request = oauth.Request(
        method="GET", url="http://sp.example.com/", parameters=params)
    self.request.sign_request(
        oauth.SignatureMethod_HMAC_SHA1(), self.consumer, self.token)

    server = oauth.Server()
    server.add_signature_method(oauth.SignatureMethod_HMAC_SHA1())
    # The return value is bound but not inspected.
    verified = server.verify_request(
        self.request, self.consumer, self.token)
def test_invalid_signature_method(self):
    """Check that verify_request() raises oauth.Error for SignatureMethod_Bad.

    The request is signed with SignatureMethod_Bad while the server only
    has the HMAC-SHA1 method added.
    """
    params = {
        'oauth_version': '1.0',
        'oauth_nonce': "4572616e48616d6d65724c61686176",
        'oauth_timestamp': int(time.time()),
        'bar': 'blerg',
        'multi': ['FOO','BAR'],
        'foo': 59
    }
    consumer = oauth.Consumer(key="consumer-key", secret="consumer-secret")
    token = oauth.Token(key="token-key", secret="token-secret")
    params['oauth_token'] = token.key
    params['oauth_consumer_key'] = consumer.key

    request = oauth.Request(
        method="GET", url="http://sp.example.com/", parameters=params)
    request.sign_request(SignatureMethod_Bad(), consumer, token)

    server = oauth.Server()
    server.add_signature_method(oauth.SignatureMethod_HMAC_SHA1())

    with self.assertRaises(oauth.Error):
        server.verify_request(request, consumer, token)
def test_missing_signature(self):
    """Check that verify_request() raises MissingSignature.

    The request is signed normally and its oauth_signature entry is then
    deleted before verification.
    """
    params = {
        'oauth_version': '1.0',
        'oauth_nonce': "4572616e48616d6d65724c61686176",
        'oauth_timestamp': int(time.time()),
        'bar': 'blerg',
        'multi': ['FOO','BAR'],
        'foo': 59
    }
    consumer = oauth.Consumer(key="consumer-key", secret="consumer-secret")
    token = oauth.Token(key="token-key", secret="token-secret")
    params['oauth_token'] = token.key
    params['oauth_consumer_key'] = consumer.key

    request = oauth.Request(
        method="GET", url="http://sp.example.com/", parameters=params)
    request.sign_request(oauth.SignatureMethod_HMAC_SHA1(), consumer, token)

    # Strip the signature that was just computed.
    del request['oauth_signature']

    server = oauth.Server()
    server.add_signature_method(oauth.SignatureMethod_HMAC_SHA1())

    with self.assertRaises(oauth.MissingSignature):
        server.verify_request(request, consumer, token)
# Request Token: http://oauth-sandbox.sevengoslings.net/request_token
# Auth: http://oauth-sandbox.sevengoslings.net/authorize
# Access Token: http://oauth-sandbox.sevengoslings.net/access_token
# Two-legged: http://oauth-sandbox.sevengoslings.net/two_legged
# Three-legged: http://oauth-sandbox.sevengoslings.net/three_legged
# Key: bd37aed57e15df53
# Secret: 0e9e6413a9ef49510a4f68ed02cd
def test_url_with_query_string(self, mockHttpRequest):
    """Check the arguments mockHttpRequest receives for a URI with a query.

    A checking function is installed as the mock's side effect and
    client.request() is then called with the URI.

    NOTE(review): mockHttpRequest is presumably injected by a mock.patch
    decorator that is not visible here -- confirm.
    """
    uri = 'http://example.com/foo/bar/?show=thundercats&character=snarf'
    client = oauth.Client(self.consumer, None)
    random_result = random.randint(1,100)

    def mockrequest(http_client, requested_uri, **kw):
        # The call must come from the client above with exactly these
        # keyword arguments.
        self.assertIs(http_client, client)
        expected_keywords = frozenset(['method', 'body', 'redirections',
                                       'connection_type', 'headers'])
        self.assertEqual(frozenset(kw), expected_keywords)
        self.assertEqual(kw['body'], b'')
        self.assertEqual(kw['connection_type'], None)
        self.assertEqual(kw['method'], 'GET')
        self.assertEqual(kw['redirections'],
                         httplib2.DEFAULT_MAX_REDIRECTS)
        self.assertIsInstance(kw['headers'], dict)

        # Build and sign a reference request for the same URI.
        reference = oauth.Request.from_consumer_and_token(
            self.consumer, None,
            http_method='GET', http_url=uri, parameters={})
        reference.sign_request(oauth.SignatureMethod_HMAC_SHA1(),
                               self.consumer, None)

        expected = parse_qsl(urlparse(reference.to_url()).query)
        actual = parse_qsl(urlparse(requested_uri).query)
        self.assertEqual(len(expected), len(actual))

        # Signature, nonce and timestamp are excluded from the value
        # comparison.
        skipped = ('oauth_signature', 'oauth_nonce', 'oauth_timestamp')
        actual_by_key = dict(actual)
        for key, value in expected:
            if key in skipped:
                continue
            self.assertEqual(actual_by_key[key], value)

        return random_result

    mockHttpRequest.side_effect = mockrequest

    client.request(uri, 'GET')