def decode_oauth_token_secret(self):
    """Extract the OAuth token secret carried inside the token cookie.

    The cookie (``self.token_cookie``) is decoded as an HS256-signed JWT
    using ``self.token_secret``. The value found at ``payload['data']['id']``
    is stored on ``self.oauth_token_secret`` and returned.

    Raises:
        Error: if no cookie is set, the cookie cannot be decoded, or the
            decoded payload carries no ``data.id`` value.
    """
    if not self.token_cookie:
        raise Error('No token cookie given')
    try:
        # Assuming ``jwt`` is PyJWT: the accepted-algorithm allow-list is
        # the plural ``algorithms`` keyword taking a list, not ``algorithm``.
        payload = jwt.decode(self.token_cookie,
                             self.token_secret,
                             algorithms=['HS256'])
    except Exception:
        # Every decode/verification failure is reported the same way, but
        # unlike a bare ``except:`` this does not trap KeyboardInterrupt
        # or SystemExit.
        raise Error('Failed to retrieve oauth_token_secret from token')
    self.oauth_token_secret = payload.get('data', {}).get('id', None)
    if not self.oauth_token_secret:
        raise Error('Token does not have an oauth_token_secret')
    return self.oauth_token_secret
# Example source code using the Python class Token()
def oauth_login(self, url, oauth_token, oauth_token_secret,
                consumer_key='anonymous', consumer_secret='anonymous'):
    """Log in with the XOAUTH mechanism.

    Requires the optional ``oauth_module`` to be available and a server
    that supports OAUTH (e.g. Gmail).

    Returns whatever ``self._command_and_check`` returns for the
    ``authenticate`` command.

    Raises:
        self.Error: when ``oauth_module`` is not available.
    """
    # Guard first: without the optional module nothing below can work.
    if not oauth_module:
        raise self.Error(
            'The optional oauth2 package is needed for OAUTH authentication')

    token = oauth_module.Token(oauth_token, oauth_token_secret)
    consumer = oauth_module.Consumer(consumer_key, consumer_secret)

    def xoauth_string(_challenge):
        # The argument passed by the authenticator is ignored; the XOAUTH
        # string depends only on url, consumer and token.
        return oauth_module.build_xoauth_string(url, consumer, token)

    return self._command_and_check('authenticate', 'XOAUTH',
                                   xoauth_string, unpack=True)
def get_token_and_auth_url(self, callback_url=None):
    """Request a token and the URL at which it can be authorised.

    POSTs to ``REQUEST_TOKEN_URL`` with ``oauth_callback`` set to
    *callback_url*, or ``'oob'`` when no callback is given.

    Returns:
        A ``(token, auth_url)`` tuple, where ``auth_url`` is the
        ``xoauth_request_auth_url`` field of the response body.

    Raises:
        YQLError: when the response status is not ``'200'``.
    """
    client = oauth.Client(self.consumer)
    request = oauth.Request(
        parameters={'oauth_callback': callback_url or 'oob'})
    url = REQUEST_TOKEN_URL
    resp, content = client.request(url, "POST", request.to_postdata())

    # Fail fast on anything other than a 200 so the happy path stays flat.
    if resp.get('status') != '200':
        raise YQLError(resp, content, url)

    token = oauth.Token.from_string(content)
    yql_logger.debug("token: %s", token)
    data = dict(parse_qsl(content))
    yql_logger.debug("data: %s", data)
    return token, data['xoauth_request_auth_url']
def get_token_and_auth_url(self, callback_url=None):
    """First step is to get the token and then send the request that
    provides the auth URL.

    Args:
        callback_url: value sent as ``oauth_callback``; ``'oob'`` is sent
            when it is omitted or falsy.

    Returns:
        A tuple of the token and the authorisation URL (the
        ``xoauth_request_auth_url`` field of the response body).

    Raises:
        YQLError: when the response status is not ``'200'``.
    """
    client = oauth.Client(self.consumer)
    params = {'oauth_callback': callback_url or 'oob'}
    request = oauth.Request(parameters=params)
    url = REQUEST_TOKEN_URL
    resp, content = client.request(url, "POST", request.to_postdata())
    if resp.get('status') == '200':
        token = oauth.Token.from_string(content)
        yql_logger.debug("token: %s", token)
        data = dict(parse_qsl(content))
        yql_logger.debug("data: %s", data)
        return token, data['xoauth_request_auth_url']
    else:
        # Call form of ``raise``: the ``raise Cls, (args)`` spelling is a
        # SyntaxError on Python 3, while this form builds the same
        # exception on both Python 2 and 3.
        raise YQLError(resp, content, url)
