def test_no_version(self):
    """A signed request without ``oauth_version`` should verify cleanly.

    The consumer, token and request are kept on ``self``.  There is no
    explicit assertion: the test passes when ``verify_request`` returns
    without raising.
    """
    endpoint = "http://sp.example.com/"
    request_params = {
        'oauth_nonce': "4572616e48616d6d65724c61686176",
        'oauth_timestamp': int(time.time()),
        'bar': 'blerg',
        'multi': ['FOO', 'BAR'],
        'foo': 59,
    }

    self.consumer = oauth.Consumer(key="consumer-key", secret="consumer-secret")
    self.token = oauth.Token(key="token-key", secret="token-secret")
    request_params['oauth_token'] = self.token.key
    request_params['oauth_consumer_key'] = self.consumer.key

    # Build and sign the request with HMAC-SHA1.
    self.request = oauth.Request(
        method="GET", url=endpoint, parameters=request_params)
    hmac_sha1 = oauth.SignatureMethod_HMAC_SHA1()
    self.request.sign_request(hmac_sha1, self.consumer, self.token)

    # The server knows the same signature method, so verification
    # is expected to succeed.
    verifier = oauth.Server()
    verifier.add_signature_method(oauth.SignatureMethod_HMAC_SHA1())
    verifier.verify_request(self.request, self.consumer, self.token)
python类Consumer()的实例源码
def test_invalid_version(self):
    """A request carrying ``oauth_version`` '222.9922' must be rejected.

    The request is signed with HMAC-SHA1 and handed to a server that
    knows that method; ``verify_request`` is expected to raise
    ``oauth.Error``.
    """
    endpoint = "http://sp.example.com/"
    request_params = {
        'oauth_version': '222.9922',
        'oauth_nonce': "4572616e48616d6d65724c61686176",
        'oauth_timestamp': int(time.time()),
        'bar': 'blerg',
        'multi': ['foo', 'bar'],
        'foo': 59,
    }

    client_consumer = oauth.Consumer(key="consumer-key",
                                     secret="consumer-secret")
    access_token = oauth.Token(key="token-key", secret="token-secret")
    request_params['oauth_token'] = access_token.key
    request_params['oauth_consumer_key'] = client_consumer.key

    signed = oauth.Request(method="GET", url=endpoint,
                           parameters=request_params)
    signed.sign_request(oauth.SignatureMethod_HMAC_SHA1(),
                        client_consumer, access_token)

    verifier = oauth.Server()
    verifier.add_signature_method(oauth.SignatureMethod_HMAC_SHA1())
    with self.assertRaises(oauth.Error):
        verifier.verify_request(signed, client_consumer, access_token)
def test_invalid_signature_method(self):
    """A request signed with ``SignatureMethod_Bad`` must be rejected.

    The server only has HMAC-SHA1 registered, so ``verify_request`` is
    expected to raise ``oauth.Error``.
    """
    endpoint = "http://sp.example.com/"
    request_params = {
        'oauth_version': '1.0',
        'oauth_nonce': "4572616e48616d6d65724c61686176",
        'oauth_timestamp': int(time.time()),
        'bar': 'blerg',
        'multi': ['FOO', 'BAR'],
        'foo': 59,
    }

    client_consumer = oauth.Consumer(key="consumer-key",
                                     secret="consumer-secret")
    access_token = oauth.Token(key="token-key", secret="token-secret")
    request_params['oauth_token'] = access_token.key
    request_params['oauth_consumer_key'] = client_consumer.key

    signed = oauth.Request(method="GET", url=endpoint,
                           parameters=request_params)
    # Sign with a method the server below does not know about.
    bad_method = SignatureMethod_Bad()
    signed.sign_request(bad_method, client_consumer, access_token)

    verifier = oauth.Server()
    verifier.add_signature_method(oauth.SignatureMethod_HMAC_SHA1())
    with self.assertRaises(oauth.Error):
        verifier.verify_request(signed, client_consumer, access_token)
