def test_set_signature_method(self):
    """Check that Client.set_signature_method() validates its argument.

    An instance of an unrelated class must be rejected with ValueError,
    while a SignatureMethod_HMAC_SHA1 instance must be accepted and end
    up on ``client.method``.
    """
    class NotASignatureMethod:
        pass

    client = oauth.Client(oauth.Consumer('key', 'secret'))

    try:
        client.set_signature_method(NotASignatureMethod())
    except ValueError:
        # Expected: the bogus object was refused.
        pass
    else:
        self.fail("Client.set_signature_method() accepted invalid method.")

    hmac_sha1 = oauth.SignatureMethod_HMAC_SHA1()
    client.set_signature_method(hmac_sha1)
    self.assertEqual(hmac_sha1, client.method)
# Example source code using the Python class SignatureMethod_HMAC_SHA1()
def sign_request(self, url, url_params=None):
    """Return ``url`` as an OAuth-signed GET URL.

    Args:
        url: The URL to sign.
        url_params: Optional mapping of extra request parameters.
            Defaults to no extra parameters.

    Returns:
        The result of ``to_url()`` on the signed ``oauth2.Request``.
    """
    # A ``None`` sentinel replaces the former ``{}`` default, which was a
    # single dict object shared by every call of this method.
    if url_params is None:
        url_params = {}

    oauth_request = oauth2.Request(
        method="GET",
        url=url,
        parameters=url_params,
    )
    # Add the OAuth fields taken from this object's token and consumer,
    # plus a fresh nonce and timestamp, before signing.
    oauth_request.update(
        {
            'oauth_nonce': oauth2.generate_nonce(),
            'oauth_timestamp': oauth2.generate_timestamp(),
            'oauth_token': self.token.key,
            'oauth_consumer_key': self.consumer.key,
        }
    )
    oauth_request.sign_request(
        oauth2.SignatureMethod_HMAC_SHA1(),
        self.consumer,
        self.token,
    )
    return oauth_request.to_url()
def verifySignature(self, secret):
    """
    Check this request's OAuth signature using C{secret}.

    See L{IOAuthCredentials#verifySignature}.

    @param secret: The consumer secret paired with C{self.consumerKey}.
    @raise NotImplementedError: If C{self.signatureMethod} is anything
        other than C{'HMAC-SHA1'}.
    @return: Whatever C{SignatureMethod_HMAC_SHA1.check} returns for the
        rebuilt request and C{self.signature}.
    """
    oauthConsumer = Consumer(key=self.consumerKey, secret=secret)
    rebuiltRequest = Request.from_request(
        self.method,
        self.url,
        headers=self.headers,
        query_string=self.arguments)

    # Only HMAC-SHA1 signatures are supported; any other declared method
    # is refused outright.
    if self.signatureMethod != 'HMAC-SHA1':
        raise NotImplementedError(
            'Unknown signature method: %s' % self.signatureMethod)

    # No token is involved in this check, hence the C{None} argument.
    return SignatureMethod_HMAC_SHA1().check(
        rebuiltRequest, oauthConsumer, None, self.signature)
def test_set_signature_method(self):
    """set_signature_method() refuses junk and stores a real method.

    Passing an instance of an arbitrary class must raise ValueError;
    passing a SignatureMethod_HMAC_SHA1 must leave it on ``client.method``.
    """
    class Bogus:
        pass

    consumer = oauth.Consumer('key', 'secret')
    client = oauth.Client(consumer)

    try:
        client.set_signature_method(Bogus())
    except ValueError:
        pass  # the invalid object was rejected, as required
    else:
        self.fail("Client.set_signature_method() accepted invalid method.")

    valid_method = oauth.SignatureMethod_HMAC_SHA1()
    client.set_signature_method(valid_method)
    self.assertEqual(valid_method, client.method)