def test_from_string(self):
    """Token.from_string rejects malformed input and round-trips tokens."""
    malformed_inputs = (
        '',
        'blahblahblah',
        'blah=blah',
        'oauth_token_secret=asfdasf',
        'oauth_token_secret=',
        'oauth_token=asfdasf',
        'oauth_token=',
        'oauth_token=&oauth_token_secret=',
        'oauth_token=tooken%26oauth_token_secret=seecret',
    )
    for raw in malformed_inputs:
        self.assertRaises(ValueError, oauth.Token.from_string, raw)

    # Round trip without a callback set on the token...
    restored = oauth.Token.from_string(self.token.to_string())
    self._compare_tokens(restored)

    # ...and again once a callback has been set.
    self.token.set_callback('http://www.example.com/my-callback')
    restored = oauth.Token.from_string(self.token.to_string())
    self._compare_tokens(restored)
def test_from_token_and_callback(self):
    """Request.from_token_and_callback sets oauth_token and, only when a
    callback is supplied, oauth_callback."""
    url = "http://sp.example.com/"
    # The original built a ``params`` dict here that was never used; it
    # has been dropped as dead code.
    tok = oauth.Token(key="tok-test-key", secret="tok-test-secret")

    req = oauth.Request.from_token_and_callback(tok)
    self.assertNotIn('oauth_callback', req)
    self.assertEqual(req['oauth_token'], tok.key)

    req = oauth.Request.from_token_and_callback(tok, callback=url)
    self.assertIn('oauth_callback', req)
    self.assertEqual(req['oauth_callback'], url)
def setUp(self):
    """Create a consumer, a token and a GET request signed with HMAC-SHA1."""
    self.consumer = oauth.Consumer(key="consumer-key",
                                   secret="consumer-secret")
    self.token = oauth.Token(key="token-key", secret="token-secret")

    request_params = {
        'oauth_version': "1.0",
        'oauth_nonce': "4572616e48616d6d65724c61686176",
        'oauth_timestamp': int(time.time()),
        'bar': 'blerg',
        'multi': ['FOO', 'BAR'],
        'foo': 59,
        'oauth_token': self.token.key,
        'oauth_consumer_key': self.consumer.key,
    }
    self.request = oauth.Request(method="GET",
                                 url="http://sp.example.com/",
                                 parameters=request_params)
    self.request.sign_request(oauth.SignatureMethod_HMAC_SHA1(),
                              self.consumer, self.token)
def test_init(self):
    """Client() must raise ValueError for a bogus consumer or token."""
    class Impostor:
        pass

    # An object that is not a Consumer must be rejected.
    try:
        oauth.Client(Impostor())
    except ValueError:
        pass
    else:
        self.fail("Client.__init__() accepted invalid Consumer.")

    # A valid consumer paired with an object that is not a Token must
    # be rejected as well.
    consumer = oauth.Consumer('token', 'secret')
    try:
        oauth.Client(consumer, Impostor())
    except ValueError:
        pass
    else:
        self.fail("Client.__init__() accepted invalid Token.")