def test_missing_signature(self):
    """Verification must fail once ``oauth_signature`` has been removed.

    The request is signed normally, the signature entry is then deleted,
    and ``verify_request`` is expected to raise ``oauth.MissingSignature``.
    """
    endpoint = "http://sp.example.com/"
    request_params = {
        'oauth_version': '1.0',
        'oauth_nonce': "4572616e48616d6d65724c61686176",
        'oauth_timestamp': int(time.time()),
        'bar': 'blerg',
        'multi': ['FOO', 'BAR'],
        'foo': 59,
    }

    client_consumer = oauth.Consumer(key="consumer-key",
                                     secret="consumer-secret")
    access_token = oauth.Token(key="token-key", secret="token-secret")
    request_params['oauth_token'] = access_token.key
    request_params['oauth_consumer_key'] = client_consumer.key

    signed = oauth.Request(method="GET", url=endpoint,
                           parameters=request_params)
    signed.sign_request(oauth.SignatureMethod_HMAC_SHA1(),
                        client_consumer, access_token)
    # Strip the signature that sign_request just added.
    del signed['oauth_signature']

    verifier = oauth.Server()
    verifier.add_signature_method(oauth.SignatureMethod_HMAC_SHA1())
    with self.assertRaises(oauth.MissingSignature):
        verifier.verify_request(signed, client_consumer, access_token)
# Request Token: http://oauth-sandbox.sevengoslings.net/request_token
# Auth: http://oauth-sandbox.sevengoslings.net/authorize
# Access Token: http://oauth-sandbox.sevengoslings.net/access_token
# Two-legged: http://oauth-sandbox.sevengoslings.net/two_legged
# Three-legged: http://oauth-sandbox.sevengoslings.net/three_legged
# Key: bd37aed57e15df53
# Secret: 0e9e6413a9ef49510a4f68ed02cd
def setUp(self):
    """Create the shared consumer and a sample request body.

    The consumer is built from ``self.consumer_key`` and
    ``self.consumer_secret``, which are expected to exist on the test
    case already.
    """
    key, secret = self.consumer_key, self.consumer_secret
    self.consumer = oauth.Consumer(key=key, secret=secret)

    sample_body = {}
    sample_body['foo'] = 'bar'
    sample_body['bar'] = 'foo'
    sample_body['multi'] = ['FOO', 'BAR']
    sample_body['blah'] = 599999
    self.body = sample_body
def test_init_passes_kwargs_to_httplib2(self):
    """Keyword options given to ``oauth.Client`` must show up on the client.

    Every option passed to the constructor is checked as an attribute of
    the resulting client, including ``disable_ssl_certificate_validation``.
    """
    consumer = oauth.Consumer('token', 'secret')

    # httplib2 options
    client = oauth.Client(consumer, None, cache='.cache', timeout=3,
                          disable_ssl_certificate_validation=True)

    self.assertIsNotNone(client.cache)
    self.assertEqual(client.timeout, 3)
    # Check the third option too, so all three kwargs are covered.
    self.assertTrue(client.disable_ssl_certificate_validation)
def __init__(
    self,
    consumer_key,
    consumer_secret,
    token,
    token_secret
):
    """Wrap the raw credential strings in oauth2 objects.

    ``self.consumer`` is an ``oauth2.Consumer`` built from the consumer
    key/secret and ``self.token`` an ``oauth2.Token`` built from the
    token key/secret.
    """
    self.consumer = oauth2.Consumer(key=consumer_key, secret=consumer_secret)
    self.token = oauth2.Token(key=token, secret=token_secret)
def __init__(self, g, client_id, client_secret, auth_url, token_url, access_token_url, socket_timeout=60):
    """Record the OAuth configuration and create the consumer.

    All arguments are stored on the instance under the same names
    (``g`` becomes ``self.globals``).  ``self.code`` starts as ``None``;
    ``self.request`` and ``self.session`` are taken from ``current``.
    """
    self.globals = g
    self.client_id, self.client_secret = client_id, client_secret
    self.code = None

    # Request and session objects taken from `current`.
    self.request, self.session = current.request, current.session

    # Endpoint URLs and socket timeout.
    self.auth_url, self.token_url = auth_url, token_url
    self.access_token_url = access_token_url
    self.socket_timeout = socket_timeout

    # consumer init
    self.consumer = oauth.Consumer(self.client_id, self.client_secret)
def __init__(self, g, client_id, client_secret, auth_url, token_url, access_token_url, socket_timeout=60):
    """Keep the OAuth settings on the instance and build its consumer.

    ``g`` is stored as ``self.globals``; the other arguments keep their
    names.  ``self.code`` is initialised to ``None`` and the request and
    session objects are read from ``current``.
    """
    self.globals = g
    self.client_id, self.client_secret = client_id, client_secret
    self.code = None

    # Request and session objects read from `current`.
    self.request, self.session = current.request, current.session

    # Endpoint URLs and socket timeout.
    self.auth_url, self.token_url = auth_url, token_url
    self.access_token_url = access_token_url
    self.socket_timeout = socket_timeout

    # consumer init
    self.consumer = oauth.Consumer(self.client_id, self.client_secret)
