def test_signature_base_string_bytes_nonascii_nonutf8(self):
    # Sign a GET request whose byte-string URL has the module-level bytes
    # _B2766 spliced into the ``address`` query value, then check the
    # normalized URL and the HMAC-SHA1 signature stored on the request.
    client_credentials = oauth.Consumer('consumer_token', 'consumer_secret')
    raw_url = (b'http://api.simplegeo.com:80/1.0/places/address.json'
               b'?q=monkeys&category=animal'
               b'&address=41+Decatur+St,+San+Francisc') + _B2766 + b',+CA'
    request = oauth.Request("GET", raw_url)

    # The expected normalized URL carries neither the :80 port nor the query.
    normalized = request.normalized_url
    expected_url = u('http://api.simplegeo.com/1.0/places/address.json')
    self.assertReallyEqual(normalized, expected_url)

    # No token is supplied: only the consumer takes part in the signing.
    hmac_sha1 = oauth.SignatureMethod_HMAC_SHA1()
    request.sign_request(hmac_sha1, client_credentials, None)
    self.assertReallyEqual(  # XXX
        request['oauth_signature'], b'WhufgeZKyYpKsI70GZaiDaYwl6g=')
# Example source code for the Python class SignatureMethod_HMAC_SHA1()
def test_signature_base_bytes_nonascii_nonutf8_urlencoded(self):
    # Sign a GET request whose byte-string URL contains a percent-encoded
    # sequence (%E2%9D%A6) in the ``address`` query value, then check the
    # normalized URL and the HMAC-SHA1 signature stored on the request.
    client_credentials = oauth.Consumer('consumer_token', 'consumer_secret')
    encoded_url = (b'http://api.simplegeo.com:80/1.0/places/address.json'
                   b'?q=monkeys&category=animal'
                   b'&address=41+Decatur+St,+San+Francisc%E2%9D%A6,+CA')
    request = oauth.Request("GET", encoded_url)

    # The expected normalized URL carries neither the :80 port nor the query.
    normalized = request.normalized_url
    expected_url = u('http://api.simplegeo.com/1.0/places/address.json')
    self.assertReallyEqual(normalized, expected_url)

    # No token is supplied: only the consumer takes part in the signing.
    hmac_sha1 = oauth.SignatureMethod_HMAC_SHA1()
    request.sign_request(hmac_sha1, client_credentials, None)
    self.assertReallyEqual(
        request['oauth_signature'], b'WhufgeZKyYpKsI70GZaiDaYwl6g=')
def test_signing_base(self):
    # Example copied from
    # https://github.com/ciaranj/node-oauth/blob/master/tests/oauth.js
    # which in turn says that it was copied from
    # http://oauth.net/core/1.0/#sig_base_example .
    photo_url = ("http://photos.example.net/photos?file=vacation.jpg"
                 "&oauth_consumer_key=dpf43f3p2l4k3l03"
                 "&oauth_nonce=kllo9940pd9333jh&oauth_signature_method=HMAC-SHA1"
                 "&oauth_timestamp=1191242096&oauth_token=nnch734d00sl2jdk"
                 "&oauth_version=1.0&size=original")
    request = oauth.Request("GET", photo_url)
    hmac_sha1 = oauth.SignatureMethod_HMAC_SHA1()
    consumer = oauth.Consumer('dpf43f3p2l4k3l03', 'foo')

    # signing_base() yields a pair; only its second element (the base
    # string) is compared below, the first is ignored.
    _signing_key, base_string = hmac_sha1.signing_base(request, consumer, None)

    expected_base = b('GET&http%3A%2F%2Fphotos.example.net%2Fphotos'
                      '&file%3Dvacation.jpg'
                      '%26oauth_consumer_key%3Ddpf43f3p2l4k3l03'
                      '%26oauth_nonce%3Dkllo9940pd9333jh'
                      '%26oauth_signature_method%3DHMAC-SHA1'
                      '%26oauth_timestamp%3D1191242096'
                      '%26oauth_token%3Dnnch734d00sl2jdk'
                      '%26oauth_version%3D1.0%26size%3Doriginal')
    self.assertEqual(expected_base, base_string)
def test_init(self):
    # A Server constructed with a signature-method mapping must expose it
    # via ``signature_methods``; one constructed without arguments must
    # expose an empty dict.
    hmac_sha1 = oauth.SignatureMethod_HMAC_SHA1()
    configured = oauth.Server(signature_methods={'HMAC-SHA1': hmac_sha1})
    self.assertTrue('HMAC-SHA1' in configured.signature_methods)
    registered = configured.signature_methods['HMAC-SHA1']
    self.assertTrue(isinstance(registered, oauth.SignatureMethod_HMAC_SHA1))

    unconfigured = oauth.Server()
    self.assertEqual(unconfigured.signature_methods, {})