def make_request(self, url, method="GET", body="", headers=None):
    """Send a signed request to the TradeKing API and decode the reply.

    Returns the value produced by ``loads(content)``, or ``None`` when
    decoding raises ``ValueError`` (the failure is logged as an error).
    """
    client = Client(
        Consumer(key=TRADEKING_CONSUMER_KEY,
                 secret=TRADEKING_CONSUMER_SECRET),
        Token(key=TRADEKING_ACCESS_TOKEN,
              secret=TRADEKING_ACCESS_TOKEN_SECRET))

    request_line = "TradeKing request: %s %s %s %s" % (
        url, method, body, headers)
    self.logs.debug(request_line)

    response, content = client.request(url, method=method, body=body,
                                       headers=headers)
    self.logs.debug("TradeKing response: %s %s" % (response, content))

    try:
        decoded = loads(content)
    except ValueError:
        self.logs.error("Failed to decode JSON response: %s" % content)
        return None
    return decoded
def get_user_information(self):
    """Exchange the verified request token at Twitter for user details.

    On success sets ``self.status`` to 200 and stores ``self.user_id`` and
    ``self.user_name`` (``None`` when no ``screen_name`` is returned).

    Returns:
        A ``(user_id, user_name)`` tuple.

    Raises:
        Error: on a non-200 response or when the reply has no ``user_id``.
    """
    request_token = oauth2.Token(self.oauth_token, self.oauth_token_secret)
    request_token.set_verifier(self.oauth_verifier)
    twitter = oauth2.Client(self.consumer, request_token)

    resp, content = twitter.request(
        'https://api.twitter.com/oauth/access_token', 'POST')
    if resp.status != 200:
        raise Error('{} from Twitter'.format(resp.status))

    # The body is a URL-encoded query string delivered as bytes.
    provider_user = dict(parse_qsl(content.decode('utf-8')))
    if 'user_id' not in provider_user:
        raise Error('No user_id from Twitter')

    self.status = 200
    self.user_id = provider_user.get('user_id')
    self.user_name = provider_user.get('screen_name', None)
    return (self.user_id, self.user_name)
def get_access_token(self, oauth_token, oauth_token_secret, oauth_verifier):
    """Final leg of three-legged OAuth.

    Exchanges the token, secret and verifier obtained from the OAuth
    callback for the access token by POSTing to ``self.ACCESS_TOKEN_URL``.

    Returns:
        A dict of the decoded response fields; per the original
        documentation it holds ``oauth_token`` and ``oauth_token_secret``.

    Raises:
        Exception: when the response status is not ``'200'``.
    """
    request_token = oauth.Token(oauth_token, oauth_token_secret)
    request_token.set_verifier(oauth_verifier)
    consumer = oauth.Consumer(CONSUMER_KEY, CONSUMER_SECRET)
    client = oauth.Client(consumer, request_token)

    # This request goes to the access-token URL, not the request-token one.
    resp, content = client.request(self.ACCESS_TOKEN_URL, "POST")
    status = resp['status']
    if status != '200':
        raise Exception("Invalid response %s." % status)

    # The parsed pairs are bytes here, so both sides are decoded to str
    # before the dict is handed back.
    return {
        raw_key.decode(): raw_value.decode()
        for raw_key, raw_value in urllib.parse.parse_qsl(content)
    }
def test_from_string(self):
    """Malformed strings raise ValueError; valid tokens survive a
    to_string/from_string round trip."""
    rejected = [
        '',
        'blahblahblah',
        'blah=blah',
        'oauth_token_secret=asfdasf',
        'oauth_token_secret=',
        'oauth_token=asfdasf',
        'oauth_token=',
        'oauth_token=&oauth_token_secret=',
        'oauth_token=tooken%26oauth_token_secret=seecret',
    ]
    for candidate in rejected:
        self.assertRaises(ValueError, oauth.Token.from_string, candidate)

    serialized = self.token.to_string()
    self._compare_tokens(oauth.Token.from_string(serialized))

    # Repeat the round trip with a callback set on the token.
    self.token.set_callback('http://www.example.com/my-callback')
    serialized = self.token.to_string()
    self._compare_tokens(oauth.Token.from_string(serialized))