def __init__(self, access_token_key, access_token_secret, consumer_key, consumer_secret):
    """Store the OAuth credentials and build the objects derived from them.

    Keeps the four raw credential strings, creates the token, consumer
    and HMAC-SHA1 signature method, and creates HTTP and HTTPS handlers
    with debug output switched off.
    """
    self.access_token_key = access_token_key
    self.access_token_secret = access_token_secret
    self.consumer_key = consumer_key
    self.consumer_secret = consumer_secret

    # Debug level handed to both handlers below; 0 as in the original.
    debug_level = 0

    self.oauth_token = oauth.Token(
        key=access_token_key, secret=access_token_secret)
    self.oauth_consumer = oauth.Consumer(
        key=consumer_key, secret=consumer_secret)
    self.signature_method_hmac_sha1 = oauth.SignatureMethod_HMAC_SHA1()

    # NOTE(review): ``urllib`` must expose HTTPHandler/HTTPSHandler here
    # (e.g. an alias for urllib2) -- confirm against the file's imports.
    self.http_handler = urllib.HTTPHandler(debuglevel=debug_level)
    self.https_handler = urllib.HTTPSHandler(debuglevel=debug_level)
def signed_request(url, access_token=None):
    ''' Build an OAuth request for ``url`` and sign it with HMAC-SHA1.

        Rarely needed directly; it serves specific cases such as deriving
        a preview-clip URL. The request always carries a ``country``
        parameter taken from ``api_settings``. Returns the signed request.
    '''
    consumer = _consumer()
    request = oauth.Request.from_consumer_and_token(
        consumer,
        http_url=url,
        is_form_encoded=True,
        parameters={'country': api_settings.country},
    )
    request.sign_request(
        oauth.SignatureMethod_HMAC_SHA1(), consumer, access_token)
    return request
def signed_request(url, access_token=None):
    ''' Return an HMAC-SHA1 signed OAuth request for ``url``.

        Not commonly used, apart from special-purpose functions such as
        deriving a preview-clip URL. ``access_token`` is optional and is
        passed straight to the signing step.
    '''
    consumer = _consumer()
    extra_parameters = {'country': api_settings.country}
    signed = oauth.Request.from_consumer_and_token(
        consumer,
        http_url=url,
        is_form_encoded=True,
        parameters=extra_parameters)
    hmac_sha1 = oauth.SignatureMethod_HMAC_SHA1()
    signed.sign_request(hmac_sha1, consumer, access_token)
    return signed
def __init__(self, api_key, shared_secret, httplib2_inst=None):
    """Initialise the client, making the key and secret mandatory.

    The arguments are forwarded unchanged to the parent initialiser;
    afterwards the endpoint and the two signature-method instances are
    set on this object.
    """
    super(TwoLegged, self).__init__(api_key, shared_secret, httplib2_inst)

    # Point this client at PRIVATE_ENDPOINT.
    self.endpoint = PRIVATE_ENDPOINT

    # Keep one instance of each signature method on the client.
    self.hmac_sha1_signature, self.plaintext_signature = (
        oauth.SignatureMethod_HMAC_SHA1(),
        oauth.SignatureMethod_PLAINTEXT(),
    )
def __init__(self, api_key, shared_secret, httplib2_inst=None):
    """Initialise the client with a required API key and shared secret.

    After delegating to the parent initialiser, this sets the endpoint,
    the URL scheme, and one instance each of the HMAC-SHA1 and PLAINTEXT
    signature methods.
    """
    super(TwoLegged, self).__init__(api_key, shared_secret, httplib2_inst)

    # Connection target: PRIVATE_ENDPOINT reached via HTTPS_SCHEME.
    self.endpoint, self.scheme = PRIVATE_ENDPOINT, HTTPS_SCHEME

    # Signature methods made available on this client.
    self.hmac_sha1_signature = oauth.SignatureMethod_HMAC_SHA1()
    self.plaintext_signature = oauth.SignatureMethod_PLAINTEXT()
def test_unset_consumer_and_token(self):
    """Signing fills in the consumer key and token key on a bare request.

    The request is created from a method and URL only; after
    ``sign_request`` its ``oauth_consumer_key`` and ``oauth_token``
    entries must match the consumer and token used for signing.
    """
    signing_consumer = oauth.Consumer('my_consumer_key', 'my_consumer_secret')
    signing_token = oauth.Token('my_key', 'my_secret')
    bare_request = oauth.Request("GET", "http://example.com/fetch.php")

    hmac_sha1 = oauth.SignatureMethod_HMAC_SHA1()
    bare_request.sign_request(hmac_sha1, signing_consumer, signing_token)

    self.assertEqual(signing_consumer.key, bare_request['oauth_consumer_key'])
    self.assertEqual(signing_token.key, bare_request['oauth_token'])