def post_replace_result(
    self, score: str, result_data: t.Optional[t.Mapping[str, str]] = None
) -> 'OutcomeResponse':
    '''
    POSTs the given score to the Tool Consumer with a replaceResult.

    Sets ``self.operation`` to ``REPLACE_REQUEST`` and stores ``score``
    and ``result_data`` on the instance before validating ``result_data``.

    :param str score: the score to send
    :param result_data: OPTIONAL mapping with exactly ONE of the keys
        ``text``, ``url`` or ``launchUrl`` (the lti launch url).
        Only one of these values can be in the dict at a time,
        due to the Canvas specification.
    :returns: the result of ``self.post_outcome_request()``
    :raises ValueError: if ``result_data`` has more than one entry, or
        contains none of the accepted keys (this includes an empty dict)
    '''
    self.operation = REPLACE_REQUEST
    self.score = score
    self.result_data = result_data

    if result_data is not None:
        if len(result_data) > 1:
            raise ValueError(
                'Dictionary result_data can only have one entry. '
                '{0} entries were found.'.format(len(result_data))
            )
        accepted_keys = ('url', 'text', 'launchUrl')
        if not any(key in result_data for key in accepted_keys):
            # The message lists every key the check above accepts.
            raise ValueError(
                'Dictionary result_data can only have the key '
                '"text", the key "url" or the key "launchUrl".'
            )

    return self.post_outcome_request()
def post_delete_result(self) -> 'OutcomeResponse':
    '''
    Switch this request to the delete operation and POST it.

    Sets ``self.operation`` to ``DELETE_REQUEST`` and returns whatever
    ``self.post_outcome_request()`` returns.
    '''
    self.operation = DELETE_REQUEST
    response = self.post_outcome_request()
    return response
def post_read_result(self) -> 'OutcomeResponse':
    '''
    Switch this request to the read operation and POST it.

    Sets ``self.operation`` to ``READ_REQUEST`` and returns whatever
    ``self.post_outcome_request()`` returns.
    '''
    self.operation = READ_REQUEST
    response = self.post_outcome_request()
    return response
def __init__(self, key: str, secret: str) -> None:
    '''
    Store the consumer credentials and prepare the OAuth helpers.

    Creates an ``oauth2.Server`` with the HMAC-SHA1 signature method
    registered (``self.oauth_server``) and an ``oauth2.Consumer`` for the
    given key/secret (``self.oauth_consumer``).
    '''
    super(RequestValidatorMixin, self).__init__()

    self.consumer_key, self.consumer_secret = key, secret

    # Server that accepts HMAC-SHA1 signed requests.
    self.oauth_server = oauth2.Server()
    self.oauth_server.add_signature_method(oauth2.SignatureMethod_HMAC_SHA1())

    self.oauth_consumer = oauth2.Consumer(
        self.consumer_key, self.consumer_secret
    )
def __init__(self, request_url, params, token_secret, token_cookie):
    """Initialise the login state and immediately run the matching step.

    The consumer key and secret are read from the ``TWITTER_CONSUMER_KEY``
    and ``TWITTER_CONSUMER_SECRET`` environment variables; ``Error`` is
    raised if either is missing or empty.  Afterwards:

    * ``params['login'] == 'start'`` -> ``self.start()``
    * both ``oauth_verifier`` and ``oauth_token`` present -> ``self.finish()``
    * anything else -> ``InvalidUsage``
    """
    # Inputs handed over by the caller.
    self.request_url = request_url
    self.params = params
    self.token_secret = token_secret
    self.token_cookie = token_cookie

    # Result fields, filled in later.
    self.status = False
    self.user_id = None
    self.user_name = None
    self.set_token_cookie = None

    # OAuth callback values (None when absent from params).
    self.oauth_verifier = params.get('oauth_verifier')
    self.oauth_token = params.get('oauth_token')
    self.oauth_token_secret = None

    self.consumer_key = os.environ.get('TWITTER_CONSUMER_KEY')
    if not self.consumer_key:
        raise Error('No TWITTER_CONSUMER_KEY environment value')

    self.consumer_secret = os.environ.get('TWITTER_CONSUMER_SECRET')
    if not self.consumer_secret:
        raise Error('No TWITTER_CONSUMER_SECRET environment value')

    self.consumer = oauth2.Consumer(key=self.consumer_key,
                                    secret=self.consumer_secret)

    # Dispatch on what the request asks for.
    if params.get('login') == 'start':
        self.start()
        return
    if self.oauth_verifier and self.oauth_token:
        self.finish()
        return
    raise InvalidUsage('Invalid request')