def test_from_token_and_callback(self):
    """oauth_callback appears in the request only when a callback is
    passed; oauth_token always mirrors the token key."""
    url = "http://sp.example.com/"
    # An unused ``params`` dict that the original defined here has been
    # removed; nothing in the test ever read it.
    tok = oauth.Token(key="tok-test-key", secret="tok-test-secret")

    req = oauth.Request.from_token_and_callback(tok)
    self.assertNotIn('oauth_callback', req)
    self.assertEqual(req['oauth_token'], tok.key)

    req = oauth.Request.from_token_and_callback(tok, callback=url)
    self.assertIn('oauth_callback', req)
    self.assertEqual(req['oauth_callback'], url)
def setUp(self):
    """Prepare ``self.consumer``, ``self.token`` and a signed
    ``self.request`` for the tests."""
    consumer = oauth.Consumer(key="consumer-key", secret="consumer-secret")
    token = oauth.Token(key="token-key", secret="token-secret")
    self.consumer = consumer
    self.token = token

    parameters = {
        'oauth_version': "1.0",
        'oauth_nonce': "4572616e48616d6d65724c61686176",
        'oauth_timestamp': int(time.time()),
        'bar': 'blerg',
        'multi': ['FOO', 'BAR'],
        'foo': 59,
    }
    parameters.update([
        ('oauth_token', self.token.key),
        ('oauth_consumer_key', self.consumer.key),
    ])

    self.request = oauth.Request(method="GET", url="http://sp.example.com/",
                                 parameters=parameters)
    hmac_sha1 = oauth.SignatureMethod_HMAC_SHA1()
    self.request.sign_request(hmac_sha1, self.consumer, self.token)
def test_init(self):
    """Client.__init__ rejects objects that are not Consumer/Token."""
    class Stranger:
        pass

    try:
        oauth.Client(Stranger())
    except ValueError:
        # Expected: the consumer argument is not a Consumer.
        pass
    else:
        self.fail("Client.__init__() accepted invalid Consumer.")

    good_consumer = oauth.Consumer('token', 'secret')
    try:
        oauth.Client(good_consumer, Stranger())
    except ValueError:
        # Expected: the token argument is not a Token.
        pass
    else:
        self.fail("Client.__init__() accepted invalid Token.")
def __init__(self, access_token_key, access_token_secret, consumer_key, consumer_secret):
    """Store the credentials and build the OAuth and HTTP helper objects.

    Creates an ``oauth.Token`` and ``oauth.Consumer`` from the given
    key/secret pairs, an HMAC-SHA1 signature method, and HTTP/HTTPS
    handlers with debug output disabled (``debuglevel=0``).
    """
    debug_level = 0

    self.access_token_key = access_token_key
    self.access_token_secret = access_token_secret
    self.consumer_key = consumer_key
    self.consumer_secret = consumer_secret

    self.oauth_token = oauth.Token(key=self.access_token_key,
                                   secret=self.access_token_secret)
    self.oauth_consumer = oauth.Consumer(key=self.consumer_key,
                                         secret=self.consumer_secret)
    self.signature_method_hmac_sha1 = oauth.SignatureMethod_HMAC_SHA1()

    # NOTE(review): ``urllib`` here must be a module exposing
    # HTTPHandler/HTTPSHandler - confirm which import the file uses.
    self.http_handler = urllib.HTTPHandler(debuglevel=debug_level)
    self.https_handler = urllib.HTTPSHandler(debuglevel=debug_level)
def __init__(self, consumer_key, consumer_secret="", oauth_token="", oauth_secret="", host="https://api.tumblr.com",
             proxy_url=None):
    """Set up the OAuth consumer/token and work out the proxy settings.

    Proxy resolution order:
      1. an explicit *proxy_url* (prefixed with ``https://``),
      2. the ``HTTPS_PROXY`` environment variable,
      3. no proxy (``self.proxy_info`` is ``None``).
    """
    self.host = host
    self.consumer = oauth.Consumer(key=consumer_key, secret=consumer_secret)
    self.token = oauth.Token(key=oauth_token, secret=oauth_secret)
    self.proxy_url = proxy_url

    if proxy_url:
        print("Generating Proxy From proxy_url")
        self.proxy_info = httplib2.proxy_info_from_url("https://" + proxy_url, 'http')
        self.proxy_info.proxy_rdns = True
        # Alternative kept from the original, disabled:
        # uri = urlparse(proxy_url)
        # self.proxy_info = ProxyInfo(socks.PROXY_TYPE_HTTP,uri.hostname,uri.port,proxy_rdns=True)
        return

    print("Generating proxy from ENV")
    env_proxy = os.environ.get('HTTPS_PROXY', None)
    if not env_proxy:
        self.proxy_info = None
        return

    parsed = urlparse(env_proxy)
    self.proxy_info = ProxyInfo(socks.PROXY_TYPE_HTTP, parsed.hostname, parsed.port, proxy_rdns=True)