def test_no_url_set(self):
    """Signing a request that was created without a URL raises ValueError."""
    consumer = oauth.Consumer('my_consumer_key', 'my_consumer_secret')
    token = oauth.Token('my_key', 'my_secret')
    url_less_request = oauth.Request()

    hmac_sha1 = oauth.SignatureMethod_HMAC_SHA1()
    self.assertRaises(
        ValueError,
        url_less_request.sign_request, hmac_sha1, consumer, token)
def test_signature_base_unicode_nonascii(self):
    """Sign a request whose unicode URL carries ``_U2766`` in its query.

    The normalized URL is expected to omit both the ``:80`` port and the
    query string, and the HMAC-SHA1 signature must equal a fixed value.
    """
    consumer = oauth.Consumer('consumer_token', 'consumer_secret')

    url_head = u('http://api.simplegeo.com:80/1.0/places/address.json'
                 '?q=monkeys&category=animal'
                 '&address=41+Decatur+St,+San+Francisc')
    url = url_head + _U2766 + u(',+CA')

    request = oauth.Request("GET", url)
    expected_normalized = u('http://api.simplegeo.com/1.0/places/address.json')
    self.assertReallyEqual(request.normalized_url, expected_normalized)

    request.sign_request(oauth.SignatureMethod_HMAC_SHA1(), consumer, None)
    self.assertReallyEqual(
        request['oauth_signature'], b'WhufgeZKyYpKsI70GZaiDaYwl6g=')
def test_signature_base_string_bytes_nonascii_nonutf8(self):
    """Sign a request whose bytes URL carries ``_B2766`` in its query.

    The normalized URL is expected to omit both the ``:80`` port and the
    query string, and the HMAC-SHA1 signature must equal a fixed value.
    """
    consumer = oauth.Consumer('consumer_token', 'consumer_secret')

    url_head = (b'http://api.simplegeo.com:80/1.0/places/address.json'
                b'?q=monkeys&category=animal'
                b'&address=41+Decatur+St,+San+Francisc')
    url = url_head + _B2766 + b',+CA'

    request = oauth.Request("GET", url)
    expected_normalized = u('http://api.simplegeo.com/1.0/places/address.json')
    self.assertReallyEqual(request.normalized_url, expected_normalized)

    request.sign_request(oauth.SignatureMethod_HMAC_SHA1(), consumer, None)
    # XXX marker carried over from the original assertion.
    self.assertReallyEqual(
        request['oauth_signature'], b'WhufgeZKyYpKsI70GZaiDaYwl6g=')
def test_signature_base_bytes_nonascii_nonutf8_urlencoded(self):
    """Sign a request whose bytes URL has a percent-encoded query character.

    The query contains ``%E2%9D%A6``. The normalized URL is expected to
    omit both the ``:80`` port and the query string, and the HMAC-SHA1
    signature must equal a fixed value.
    """
    consumer = oauth.Consumer('consumer_token', 'consumer_secret')
    encoded_url = (b'http://api.simplegeo.com:80/1.0/places/address.json'
                   b'?q=monkeys&category=animal'
                   b'&address=41+Decatur+St,+San+Francisc%E2%9D%A6,+CA')

    request = oauth.Request("GET", encoded_url)
    expected_normalized = u('http://api.simplegeo.com/1.0/places/address.json')
    self.assertReallyEqual(request.normalized_url, expected_normalized)

    request.sign_request(oauth.SignatureMethod_HMAC_SHA1(), consumer, None)
    self.assertReallyEqual(
        request['oauth_signature'], b'WhufgeZKyYpKsI70GZaiDaYwl6g=')