def test_add_signature_method(self):
    # Register HMAC-SHA1 and then PLAINTEXT on a fresh Server. After each
    # registration the mapping returned by add_signature_method() must have
    # grown by one and hold an instance of the right class under its name.
    server = oauth.Server()
    steps = (
        (oauth.SignatureMethod_HMAC_SHA1, 'HMAC-SHA1', 1),
        (oauth.SignatureMethod_PLAINTEXT, 'PLAINTEXT', 2),
    )
    for method_cls, method_name, expected_size in steps:
        registered = server.add_signature_method(method_cls())
        self.assertTrue(len(registered) == expected_size)
        self.assertTrue(method_name in registered)
        self.assertTrue(isinstance(registered[method_name], method_cls))
def test_verify_request(self):
    # Verify the fixture request (self.request / self.consumer / self.token)
    # and check that the returned parameters contain the three expected
    # entries with the expected values.
    verifier = oauth.Server()
    verifier.add_signature_method(oauth.SignatureMethod_HMAC_SHA1())

    verified = verifier.verify_request(
        self.request, self.consumer, self.token)

    # Presence is asserted for every name first, values afterwards.
    for name in ('bar', 'foo', 'multi'):
        self.assertTrue(name in verified)

    expected_values = (
        ('bar', 'blerg'),
        ('foo', 59),
        ('multi', ['FOO','BAR']),
    )
    for name, expected in expected_values:
        self.assertEqual(verified[name], expected)
def test_verify_request_invalid_signature(self):
    # Replacing the fixture request's signature with a bogus value must
    # make verification raise oauth.Error.
    verifier = oauth.Server()
    verifier.add_signature_method(oauth.SignatureMethod_HMAC_SHA1())

    self.request['oauth_signature'] = 'BOGUS'

    call_args = (self.request, self.consumer, self.token)
    self.assertRaises(oauth.Error, verifier.verify_request, *call_args)
def test_verify_request_invalid_timestamp(self):
    # Moving the fixture request's timestamp back by 86400 must make
    # verification raise oauth.Error.
    verifier = oauth.Server()
    verifier.add_signature_method(oauth.SignatureMethod_HMAC_SHA1())

    # 86400 == 24 * 60 * 60; presumably one day in seconds -- confirm
    # against the unit used for oauth_timestamp in the fixture.
    backdate_by = 86400
    self.request['oauth_timestamp'] -= backdate_by

    call_args = (self.request, self.consumer, self.token)
    self.assertRaises(oauth.Error, verifier.verify_request, *call_args)
def test_invalid_version(self):
    # A request signed with oauth_version '222.9922' must be rejected by
    # Server.verify_request() with oauth.Error.
    consumer = oauth.Consumer(key="consumer-key",
                              secret="consumer-secret")
    token = oauth.Token(key="token-key", secret="token-secret")

    request_params = {
        'oauth_version': '222.9922',
        'oauth_nonce': "4572616e48616d6d65724c61686176",
        'oauth_timestamp': int(time.time()),
        'bar': 'blerg',
        'multi': ['foo','bar'],
        'foo': 59,
        'oauth_token': token.key,
        'oauth_consumer_key': consumer.key,
    }
    request = oauth.Request(method="GET", url="http://sp.example.com/",
                            parameters=request_params)
    request.sign_request(oauth.SignatureMethod_HMAC_SHA1(), consumer, token)

    # The server gets its own HMAC-SHA1 instance, separate from the signer's.
    verifier = oauth.Server()
    verifier.add_signature_method(oauth.SignatureMethod_HMAC_SHA1())
    self.assertRaises(
        oauth.Error, verifier.verify_request, request, consumer, token)
def test_invalid_signature_method(self):
    # A request signed with SignatureMethod_Bad (defined elsewhere in this
    # module) must be rejected with oauth.Error by a server that only has
    # HMAC-SHA1 registered.
    consumer = oauth.Consumer(key="consumer-key",
                              secret="consumer-secret")
    token = oauth.Token(key="token-key", secret="token-secret")

    request_params = {
        'oauth_version': '1.0',
        'oauth_nonce': "4572616e48616d6d65724c61686176",
        'oauth_timestamp': int(time.time()),
        'bar': 'blerg',
        'multi': ['FOO','BAR'],
        'foo': 59,
        'oauth_token': token.key,
        'oauth_consumer_key': consumer.key,
    }
    request = oauth.Request(method="GET", url="http://sp.example.com/",
                            parameters=request_params)
    request.sign_request(SignatureMethod_Bad(), consumer, token)

    verifier = oauth.Server()
    verifier.add_signature_method(oauth.SignatureMethod_HMAC_SHA1())
    self.assertRaises(
        oauth.Error, verifier.verify_request, request, consumer, token)
