def test_no_version(self):
url = "http://sp.example.com/"
params = {
'oauth_nonce': "4572616e48616d6d65724c61686176",
'oauth_timestamp': int(time.time()),
'bar': 'blerg',
'multi': ['FOO','BAR'],
'foo': 59
}
self.consumer = oauth.Consumer(key="consumer-key",
secret="consumer-secret")
self.token = oauth.Token(key="token-key", secret="token-secret")
params['oauth_token'] = self.token.key
params['oauth_consumer_key'] = self.consumer.key
self.request = oauth.Request(method="GET", url=url, parameters=params)
signature_method = oauth.SignatureMethod_HMAC_SHA1()
self.request.sign_request(signature_method, self.consumer, self.token)
server = oauth.Server()
server.add_signature_method(oauth.SignatureMethod_HMAC_SHA1())
parameters = server.verify_request(self.request, self.consumer,
self.token)
python类Token()的实例源码
def test_invalid_version(self):
url = "http://sp.example.com/"
params = {
'oauth_version': '222.9922',
'oauth_nonce': "4572616e48616d6d65724c61686176",
'oauth_timestamp': int(time.time()),
'bar': 'blerg',
'multi': ['foo','bar'],
'foo': 59
}
consumer = oauth.Consumer(key="consumer-key",
secret="consumer-secret")
token = oauth.Token(key="token-key", secret="token-secret")
params['oauth_token'] = token.key
params['oauth_consumer_key'] = consumer.key
request = oauth.Request(method="GET", url=url, parameters=params)
signature_method = oauth.SignatureMethod_HMAC_SHA1()
request.sign_request(signature_method, consumer, token)
server = oauth.Server()
server.add_signature_method(oauth.SignatureMethod_HMAC_SHA1())
self.assertRaises(oauth.Error, server.verify_request, request, consumer, token)
def test_missing_signature(self):
url = "http://sp.example.com/"
params = {
'oauth_version': '1.0',
'oauth_nonce': "4572616e48616d6d65724c61686176",
'oauth_timestamp': int(time.time()),
'bar': 'blerg',
'multi': ['FOO','BAR'],
'foo': 59
}
consumer = oauth.Consumer(key="consumer-key",
secret="consumer-secret")
token = oauth.Token(key="token-key", secret="token-secret")
params['oauth_token'] = token.key
params['oauth_consumer_key'] = consumer.key
request = oauth.Request(method="GET", url=url, parameters=params)
signature_method = oauth.SignatureMethod_HMAC_SHA1()
request.sign_request(signature_method, consumer, token)
del request['oauth_signature']
server = oauth.Server()
server.add_signature_method(oauth.SignatureMethod_HMAC_SHA1())
self.assertRaises(oauth.MissingSignature, server.verify_request,
request, consumer, token)
# Request Token: http://oauth-sandbox.sevengoslings.net/request_token
# Auth: http://oauth-sandbox.sevengoslings.net/authorize
# Access Token: http://oauth-sandbox.sevengoslings.net/access_token
# Two-legged: http://oauth-sandbox.sevengoslings.net/two_legged
# Three-legged: http://oauth-sandbox.sevengoslings.net/three_legged
# Key: bd37aed57e15df53
# Secret: 0e9e6413a9ef49510a4f68ed02cd
def __init__(
self,
consumer_key,
consumer_secret,
token,
token_secret
):
self.consumer = oauth2.Consumer(consumer_key, consumer_secret)
self.token = oauth2.Token(token, token_secret)
def is_valid_request(
self,
request: t.Any,
parameters: t.MutableMapping[str, str] = {},
fake_method: t.Any = None,
handle_error: bool = True
) -> bool:
'''
Validates an OAuth request using the python-oauth2 library:
https://github.com/simplegeo/python-oauth2
'''
def handle(e: oauth2.Error) -> bool:
if handle_error:
return False
else:
raise e
try:
method, url, headers, parameters = self.parse_request(
request, parameters, fake_method
)
oauth_request = oauth2.Request.from_request(
method, url, headers=headers, parameters=parameters
)
oauth2.Token
self.oauth_server.verify_request(
oauth_request, self.oauth_consumer, {}
)
except oauth2.Error as e:
return handle(e)
except ValueError as e:
return handle(e)
# Signature was valid
return True
def SetCredentials(self,
consumer_key,
consumer_secret,
access_token_key=None,
access_token_secret=None):
'''Set the consumer_key and consumer_secret for this instance
Args:
consumer_key:
The consumer_key of the twitter account.
consumer_secret:
The consumer_secret for the twitter account.
access_token_key:
The oAuth access token key value you retrieved
from running get_access_token.py.
access_token_secret:
The oAuth access token's secret, also retrieved
from the get_access_token.py run.
'''
self._consumer_key = consumer_key
self._consumer_secret = consumer_secret
self._access_token_key = access_token_key
self._access_token_secret = access_token_secret
self._oauth_consumer = None
if consumer_key is not None and consumer_secret is not None and \
access_token_key is not None and access_token_secret is not None:
self._signature_method_plaintext = oauth.SignatureMethod_PLAINTEXT()
self._signature_method_hmac_sha1 = oauth.SignatureMethod_HMAC_SHA1()
self._oauth_token = oauth.Token(key=access_token_key, secret=access_token_secret)
self._oauth_consumer = oauth.Consumer(key=consumer_key, secret=consumer_secret)
def SetCredentials(self,
consumer_key,
consumer_secret,
access_token_key=None,
access_token_secret=None):
'''Set the consumer_key and consumer_secret for this instance
Args:
consumer_key:
The consumer_key of the twitter account.
consumer_secret:
The consumer_secret for the twitter account.
access_token_key:
The oAuth access token key value you retrieved
from running get_access_token.py.
access_token_secret:
The oAuth access token's secret, also retrieved
from the get_access_token.py run.
'''
self._consumer_key = consumer_key
self._consumer_secret = consumer_secret
self._access_token_key = access_token_key
self._access_token_secret = access_token_secret
self._oauth_consumer = None
if consumer_key is not None and consumer_secret is not None and \
access_token_key is not None and access_token_secret is not None:
self._signature_method_plaintext = oauth.SignatureMethod_PLAINTEXT()
self._signature_method_hmac_sha1 = oauth.SignatureMethod_HMAC_SHA1()
self._oauth_token = oauth.Token(key=access_token_key, secret=access_token_secret)
self._oauth_consumer = oauth.Consumer(key=consumer_key, secret=consumer_secret)
def authenticate(self, url, consumer, token):
if consumer is not None and not isinstance(consumer, oauth2.Consumer):
raise ValueError("Invalid consumer.")
if token is not None and not isinstance(token, oauth2.Token):
raise ValueError("Invalid token.")
self.docmd('AUTH', 'XOAUTH %s' % \
base64.b64encode(oauth2.build_xoauth_string(url, consumer, token)))
def authenticate(self, url, consumer, token):
if consumer is not None and not isinstance(consumer, oauth2.Consumer):
raise ValueError("Invalid consumer.")
if token is not None and not isinstance(token, oauth2.Token):
raise ValueError("Invalid token.")
imaplib.IMAP4_SSL.authenticate(self, 'XOAUTH',
lambda x: oauth2.build_xoauth_string(url, consumer, token))
def setUp(self):
self.key = 'my-key'
self.secret = 'my-secret'
self.token = oauth.Token(self.key, self.secret)
def test_basic(self):
self.assertRaises(ValueError, lambda: oauth.Token(None, None))
self.assertRaises(ValueError, lambda: oauth.Token('asf', None))
self.assertRaises(ValueError, lambda: oauth.Token(None, 'dasf'))
def test_unset_consumer_and_token(self):
consumer = oauth.Consumer('my_consumer_key', 'my_consumer_secret')
token = oauth.Token('my_key', 'my_secret')
request = oauth.Request("GET", "http://example.com/fetch.php")
request.sign_request(oauth.SignatureMethod_HMAC_SHA1(), consumer,
token)
self.assertEqual(consumer.key, request['oauth_consumer_key'])
self.assertEqual(token.key, request['oauth_token'])
def test_no_url_set(self):
consumer = oauth.Consumer('my_consumer_key', 'my_consumer_secret')
token = oauth.Token('my_key', 'my_secret')
request = oauth.Request()
self.assertRaises(ValueError,
request.sign_request,
oauth.SignatureMethod_HMAC_SHA1(), consumer, token)
def test_from_consumer_and_token(self):
url = "http://sp.example.com/"
tok = oauth.Token(key="tok-test-key", secret="tok-test-secret")
tok.set_verifier('this_is_a_test_verifier')
con = oauth.Consumer(key="con-test-key", secret="con-test-secret")
req = oauth.Request.from_consumer_and_token(con, token=tok,
http_method="GET", http_url=url)
self.assertEqual(req['oauth_token'], tok.key)
self.assertEqual(req['oauth_consumer_key'], con.key)
self.assertEqual(tok.verifier, req['oauth_verifier'])
def test_no_version(self):
url = "http://sp.example.com/"
params = {
'oauth_nonce': "4572616e48616d6d65724c61686176",
'oauth_timestamp': int(time.time()),
'bar': 'blerg',
'multi': ['FOO','BAR'],
'foo': 59
}
self.consumer = oauth.Consumer(key="consumer-key",
secret="consumer-secret")
self.token = oauth.Token(key="token-key", secret="token-secret")
params['oauth_token'] = self.token.key
params['oauth_consumer_key'] = self.consumer.key
self.request = oauth.Request(method="GET", url=url, parameters=params)
signature_method = oauth.SignatureMethod_HMAC_SHA1()
self.request.sign_request(signature_method, self.consumer, self.token)
server = oauth.Server()
server.add_signature_method(oauth.SignatureMethod_HMAC_SHA1())
parameters = server.verify_request(self.request, self.consumer,
self.token)
def test_invalid_version(self):
url = "http://sp.example.com/"
params = {
'oauth_version': '222.9922',
'oauth_nonce': "4572616e48616d6d65724c61686176",
'oauth_timestamp': int(time.time()),
'bar': 'blerg',
'multi': ['foo','bar'],
'foo': 59
}
consumer = oauth.Consumer(key="consumer-key",
secret="consumer-secret")
token = oauth.Token(key="token-key", secret="token-secret")
params['oauth_token'] = token.key
params['oauth_consumer_key'] = consumer.key
request = oauth.Request(method="GET", url=url, parameters=params)
signature_method = oauth.SignatureMethod_HMAC_SHA1()
request.sign_request(signature_method, consumer, token)
server = oauth.Server()
server.add_signature_method(oauth.SignatureMethod_HMAC_SHA1())
self.assertRaises(oauth.Error, server.verify_request, request, consumer, token)
def test_missing_signature(self):
url = "http://sp.example.com/"
params = {
'oauth_version': '1.0',
'oauth_nonce': "4572616e48616d6d65724c61686176",
'oauth_timestamp': int(time.time()),
'bar': 'blerg',
'multi': ['FOO','BAR'],
'foo': 59
}
consumer = oauth.Consumer(key="consumer-key",
secret="consumer-secret")
token = oauth.Token(key="token-key", secret="token-secret")
params['oauth_token'] = token.key
params['oauth_consumer_key'] = consumer.key
request = oauth.Request(method="GET", url=url, parameters=params)
signature_method = oauth.SignatureMethod_HMAC_SHA1()
request.sign_request(signature_method, consumer, token)
del request['oauth_signature']
server = oauth.Server()
server.add_signature_method(oauth.SignatureMethod_HMAC_SHA1())
self.assertRaises(oauth.MissingSignature, server.verify_request,
request, consumer, token)
# Request Token: http://oauth-sandbox.sevengoslings.net/request_token
# Auth: http://oauth-sandbox.sevengoslings.net/authorize
# Access Token: http://oauth-sandbox.sevengoslings.net/access_token
# Two-legged: http://oauth-sandbox.sevengoslings.net/two_legged
# Three-legged: http://oauth-sandbox.sevengoslings.net/three_legged
# Key: bd37aed57e15df53
# Secret: 0e9e6413a9ef49510a4f68ed02cd
def __init__( self,
consumer_key,
consumer_secret,
user_token,
user_secret,
logger):
logger.info("OAuth:\nConsumer Key:{}\nConsumer Secret:{}\nUser Token:{}\nUser Secret:{}".format(consumer_key,consumer_secret,user_token,user_secret))
consumer = oauth.Consumer(consumer_key,consumer_secret)
client = oauth.Client(consumer)
access_token = oauth.Token(key=user_token,
secret=user_secret)
client = oauth.Client(consumer,access_token,timeout=MediaConnection.timeout)
self.client = client
super().__init__(logger)