def test_signing_base(self):
    """signing_base() yields the expected signature base string.

    Only the second element of the returned pair (the raw base string)
    is checked; the first element is ignored.
    """
    # Example copied from
    # https://github.com/ciaranj/node-oauth/blob/master/tests/oauth.js
    # which in turn says that it was copied from
    # http://oauth.net/core/1.0/#sig_base_example .
    photo_url = ("http://photos.example.net/photos?file=vacation.jpg"
                 "&oauth_consumer_key=dpf43f3p2l4k3l03"
                 "&oauth_nonce=kllo9940pd9333jh&oauth_signature_method=HMAC-SHA1"
                 "&oauth_timestamp=1191242096&oauth_token=nnch734d00sl2jdk"
                 "&oauth_version=1.0&size=original")
    request = oauth.Request("GET", photo_url)
    hmac_sha1 = oauth.SignatureMethod_HMAC_SHA1()
    consumer = oauth.Consumer('dpf43f3p2l4k3l03', 'foo')

    _, base_string = hmac_sha1.signing_base(request, consumer, None)

    expected = b('GET&http%3A%2F%2Fphotos.example.net%2Fphotos'
                 '&file%3Dvacation.jpg'
                 '%26oauth_consumer_key%3Ddpf43f3p2l4k3l03'
                 '%26oauth_nonce%3Dkllo9940pd9333jh'
                 '%26oauth_signature_method%3DHMAC-SHA1'
                 '%26oauth_timestamp%3D1191242096'
                 '%26oauth_token%3Dnnch734d00sl2jdk'
                 '%26oauth_version%3D1.0%26size%3Doriginal')
    self.assertEqual(expected, base_string)
def test_init(self):
    """Server keeps signature methods given at construction.

    A server built with an 'HMAC-SHA1' entry must expose it under that
    key; a server built with no arguments must expose an empty dict.
    """
    configured = oauth.Server(
        signature_methods={'HMAC-SHA1': oauth.SignatureMethod_HMAC_SHA1()})
    self.assertTrue('HMAC-SHA1' in configured.signature_methods)
    stored = configured.signature_methods['HMAC-SHA1']
    self.assertTrue(isinstance(stored, oauth.SignatureMethod_HMAC_SHA1))

    default = oauth.Server()
    self.assertEqual(default.signature_methods, {})
def test_add_signature_method(self):
    """add_signature_method() returns the growing name -> method mapping.

    After adding HMAC-SHA1 the result holds one entry; after adding
    PLAINTEXT it holds two. Each entry is an instance of its class.
    """
    server = oauth.Server()

    methods = server.add_signature_method(oauth.SignatureMethod_HMAC_SHA1())
    self.assertTrue(len(methods) == 1)
    self.assertTrue('HMAC-SHA1' in methods)
    hmac_entry = methods['HMAC-SHA1']
    self.assertTrue(isinstance(hmac_entry, oauth.SignatureMethod_HMAC_SHA1))

    methods = server.add_signature_method(oauth.SignatureMethod_PLAINTEXT())
    self.assertTrue(len(methods) == 2)
    self.assertTrue('PLAINTEXT' in methods)
    plaintext_entry = methods['PLAINTEXT']
    self.assertTrue(isinstance(plaintext_entry, oauth.SignatureMethod_PLAINTEXT))
def test_verify_request(self):
    """verify_request() returns the request's non-OAuth parameters.

    Uses ``self.request``, ``self.consumer`` and ``self.token``, which
    are set up outside this method. The result must contain 'bar',
    'foo' and 'multi' with their expected values.
    """
    server = oauth.Server()
    server.add_signature_method(oauth.SignatureMethod_HMAC_SHA1())

    verified = server.verify_request(
        self.request, self.consumer, self.token)

    # Presence first, then values, in the same order as before.
    for name in ('bar', 'foo', 'multi'):
        self.assertTrue(name in verified)

    expected_values = (
        ('bar', 'blerg'),
        ('foo', 59),
        ('multi', ['FOO', 'BAR']),
    )
    for name, value in expected_values:
        self.assertEqual(verified[name], value)
def test_verify_request_invalid_signature(self):
    """verify_request() raises oauth.Error when the signature is wrong."""
    hmac_server = oauth.Server()
    hmac_server.add_signature_method(oauth.SignatureMethod_HMAC_SHA1())

    # Overwrite the request's signature with a value that cannot match.
    self.request['oauth_signature'] = 'BOGUS'

    call_args = (self.request, self.consumer, self.token)
    self.assertRaises(oauth.Error, hmac_server.verify_request, *call_args)