def twitter_oauth2(oauth_token, oauth_verifier):
    """Swap an approved Twitter request token for the access token.

    Looks up the stored ``db.OAuthToken`` whose ``temp_oauth_token``
    matches *oauth_token* for the Twitter application, exchanges it at
    ``access_token_url``, stores the resulting token and secret on the
    record, marks it valid, persists it and returns it.

    Returns:
        The updated record, or ``None`` when no matching record exists.
    """
    matches = db.OAuthToken.query(
        db.OAuthToken.temp_oauth_token == oauth_token,
        db.OAuthToken.application == db.APP_TWITTER,
    ).fetch(1)
    if not matches:
        return None
    auth_token = matches[0]

    # Step 3: after the user has been redirected back to the callback URL,
    # the request token signs the access-token request. The request token
    # is then discarded and the returned access token is stored for
    # future use.
    request_token = oauth.Token(oauth_token, auth_token.temp_oauth_token_secret)
    request_token.set_verifier(oauth_verifier)
    # NOTE(review): ``auth`` is not defined in this function - confirm it
    # is a module-level name rather than a typo for ``auth_token``.
    consumer = oauth.Consumer(auth.consumer_key, auth.consumer_secret)
    client = oauth.Client(consumer, request_token)

    # NOTE(review): the response status is never inspected; a failed
    # exchange surfaces as a KeyError on the lookups below.
    _resp, content = client.request(access_token_url, "POST")
    access_token = dict(urlparse.parse_qsl(content))

    auth_token.oauth_token = access_token['oauth_token']
    auth_token.oauth_token_secret = access_token['oauth_token_secret']
    auth_token.valid_token = True
    auth_token.time_between_posts = 5 * 60  # every 5 minutes
    auth_token.put()
    return auth_token
def __init__(self, consumer_key, consumer_secret, access_token=None):
    """Initialise the Splitwise client with its consumer credentials.

    Args:
        consumer_key (str): Consumer key provided by Splitwise.
        consumer_secret (str): Consumer secret provided by Splitwise.
        access_token (:obj:`dict`, optional): Combination of oauth_token
            and oauth_token_secret. When truthy it is passed to
            ``setAccessToken``.
    """
    self.consumer = oauth.Consumer(consumer_key, consumer_secret)

    # Nothing more to do unless an access token was supplied.
    if not access_token:
        return
    self.setAccessToken(access_token)
def getAccessToken(self, oauth_token, oauth_token_secret, oauth_verifier):
    """Exchange a verified request token for an access token.

    POSTs to ``Splitwise.ACCESS_TOKEN_URL`` and returns the URL-decoded
    response body as a dict.

    Raises:
        Exception: when the response status is not ``'200'``.
    """
    request_token = oauth.Token(oauth_token, oauth_token_secret)
    request_token.set_verifier(oauth_verifier)

    resp, content = oauth.Client(self.consumer, request_token).request(
        Splitwise.ACCESS_TOKEN_URL, "POST")

    if Splitwise.isDebug():
        print(resp, content)

    # Anything other than a 200 means the exchange failed.
    status = resp['status']
    if status != '200':
        raise Exception("Invalid response %s. Please check your consumer key and secret." % status)

    decoded_body = content.decode("utf-8")
    return dict(parse_qsl(decoded_body))