def test_missing_signature(self):
    # Sign a request, delete its oauth_signature entry, and check that
    # Server.verify_request() raises oauth.MissingSignature.
    consumer = oauth.Consumer(key="consumer-key",
                              secret="consumer-secret")
    token = oauth.Token(key="token-key", secret="token-secret")

    request_params = {
        'oauth_version': '1.0',
        'oauth_nonce': "4572616e48616d6d65724c61686176",
        'oauth_timestamp': int(time.time()),
        'bar': 'blerg',
        'multi': ['FOO','BAR'],
        'foo': 59,
        'oauth_token': token.key,
        'oauth_consumer_key': consumer.key,
    }
    request = oauth.Request(method="GET", url="http://sp.example.com/",
                            parameters=request_params)
    request.sign_request(oauth.SignatureMethod_HMAC_SHA1(), consumer, token)

    # Strip the signature that sign_request() just stored on the request.
    del request['oauth_signature']

    verifier = oauth.Server()
    verifier.add_signature_method(oauth.SignatureMethod_HMAC_SHA1())
    self.assertRaises(
        oauth.MissingSignature, verifier.verify_request,
        request, consumer, token)
# Request Token: http://oauth-sandbox.sevengoslings.net/request_token
# Auth: http://oauth-sandbox.sevengoslings.net/authorize
# Access Token: http://oauth-sandbox.sevengoslings.net/access_token
# Two-legged: http://oauth-sandbox.sevengoslings.net/two_legged
# Three-legged: http://oauth-sandbox.sevengoslings.net/three_legged
# Key: bd37aed57e15df53
# Secret: 0e9e6413a9ef49510a4f68ed02cd
def test_url_with_query_string(self, mockHttpRequest):
    # Issue a GET through oauth.Client for a URI that already has a query
    # string. ``mockHttpRequest`` is given a side effect that checks the
    # arguments it receives and compares the query parameters of the URL
    # it is called with against those of an independently signed request.
    uri = 'http://example.com/foo/bar/?show=thundercats&character=snarf'
    client = oauth.Client(self.consumer, None)
    random_result = random.randint(1,100)

    # These parameters are not compared between the two signed URLs.
    skipped_keys = ('oauth_signature', 'oauth_nonce', 'oauth_timestamp')

    def fake_request(cl, ur, **kw):
        self.assertTrue(cl is client)

        # Exactly these keyword arguments must be forwarded.
        self.assertEqual(frozenset(kw.keys()),
                         frozenset(['method', 'body', 'redirections',
                                    'connection_type', 'headers']))
        self.assertEqual(kw['body'], b'')
        self.assertEqual(kw['connection_type'], None)
        self.assertEqual(kw['method'], 'GET')
        self.assertEqual(kw['redirections'],
                         httplib2.DEFAULT_MAX_REDIRECTS)
        self.assertTrue(isinstance(kw['headers'], dict))

        # Build and sign a reference request for the same URI.
        reference = oauth.Request.from_consumer_and_token(
            self.consumer, None,
            http_method='GET', http_url=uri, parameters={})
        reference.sign_request(oauth.SignatureMethod_HMAC_SHA1(),
                               self.consumer, None)

        expected_pairs = parse_qsl(urlparse(reference.to_url()).query)
        actual_pairs = parse_qsl(urlparse(ur).query)
        self.assertEqual(len(expected_pairs), len(actual_pairs))

        actual_by_key = dict(actual_pairs)
        for key, value in expected_pairs:
            if key in skipped_keys:
                continue
            self.assertEqual(actual_by_key[key], value)

        return random_result

    mockHttpRequest.side_effect = fake_request
    client.request(uri, 'GET')
def post_multipart(self, url, params, files):
    """
    Generates and issues a multipart request for data files

    The caller's parameters are merged with the generated OAuth parameters
    (OAuth values win on a key clash), signed with HMAC-SHA1 as a POST,
    and sent as a multipart body together with ``files``.

    :param url: a string, the url you are requesting
    :param params: a dict, a key-value of all the parameters
    :param files: a list, the list of tuples for your data

    :returns: a dict parsed from the JSON response
    """
    from contextlib import closing

    # combine the parameters with the generated oauth params; copy-and-update
    # keeps the original precedence without relying on list-typed items()
    merged = dict(params)
    merged.update(self.generate_oauth_params())

    # A throwaway request is signed only to obtain the signed parameter set.
    faux_req = oauth.Request(method="POST", url=url, parameters=merged)
    faux_req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), self.consumer, self.token)
    signed_params = dict(parse_qsl(faux_req.to_postdata()))

    content_type, body = self.encode_multipart_formdata(signed_params, files)
    headers = {'Content-Type': content_type, 'Content-Length': str(len(body))}

    # Do a bytearray of the body and everything seems ok
    # NOTE(review): the body is passed through unchanged here; no bytearray
    # conversion is actually performed -- confirm whether one is needed.
    r = urllib2.Request(url, body, headers)

    if self.proxy_url:
        # Installs a process-wide opener, so later urllib2 calls also go
        # through the proxy.
        proxy = urllib2.ProxyHandler({'http': self.proxy_url, 'https': self.proxy_url})
        opener = urllib2.build_opener(proxy)
        urllib2.install_opener(opener)

    # Close the response explicitly so the connection is not leaked.
    with closing(urllib2.urlopen(r)) as response:
        content = response.read()
    return self.json_parse(content)