def test_verify_request_invalid_timestamp(self):
    """verify_request() raises oauth.Error when the timestamp is shifted back."""
    hmac_server = oauth.Server()
    hmac_server.add_signature_method(oauth.SignatureMethod_HMAC_SHA1())

    # Move the request's timestamp back by 86400 seconds (24 hours).
    seconds_back = 86400
    self.request['oauth_timestamp'] -= seconds_back

    call_args = (self.request, self.consumer, self.token)
    self.assertRaises(oauth.Error, hmac_server.verify_request, *call_args)
def test_invalid_version(self):
    """verify_request() raises oauth.Error for oauth_version '222.9922'.

    The request is fully signed with HMAC-SHA1, so the only unusual
    field is the version string.
    """
    consumer = oauth.Consumer(key="consumer-key",
                              secret="consumer-secret")
    token = oauth.Token(key="token-key", secret="token-secret")

    params = {
        'oauth_version': '222.9922',
        'oauth_nonce': "4572616e48616d6d65724c61686176",
        'oauth_timestamp': int(time.time()),
        'bar': 'blerg',
        'multi': ['foo', 'bar'],
        'foo': 59,
        'oauth_token': token.key,
        'oauth_consumer_key': consumer.key,
    }
    request = oauth.Request(
        method="GET", url="http://sp.example.com/", parameters=params)
    request.sign_request(oauth.SignatureMethod_HMAC_SHA1(), consumer, token)

    server = oauth.Server()
    server.add_signature_method(oauth.SignatureMethod_HMAC_SHA1())
    self.assertRaises(
        oauth.Error, server.verify_request, request, consumer, token)
def test_invalid_signature_method(self):
    """verify_request() raises oauth.Error for an unregistered method.

    The request is signed with ``SignatureMethod_Bad`` while the server
    only has HMAC-SHA1 registered.
    """
    consumer = oauth.Consumer(key="consumer-key",
                              secret="consumer-secret")
    token = oauth.Token(key="token-key", secret="token-secret")

    params = {
        'oauth_version': '1.0',
        'oauth_nonce': "4572616e48616d6d65724c61686176",
        'oauth_timestamp': int(time.time()),
        'bar': 'blerg',
        'multi': ['FOO', 'BAR'],
        'foo': 59,
        'oauth_token': token.key,
        'oauth_consumer_key': consumer.key,
    }
    request = oauth.Request(
        method="GET", url="http://sp.example.com/", parameters=params)
    request.sign_request(SignatureMethod_Bad(), consumer, token)

    server = oauth.Server()
    server.add_signature_method(oauth.SignatureMethod_HMAC_SHA1())
    self.assertRaises(
        oauth.Error, server.verify_request, request, consumer, token)
def test_missing_signature(self):
    """verify_request() raises MissingSignature without oauth_signature.

    The request is signed normally and the ``oauth_signature`` entry is
    then deleted before verification.
    """
    consumer = oauth.Consumer(key="consumer-key",
                              secret="consumer-secret")
    token = oauth.Token(key="token-key", secret="token-secret")

    params = {
        'oauth_version': '1.0',
        'oauth_nonce': "4572616e48616d6d65724c61686176",
        'oauth_timestamp': int(time.time()),
        'bar': 'blerg',
        'multi': ['FOO', 'BAR'],
        'foo': 59,
        'oauth_token': token.key,
        'oauth_consumer_key': consumer.key,
    }
    request = oauth.Request(
        method="GET", url="http://sp.example.com/", parameters=params)
    request.sign_request(oauth.SignatureMethod_HMAC_SHA1(), consumer, token)

    # Strip the signature that sign_request() just added.
    del request['oauth_signature']

    server = oauth.Server()
    server.add_signature_method(oauth.SignatureMethod_HMAC_SHA1())
    self.assertRaises(
        oauth.MissingSignature, server.verify_request,
        request, consumer, token)