def get_tweets(twitter_handle):
    """Return the text of tweets sent from *twitter_handle*.

    Queries the Twitter search endpoint for ``from:<handle>`` (requesting
    ``count=100`` results) and returns a list with one string per status:
    the tweet text followed by ``' \\n'``, with ``https://t.co/`` links
    followed by ten characters stripped out.

    Example:
        print(get_tweets("Cmdr_Hadfield"))
    """
    try:
        from urllib.parse import quote
    except ImportError:  # Python 2
        from urllib import quote

    # NOTE(review): these credentials are hard-coded in source. They should
    # be rotated and loaded from configuration or the environment instead.
    CONSUMER_KEY = 'YurZ2aWgVEYVhE5c60XyWRGG0'
    CONSUMER_SECRET = 'TIaSl3xnVTNUzHrwz84lOycTYkwe9l1NocwkB4hGbd2ngMufn6'
    ACCESS_KEY = '564268368-tRdDLN3O9EKzlaY28NzHCgNsYJX59YRngv3qjjJh'
    ACCESS_SECRET = 'VUjRU2ftlmnUdqDtppMiV6LLqT83ZbKDDUjcpWtrT1PG4'

    consumer = oauth.Consumer(key=CONSUMER_KEY, secret=CONSUMER_SECRET)
    access_token = oauth.Token(key=ACCESS_KEY, secret=ACCESS_SECRET)
    client = oauth.Client(consumer, access_token)

    # Percent-encode the handle so characters such as '&', '#' or spaces
    # cannot alter the query string. Ordinary handles are unaffected.
    endpoint = ("https://api.twitter.com/1.1/search/tweets.json?q=from%3A"
                + quote(twitter_handle, safe='')
                + "&result_type=all&count=100")
    response, data = client.request(endpoint)
    tweet_data = json.loads(data.decode('utf-8'))

    short_link = re.compile(r'https:\/\/t\.co\/.{10}')
    return [short_link.sub('', tweet['text'] + ' \n')
            for tweet in tweet_data['statuses']]
def authenticate(self, url, consumer, token):
    """Send an ``AUTH XOAUTH <base64>`` command built from the credentials.

    Args:
        url: URL passed to ``oauth2.build_xoauth_string``.
        consumer: an ``oauth2.Consumer`` or ``None``.
        token: an ``oauth2.Token`` or ``None``.

    Raises:
        ValueError: if *consumer* or *token* is given with the wrong type.
    """
    if consumer is not None and not isinstance(consumer, oauth2.Consumer):
        raise ValueError("Invalid consumer.")
    if token is not None and not isinstance(token, oauth2.Token):
        raise ValueError("Invalid token.")

    xoauth = oauth2.build_xoauth_string(url, consumer, token)
    # base64.b64encode needs bytes on Python 3 and returns bytes there;
    # formatting bytes with %s would embed a literal "b'...'" in the
    # command. Both conversions are no-ops for a Python 2 byte string.
    if not isinstance(xoauth, bytes):
        xoauth = xoauth.encode('utf-8')
    encoded = base64.b64encode(xoauth)
    if not isinstance(encoded, str):
        encoded = encoded.decode('ascii')

    self.docmd('AUTH', 'XOAUTH %s' % encoded)
def authenticate(self, url, consumer, token):
    """Authenticate over IMAP4_SSL using the XOAUTH mechanism.

    Raises:
        ValueError: if *consumer* is not ``None`` and not an
            ``oauth2.Consumer``, or *token* is not ``None`` and not an
            ``oauth2.Token``.
    """
    if not (consumer is None or isinstance(consumer, oauth2.Consumer)):
        raise ValueError("Invalid consumer.")
    if not (token is None or isinstance(token, oauth2.Token)):
        raise ValueError("Invalid token.")

    def xoauth_response(_challenge):
        # The challenge argument is ignored; the reply depends only on
        # url, consumer and token.
        return oauth2.build_xoauth_string(url, consumer, token)

    imaplib.IMAP4_SSL.authenticate(self, 'XOAUTH', xoauth_response)