def SetCredentials(self,
                   consumer_key,
                   consumer_secret,
                   access_token_key=None,
                   access_token_secret=None):
    """Set the consumer_key and consumer_secret for this instance

    The four raw values are always stored. The OAuth consumer and token
    objects are first reset to None and only rebuilt when all four
    credentials are supplied, so a call with partial credentials never
    leaves objects from an earlier call behind.

    Args:
      consumer_key:
        The consumer_key of the twitter account.
      consumer_secret:
        The consumer_secret for the twitter account.
      access_token_key:
        The oAuth access token key value you retrieved
        from running get_access_token.py.
      access_token_secret:
        The oAuth access token's secret, also retrieved
        from the get_access_token.py run.
    """
    self._consumer_key = consumer_key
    self._consumer_secret = consumer_secret
    self._access_token_key = access_token_key
    self._access_token_secret = access_token_secret

    # Reset both derived objects together; previously only the consumer was
    # cleared, which could leave a stale token paired with no consumer.
    self._oauth_consumer = None
    self._oauth_token = None

    credentials = (consumer_key, consumer_secret,
                   access_token_key, access_token_secret)
    if any(value is None for value in credentials):
        return

    self._signature_method_plaintext = oauth.SignatureMethod_PLAINTEXT()
    self._signature_method_hmac_sha1 = oauth.SignatureMethod_HMAC_SHA1()
    self._oauth_token = oauth.Token(key=access_token_key, secret=access_token_secret)
    self._oauth_consumer = oauth.Consumer(key=consumer_key, secret=consumer_secret)
def get_preview_url(api_key, api_secret, sdl_id):
    """Build an OAuth-signed 7digital preview-clip URL for ``sdl_id``.

    The request is signed with HMAC-SHA1 using only the consumer
    credentials (no token) and carries a ``country`` parameter of 'US'.

    Returns:
      A ``(signed_url, False)`` tuple. The meaning of the constant second
      element is not visible here -- check the callers.
    """
    consumer = oauth.Consumer(api_key, api_secret)
    clip_url = "http://previews.7digital.com/clip/%s" % sdl_id
    request = oauth.Request(method="GET", url=clip_url, is_form_encoded=True)

    extra_params = (
        ('oauth_timestamp', oauth.Request.make_timestamp()),
        ('oauth_nonce', oauth.Request.make_nonce()),
        ('country', 'US'),
    )
    for name, value in extra_params:
        request[name] = value

    request.sign_request(oauth.SignatureMethod_HMAC_SHA1(), consumer, token=None)
    return request.to_url(), False
def superSecureRequest(self, host, path, url_params=None):
    """Sign a GET request with OAuth (HMAC-SHA1) and fetch it as JSON.

    Args:
        host (str): The domain host of the API.
        path (str): The path of the API after the domain.
        url_params (dict): An optional set of query parameters in the request.

    Returns:
        dict: The JSON response from the request.

    Raises:
        urllib2.HTTPError: An error occurs from the HTTP request.
    """
    if not url_params:
        url_params = {}

    # The path is UTF-8 encoded and percent-quoted before being joined to
    # the host; the trailing '?' is part of the base URL that gets signed.
    quoted_path = urllib.quote(path.encode('utf8'))
    url = 'https://{0}{1}?'.format(host, quoted_path)

    consumer = oauth2.Consumer(self.oath_consumer_key, self.oath_consumer_secret)
    signed_request = oauth2.Request(method="GET", url=url, parameters=url_params)

    # Add the OAuth protocol parameters on top of the caller's parameters.
    oauth_fields = {
        'oauth_nonce': oauth2.generate_nonce(),
        'oauth_timestamp': oauth2.generate_timestamp(),
        'oauth_token': self.oath_token,
        'oauth_consumer_key': self.oath_consumer_key
    }
    signed_request.update(oauth_fields)

    token = oauth2.Token(self.oath_token, self.oath_token_secret)
    signed_request.sign_request(oauth2.SignatureMethod_HMAC_SHA1(), consumer, token)

    conn = urllib2.urlopen(signed_request.to_url(), None)
    try:
        # The connection is closed whether or not reading/parsing succeeds.
        return json.loads(conn.read().decode())
    finally:
        conn.close()