# Request Token: http://oauth-sandbox.sevengoslings.net/request_token
# Auth: http://oauth-sandbox.sevengoslings.net/authorize
# Access Token: http://oauth-sandbox.sevengoslings.net/access_token
# Two-legged: http://oauth-sandbox.sevengoslings.net/two_legged
# Three-legged: http://oauth-sandbox.sevengoslings.net/three_legged
# Key: bd37aed57e15df53
# Secret: 0e9e6413a9ef49510a4f68ed02cd
def test_url_with_query_string(self, mockHttpRequest):
    """Client.request() on a URI with a query string issues a sane GET.

    ``mockHttpRequest`` is given a side effect that inspects the call it
    receives: the keyword arguments, and the query string of the URL,
    which is compared with a freshly signed reference request.
    NOTE(review): the mock is presumably injected by a patch decorator
    that is not visible here -- confirm.
    """
    uri = 'http://example.com/foo/bar/?show=thundercats&character=snarf'
    client = oauth.Client(self.consumer, None)
    random_result = random.randint(1, 100)

    expected_kwarg_names = frozenset(
        ['method', 'body', 'redirections', 'connection_type', 'headers'])
    # These values are not compared between the two requests.
    uncompared_keys = ('oauth_signature', 'oauth_nonce', 'oauth_timestamp')

    def mockrequest(cl, ur, **kw):
        self.assertTrue(cl is client)
        self.assertEqual(frozenset(kw.keys()), expected_kwarg_names)
        self.assertEqual(kw['body'], b'')
        self.assertEqual(kw['connection_type'], None)
        self.assertEqual(kw['method'], 'GET')
        self.assertEqual(kw['redirections'],
                         httplib2.DEFAULT_MAX_REDIRECTS)
        self.assertTrue(isinstance(kw['headers'], dict))

        # Build and sign a reference request for the same URI.
        reference = oauth.Request.from_consumer_and_token(
            self.consumer, None,
            http_method='GET', http_url=uri, parameters={})
        reference.sign_request(
            oauth.SignatureMethod_HMAC_SHA1(), self.consumer, None)

        expected = parse_qsl(urlparse(reference.to_url()).query)
        actual = parse_qsl(urlparse(ur).query)
        self.assertEqual(len(expected), len(actual))

        actual_by_key = dict(actual)
        for key, value in expected:
            if key in uncompared_keys:
                continue
            self.assertEqual(actual_by_key[key], value)
        return random_result

    mockHttpRequest.side_effect = mockrequest
    client.request(uri, 'GET')
def __init__(self, key: str, secret: str) -> None:
    """Store the consumer credentials and build the OAuth helpers.

    Creates an ``oauth2.Server`` with HMAC-SHA1 registered as a
    signature method, and an ``oauth2.Consumer`` for ``key``/``secret``.
    """
    super(RequestValidatorMixin, self).__init__()
    self.consumer_key = key
    self.consumer_secret = secret

    # Server used for verification; HMAC-SHA1 is the only method added.
    self.oauth_server = oauth2.Server()
    self.oauth_server.add_signature_method(
        oauth2.SignatureMethod_HMAC_SHA1())

    self.oauth_consumer = oauth2.Consumer(
        self.consumer_key, self.consumer_secret)
def SetCredentials(self,
                   consumer_key,
                   consumer_secret,
                   access_token_key=None,
                   access_token_secret=None):
    '''Store the OAuth credentials for this instance.

    The four values are always recorded. The signature methods, token
    and consumer objects are built only when none of the four is None;
    otherwise ``_oauth_consumer`` stays None.

    Args:
      consumer_key:
        The consumer_key of the twitter account.
      consumer_secret:
        The consumer_secret for the twitter account.
      access_token_key:
        The oAuth access token key value you retrieved
        from running get_access_token.py.
      access_token_secret:
        The oAuth access token's secret, also retrieved
        from the get_access_token.py run.
    '''
    self._consumer_key = consumer_key
    self._consumer_secret = consumer_secret
    self._access_token_key = access_token_key
    self._access_token_secret = access_token_secret
    self._oauth_consumer = None

    credentials = (consumer_key, consumer_secret,
                   access_token_key, access_token_secret)
    if any(value is None for value in credentials):
        # Incomplete credentials: leave the OAuth objects unbuilt.
        return

    self._signature_method_plaintext = oauth.SignatureMethod_PLAINTEXT()
    self._signature_method_hmac_sha1 = oauth.SignatureMethod_HMAC_SHA1()
    self._oauth_token = oauth.Token(key=access_token_key,
                                    secret=access_token_secret)
    self._oauth_consumer = oauth.Consumer(key=consumer_key,
                                          secret=consumer_secret)