def authenticate(self, url, consumer, token):
    """Issue ``AUTH XOAUTH`` with the base64-encoded XOAUTH string.

    Args:
        url: URL passed to ``oauth2.build_xoauth_string``.
        consumer: an ``oauth2.Consumer`` or ``None``.
        token: an ``oauth2.Token`` or ``None``.

    Raises:
        ValueError: if *consumer* or *token* is given with the wrong type.
    """
    if consumer is not None and not isinstance(consumer, oauth2.Consumer):
        raise ValueError("Invalid consumer.")
    if token is not None and not isinstance(token, oauth2.Token):
        raise ValueError("Invalid token.")

    raw = oauth2.build_xoauth_string(url, consumer, token)
    # On Python 3 b64encode accepts only bytes and returns bytes, which
    # '%s' would render as "b'...'". Normalise on the way in and out;
    # on Python 2 both checks pass straight through for byte strings.
    if not isinstance(raw, bytes):
        raw = raw.encode('utf-8')
    payload = base64.b64encode(raw)
    if not isinstance(payload, str):
        payload = payload.decode('ascii')

    self.docmd('AUTH', 'XOAUTH %s' % payload)
def authenticate(self, url, consumer, token):
    """Run the IMAP4_SSL ``authenticate`` step with an XOAUTH string.

    Raises:
        ValueError: when *consumer* or *token* is supplied but is not an
            ``oauth2.Consumer`` / ``oauth2.Token`` respectively.
    """
    consumer_ok = consumer is None or isinstance(consumer, oauth2.Consumer)
    if not consumer_ok:
        raise ValueError("Invalid consumer.")

    token_ok = token is None or isinstance(token, oauth2.Token)
    if not token_ok:
        raise ValueError("Invalid token.")

    def build_reply(_server_challenge):
        # The server challenge is not used when building the string.
        return oauth2.build_xoauth_string(url, consumer, token)

    imaplib.IMAP4_SSL.authenticate(self, 'XOAUTH', build_reply)
def setUp(self):
    """Create the key, secret and Token shared by the tests."""
    self.key, self.secret = 'my-key', 'my-secret'
    self.token = oauth.Token(self.key, self.secret)
def test_basic(self):
    """Token() raises ValueError when the key or the secret is None."""
    incomplete_pairs = ((None, None), ('asf', None), (None, 'dasf'))
    for key, secret in incomplete_pairs:
        self.assertRaises(ValueError, oauth.Token, key, secret)
def test_unset_consumer_and_token(self):
    """Signing a request created without consumer/token parameters
    fills in oauth_consumer_key and oauth_token."""
    consumer = oauth.Consumer('my_consumer_key', 'my_consumer_secret')
    token = oauth.Token('my_key', 'my_secret')
    request = oauth.Request("GET", "http://example.com/fetch.php")

    signer = oauth.SignatureMethod_HMAC_SHA1()
    request.sign_request(signer, consumer, token)

    self.assertEqual(consumer.key, request['oauth_consumer_key'])
    self.assertEqual(token.key, request['oauth_token'])
def test_no_url_set(self):
    """Signing a Request created with no arguments raises ValueError."""
    consumer = oauth.Consumer('my_consumer_key', 'my_consumer_secret')
    token = oauth.Token('my_key', 'my_secret')
    request = oauth.Request()

    signer = oauth.SignatureMethod_HMAC_SHA1()
    self.assertRaises(ValueError,
                      request.sign_request, signer, consumer, token)
def test_from_consumer_and_token(self):
    """Request.from_consumer_and_token copies the token key, consumer key
    and token verifier into the request parameters."""
    token = oauth.Token(key="tok-test-key", secret="tok-test-secret")
    token.set_verifier('this_is_a_test_verifier')
    consumer = oauth.Consumer(key="con-test-key", secret="con-test-secret")

    request = oauth.Request.from_consumer_and_token(
        consumer, token=token,
        http_method="GET", http_url="http://sp.example.com/")

    self.assertEqual(request['oauth_token'], token.key)
    self.assertEqual(request['oauth_consumer_key'], consumer.key)
    self.assertEqual(token.verifier, request['oauth_verifier'])