def __init__(self, g, client_id, client_secret, auth_url, token_url, access_token_url, socket_timeout=60):
    """Save the OAuth parameters on the instance and create the consumer.

    ``g`` is kept as ``self.globals``; every other argument is stored
    under its own name.  ``self.code`` starts out as ``None``, and
    ``self.request`` / ``self.session`` come from ``current``.
    """
    self.globals = g
    self.client_id, self.client_secret = client_id, client_secret
    self.code = None

    # Request and session objects come from `current`.
    self.request, self.session = current.request, current.session

    # Endpoint URLs and socket timeout.
    self.auth_url, self.token_url = auth_url, token_url
    self.access_token_url = access_token_url
    self.socket_timeout = socket_timeout

    # consumer init
    self.consumer = oauth.Consumer(self.client_id, self.client_secret)
def SetCredentials(self,
                   consumer_key,
                   consumer_secret,
                   access_token_key=None,
                   access_token_secret=None):
    '''Store the credentials and, if all four are given, the OAuth objects.

    The four values are always saved on the instance and
    ``_oauth_consumer`` is reset to None.  Only when none of them is None
    are the signature methods, ``_oauth_token`` and ``_oauth_consumer``
    created.

    Args:
      consumer_key:
        The consumer_key of the twitter account.
      consumer_secret:
        The consumer_secret for the twitter account.
      access_token_key:
        The oAuth access token key value you retrieved
        from running get_access_token.py.
      access_token_secret:
        The oAuth access token's secret, also retrieved
        from the get_access_token.py run.
    '''
    self._consumer_key = consumer_key
    self._consumer_secret = consumer_secret
    self._access_token_key = access_token_key
    self._access_token_secret = access_token_secret
    self._oauth_consumer = None

    supplied = (consumer_key, consumer_secret,
                access_token_key, access_token_secret)
    if any(value is None for value in supplied):
        # Incomplete credentials: leave the OAuth objects unset.
        return

    self._signature_method_plaintext = oauth.SignatureMethod_PLAINTEXT()
    self._signature_method_hmac_sha1 = oauth.SignatureMethod_HMAC_SHA1()
    self._oauth_token = oauth.Token(key=access_token_key,
                                    secret=access_token_secret)
    self._oauth_consumer = oauth.Consumer(key=consumer_key,
                                          secret=consumer_secret)
def SetCredentials(self,
                   consumer_key,
                   consumer_secret,
                   access_token_key=None,
                   access_token_secret=None):
    '''Remember the credentials; build OAuth objects when they are complete.

    All four values are written to the instance unconditionally and
    ``_oauth_consumer`` is cleared.  The signature methods,
    ``_oauth_token`` and ``_oauth_consumer`` are only created when every
    one of the four values is not None.

    Args:
      consumer_key:
        The consumer_key of the twitter account.
      consumer_secret:
        The consumer_secret for the twitter account.
      access_token_key:
        The oAuth access token key value you retrieved
        from running get_access_token.py.
      access_token_secret:
        The oAuth access token's secret, also retrieved
        from the get_access_token.py run.
    '''
    self._consumer_key = consumer_key
    self._consumer_secret = consumer_secret
    self._access_token_key = access_token_key
    self._access_token_secret = access_token_secret
    self._oauth_consumer = None

    have_all = all(
        value is not None
        for value in (consumer_key, consumer_secret,
                      access_token_key, access_token_secret)
    )
    if have_all:
        self._signature_method_plaintext = oauth.SignatureMethod_PLAINTEXT()
        self._signature_method_hmac_sha1 = oauth.SignatureMethod_HMAC_SHA1()
        self._oauth_token = oauth.Token(key=access_token_key,
                                        secret=access_token_secret)
        self._oauth_consumer = oauth.Consumer(key=consumer_key,
                                              secret=consumer_secret)
def authenticate(self, url, consumer, token):
    """Send an ``AUTH XOAUTH`` command built from the given credentials.

    ``consumer`` and ``token`` may each be None; otherwise they must be
    an ``oauth2.Consumer`` / ``oauth2.Token`` respectively, or
    ``ValueError`` is raised.  The XOAUTH string is base64-encoded and
    passed to ``self.docmd``.
    """
    consumer_ok = consumer is None or isinstance(consumer, oauth2.Consumer)
    if not consumer_ok:
        raise ValueError("Invalid consumer.")

    token_ok = token is None or isinstance(token, oauth2.Token)
    if not token_ok:
        raise ValueError("Invalid token.")

    xoauth_string = oauth2.build_xoauth_string(url, consumer, token)
    encoded = base64.b64encode(xoauth_string)
    self.docmd('AUTH', 'XOAUTH %s' % encoded)