def SetCredentials(self,
                   consumer_key,
                   consumer_secret,
                   access_token_key=None,
                   access_token_secret=None):
    '''Record the consumer and access-token credentials on this instance.

    All four values are stored as given. Only when every one of them is
    not None are the PLAINTEXT and HMAC-SHA1 signature methods, the
    token and the consumer created; otherwise ``_oauth_consumer`` is
    left as None.

    Args:
      consumer_key:
        The consumer_key of the twitter account.
      consumer_secret:
        The consumer_secret for the twitter account.
      access_token_key:
        The oAuth access token key value you retrieved
        from running get_access_token.py.
      access_token_secret:
        The oAuth access token's secret, also retrieved
        from the get_access_token.py run.
    '''
    self._consumer_key = consumer_key
    self._consumer_secret = consumer_secret
    self._access_token_key = access_token_key
    self._access_token_secret = access_token_secret
    self._oauth_consumer = None

    have_consumer = consumer_key is not None and consumer_secret is not None
    have_token = (access_token_key is not None
                  and access_token_secret is not None)
    if not (have_consumer and have_token):
        return

    self._signature_method_plaintext = oauth.SignatureMethod_PLAINTEXT()
    self._signature_method_hmac_sha1 = oauth.SignatureMethod_HMAC_SHA1()
    self._oauth_token = oauth.Token(
        key=access_token_key, secret=access_token_secret)
    self._oauth_consumer = oauth.Consumer(
        key=consumer_key, secret=consumer_secret)
def test_unset_consumer_and_token(self):
    """sign_request() records the consumer and token keys on the request.

    The request starts out with just a method and URL; once signed, its
    ``oauth_consumer_key`` and ``oauth_token`` must equal the keys of
    the consumer and token passed to ``sign_request``.
    """
    consumer = oauth.Consumer('my_consumer_key', 'my_consumer_secret')
    token = oauth.Token('my_key', 'my_secret')
    unsigned = oauth.Request("GET", "http://example.com/fetch.php")

    unsigned.sign_request(
        oauth.SignatureMethod_HMAC_SHA1(),
        consumer,
        token,
    )

    recorded_consumer_key = unsigned['oauth_consumer_key']
    self.assertEqual(consumer.key, recorded_consumer_key)
    recorded_token_key = unsigned['oauth_token']
    self.assertEqual(token.key, recorded_token_key)
def test_no_url_set(self):
    """A Request built with no arguments cannot be signed: ValueError."""
    consumer = oauth.Consumer('my_consumer_key', 'my_consumer_secret')
    token = oauth.Token('my_key', 'my_secret')
    empty_request = oauth.Request()

    signing_args = (oauth.SignatureMethod_HMAC_SHA1(), consumer, token)
    self.assertRaises(ValueError, empty_request.sign_request, *signing_args)
def test_signature_base_unicode_nonascii(self):
    """Check normalization and signing of a unicode URL with ``_U2766``.

    The expected normalized URL has neither the ``:80`` port nor the
    query string; the HMAC-SHA1 signature is compared with a fixed
    bytes value.
    """
    consumer = oauth.Consumer('consumer_token', 'consumer_secret')

    before_symbol = u('http://api.simplegeo.com:80/1.0/places/address.json'
                      '?q=monkeys&category=animal'
                      '&address=41+Decatur+St,+San+Francisc')
    after_symbol = u(',+CA')
    url = before_symbol + _U2766 + after_symbol

    req = oauth.Request("GET", url)
    self.assertReallyEqual(
        req.normalized_url,
        u('http://api.simplegeo.com/1.0/places/address.json'))

    hmac_sha1 = oauth.SignatureMethod_HMAC_SHA1()
    req.sign_request(hmac_sha1, consumer, None)
    expected_signature = b'WhufgeZKyYpKsI70GZaiDaYwl6g='
    self.assertReallyEqual(req['oauth_signature'], expected_signature)