def get_token_and_auth_url(self, callback_url=None):
"""First step is to get the token and then send the request that
provides the auth URL
Returns a tuple of token and the authorisation URL.
"""
client = oauth.Client(self.consumer)
params = {}
params['oauth_callback'] = callback_url or 'oob'
request = oauth.Request(parameters=params)
url = REQUEST_TOKEN_URL
resp, content = client.request(url, "POST", request.to_postdata())
if resp.get('status') == '200':
token = oauth.Token.from_string(content)
yql_logger.debug("token: %s", token)
data = dict(parse_qsl(content))
yql_logger.debug("data: %s", data)
return token, data['xoauth_request_auth_url']
else:
raise YQLError(resp, content, url)
python类Client()的实例源码
def get_token_and_auth_url(self, callback_url=None):
"""First step is to get the token and then send the request that
provides the auth URL
Returns a tuple of token and the authorisation URL.
"""
client = oauth.Client(self.consumer)
params = {}
params['oauth_callback'] = callback_url or 'oob'
request = oauth.Request(parameters=params)
url = REQUEST_TOKEN_URL
resp, content = client.request(url, "POST", request.to_postdata())
if resp.get('status') == '200':
token = oauth.Token.from_string(content)
yql_logger.debug("token: %s", token)
data = dict(parse_qsl(content))
yql_logger.debug("data: %s", data)
return token, data['xoauth_request_auth_url']
else:
raise YQLError, (resp, content, url)
def test_set_signature_method(self):
consumer = oauth.Consumer('key', 'secret')
client = oauth.Client(consumer)
class Blah:
pass
try:
client.set_signature_method(Blah())
self.fail("Client.set_signature_method() accepted invalid method.")
except ValueError:
pass
m = oauth.SignatureMethod_HMAC_SHA1()
client.set_signature_method(m)
self.assertEqual(m, client.method)
def test_init(self):
class Blah():
pass
try:
client = oauth.Client(Blah())
self.fail("Client.__init__() accepted invalid Consumer.")
except ValueError:
pass
consumer = oauth.Consumer('token', 'secret')
try:
client = oauth.Client(consumer, Blah())
self.fail("Client.__init__() accepted invalid Token.")
except ValueError:
pass
def make_request(self, url, method="GET", body="", headers=None):
"""Makes a request to the TradeKing API."""
consumer = Consumer(key=TRADEKING_CONSUMER_KEY,
secret=TRADEKING_CONSUMER_SECRET)
token = Token(key=TRADEKING_ACCESS_TOKEN,
secret=TRADEKING_ACCESS_TOKEN_SECRET)
client = Client(consumer, token)
self.logs.debug("TradeKing request: %s %s %s %s" %
(url, method, body, headers))
response, content = client.request(url, method=method, body=body,
headers=headers)
self.logs.debug("TradeKing response: %s %s" % (response, content))
try:
return loads(content)
except ValueError:
self.logs.error("Failed to decode JSON response: %s" % content)
return None
def get_user_information(self):
token = oauth2.Token(self.oauth_token, self.oauth_token_secret)
token.set_verifier(self.oauth_verifier)
client = oauth2.Client(self.consumer, token)
url = 'https://api.twitter.com/oauth/access_token'
resp, content = client.request(url, 'POST')
if resp.status != 200:
raise Error('{} from Twitter'.format(resp.status))
provider_user = dict(parse_qsl(content.decode('utf-8')))
if 'user_id' not in provider_user:
raise Error('No user_id from Twitter')
self.status = 200
self.user_id = provider_user.get('user_id')
self.user_name = provider_user.get('screen_name', None)
return (self.user_id, self.user_name,)
def get_request_token(self):
"""
Function for generating initial request token for three-legged oauth
Takes no input, returns a request_token dict with two keys, oauth_token and oauth_token_secret
"""
callback_url = self.request.build_absolute_uri(reverse('bot-authorize', args=(self.object.pk, )))
consumer = oauth.Consumer(CONSUMER_KEY, CONSUMER_SECRET)
client = oauth.Client(consumer)
resp, content = client.request(self.REQUEST_TOKEN_URL, "POST",
body=urllib.parse.urlencode({'oauth_callback': callback_url}))
if resp['status'] != '200':
raise Exception("Invalid response %s." % resp['status'])
request_token = dict(urllib.parse.parse_qsl(content))
# urllib returns bytes, which will need to be decoded using the string.decode() method before use
request_token = {key.decode(): value.decode() for key, value in request_token.items()}
# Return the token dict containing token and secret
return request_token
def get_access_token(self, oauth_token, oauth_token_secret, oauth_verifier):
"""
Func for final leg of three-legged oauth,
takes token and secret returned in oauth_callback GET dict and
exchanges them for the permanent access token and secret
returns an access_token dict with two keys, oauth_token and oauth_token_secret
"""
token = oauth.Token(oauth_token, oauth_token_secret)
token.set_verifier(oauth_verifier)
consumer = oauth.Consumer(CONSUMER_KEY, CONSUMER_SECRET)
client = oauth.Client(consumer, token)
# Now we fire the request at access token url instead of request token
resp, content = client.request(self.ACCESS_TOKEN_URL, "POST")
if resp['status'] != '200':
raise Exception("Invalid response %s." % resp['status'])
access_token = dict(urllib.parse.parse_qsl(content))
# urllib returns bytes, which will need to be decoded using the string.decode() method before use
access_token = {key.decode(): value.decode() for key, value in access_token.items()}
# Return the token dict containing token and secret
return access_token
def test_set_signature_method(self):
consumer = oauth.Consumer('key', 'secret')
client = oauth.Client(consumer)
class Blah:
pass
try:
client.set_signature_method(Blah())
self.fail("Client.set_signature_method() accepted invalid method.")
except ValueError:
pass
m = oauth.SignatureMethod_HMAC_SHA1()
client.set_signature_method(m)
self.assertEqual(m, client.method)
def test_init(self):
class Blah():
pass
try:
client = oauth.Client(Blah())
self.fail("Client.__init__() accepted invalid Consumer.")
except ValueError:
pass
consumer = oauth.Consumer('token', 'secret')
try:
client = oauth.Client(consumer, Blah())
self.fail("Client.__init__() accepted invalid Token.")
except ValueError:
pass
def get(self, url, params):
"""
Issues a GET request against the API, properly formatting the params
:param url: a string, the url you are requesting
:param params: a dict, the key-value of all the paramaters needed
in the request
:returns: a dict parsed of the JSON response
"""
url = self.host + url
if params:
url = url + "?" + urllib.urlencode(params)
client = oauth.Client(self.consumer, self.token, proxy_info=self.proxy_info)
client.disable_ssl_certificate_validation = True
try:
client.follow_redirects = False
resp, content = client.request(url, method="GET", redirections=False)
except RedirectLimit, e:
resp, content = e.args
return self.json_parse(content)
def post(self, url, params={}, files=[]):
"""
Issues a POST request against the API, allows for multipart data uploads
:param url: a string, the url you are requesting
:param params: a dict, the key-value of all the parameters needed
in the request
:param files: a list, the list of tuples of files
:returns: a dict parsed of the JSON response
"""
url = self.host + url
try:
if files:
return self.post_multipart(url, params, files)
else:
client = oauth.Client(self.consumer, self.token, proxy_info=self.proxy_info)
client.disable_ssl_certificate_validation = True
resp, content = client.request(url, method="POST", body=urllib.urlencode(params))
return self.json_parse(content)
except urllib2.HTTPError, e:
return self.json_parse(e.read())
def twitter_oauth2(oauth_token, oauth_verifier):
auth_tokens = db.OAuthToken.query(db.OAuthToken.temp_oauth_token == oauth_token, db.OAuthToken.application == db.APP_TWITTER).fetch(1)
if not auth_tokens:
return None
auth_token = auth_tokens[0]
# Step 3: Once the consumer has redirected the user back to the oauth_callback
# URL you can request the access token the user has approved. You use the
# request token to sign this request. After this is done you throw away the
# request token and use the access token returned. You should store this
# access token somewhere safe, like a database, for future use.
token = oauth.Token(oauth_token, auth_token.temp_oauth_token_secret)
token.set_verifier(oauth_verifier)
consumer = oauth.Consumer(auth.consumer_key, auth.consumer_secret)
client = oauth.Client(consumer, token)
resp, content = client.request(access_token_url, "POST")
access_token = dict(urlparse.parse_qsl(content))
auth_token.oauth_token = access_token['oauth_token']
auth_token.oauth_token_secret = access_token['oauth_token_secret']
auth_token.valid_token = True
auth_token.time_between_posts = 5 * 60 # every 5 minutes
auth_token.put()
return auth_token
def getAuthorizeURL(self):
client = oauth.Client(self.consumer)
#Get the request token
resp, content = client.request(Splitwise.REQUEST_TOKEN_URL, "POST")
if Splitwise.isDebug():
print(resp, content)
#Check if the response is correct
if resp['status'] != '200':
raise Exception("Invalid response %s. Please check your consumer key and secret." % resp['status'])
request_token = dict(parse_qsl(content.decode("utf-8")))
return "%s?oauth_token=%s" % (Splitwise.AUTHORIZE_URL, request_token['oauth_token']), request_token['oauth_token_secret']
def getAccessToken(self,oauth_token,oauth_token_secret,oauth_verifier):
token = oauth.Token(oauth_token,oauth_token_secret)
token.set_verifier(oauth_verifier)
client = oauth.Client(self.consumer, token)
resp, content = client.request(Splitwise.ACCESS_TOKEN_URL, "POST")
if Splitwise.isDebug():
print(resp, content)
#Check if the response is correct
if resp['status'] != '200':
raise Exception("Invalid response %s. Please check your consumer key and secret." % resp['status'])
access_token = dict(parse_qsl(content.decode("utf-8")))
return access_token
def get_tweets(twitter_handle):
CONSUMER_KEY = 'YurZ2aWgVEYVhE5c60XyWRGG0'
CONSUMER_SECRET = 'TIaSl3xnVTNUzHrwz84lOycTYkwe9l1NocwkB4hGbd2ngMufn6'
ACCESS_KEY = '564268368-tRdDLN3O9EKzlaY28NzHCgNsYJX59YRngv3qjjJh'
ACCESS_SECRET = 'VUjRU2ftlmnUdqDtppMiV6LLqT83ZbKDDUjcpWtrT1PG4'
consumer = oauth.Consumer(key=CONSUMER_KEY, secret=CONSUMER_SECRET)
access_token = oauth.Token(key=ACCESS_KEY, secret=ACCESS_SECRET)
client = oauth.Client(consumer, access_token)
endpoint = "https://api.twitter.com/1.1/search/tweets.json?q=from%3A" + twitter_handle + "&result_type=all&count=100"
response, data = client.request(endpoint)
tweet_data = json.loads(data.decode('utf-8'))
tweets = []
for tweet in tweet_data['statuses']:
tweets.append(re.sub(r'https:\/\/t\.co\/.{10}', '', tweet['text'] + ' \n'))
return tweets
#example
# print(get_tweets("Cmdr_Hadfield"))
def accessToken(self):
"""Return the access token generated by the authenticating server.
If token is already in the session that one will be used.
Otherwise the token is fetched from the auth server.
"""
if self.session.access_token:
# return the token (TODO: does it expire?)
return self.session.access_token
if self.session.request_token:
# Exchange the request token with an authorization token.
token = self.session.request_token
self.session.request_token = None
# Build an authorized client
# OAuth1.0a put the verifier!
token.set_verifier(self.request.vars.oauth_verifier)
client = oauth.Client(self.consumer, token)
resp, content = client.request(self.access_token_url, "POST")
if str(resp['status']) != '200':
self.session.request_token = None
self.globals['redirect'](self.globals[
'URL'](f='user', args='logout'))
self.session.access_token = oauth.Token.from_string(content)
return self.session.access_token
self.session.access_token = None
return None
def __oauth_login(self, next):
'''This method redirects the user to the authenticating form
on authentication server if the authentication code
and the authentication token are not available to the
application yet.
Once the authentication code has been received this method is
called to set the access token into the session by calling
accessToken()
'''
if not self.accessToken():
# setup the client
client = oauth.Client(self.consumer, None, timeout=self.socket_timeout)
# Get a request token.
# oauth_callback *is REQUIRED* for OAuth1.0a
# putting it in the body seems to work.
callback_url = self.__redirect_uri(next)
data = urlencode(dict(oauth_callback=callback_url))
resp, content = client.request(self.token_url, "POST", body=data)
if resp['status'] != '200':
self.session.request_token = None
self.globals['redirect'](self.globals[
'URL'](f='user', args='logout'))
# Store the request token in session.
request_token = self.session.request_token = oauth.Token.from_string(content)
# Redirect the user to the authentication URL and pass the callback url.
data = urlencode(dict(oauth_token=request_token.key,
oauth_callback=callback_url))
auth_request_url = self.auth_url + '?' + data
HTTP = self.globals['HTTP']
raise HTTP(302,
"You are not authenticated: you are being redirected to the <a href='" + auth_request_url + "'> authentication server</a>",
Location=auth_request_url)
return None
def _post_patched_request(lti_key, secret, body, url, method, content_type): # pylint: disable=too-many-arguments
"""
Authorization header needs to be capitalized for some LTI clients
this function ensures that header is capitalized
:param body: body of the call
:param client: OAuth Client
:param url: outcome url
:return: response
"""
consumer = oauth2.Consumer(key=lti_key, secret=secret)
client = oauth2.Client(consumer)
import httplib2
http = httplib2.Http
# pylint: disable=protected-access
normalize = http._normalize_headers
def my_normalize(self, headers):
""" This function patches Authorization header """
ret = normalize(self, headers)
if 'authorization' in ret:
ret['Authorization'] = ret.pop('authorization')
return ret
http._normalize_headers = my_normalize
monkey_patch_function = normalize
response, content = client.request(
url,
method,
body=body.encode("utf8"),
headers={'Content-Type': content_type})
http = httplib2.Http
# pylint: disable=protected-access
http._normalize_headers = monkey_patch_function
return response, content
def test_init_passes_kwargs_to_httplib2(self):
class Blah():
pass
consumer = oauth.Consumer('token', 'secret')
# httplib2 options
client = oauth.Client(consumer, None, cache='.cache', timeout=3, disable_ssl_certificate_validation=True)
self.assertNotEqual(client.cache, None)
self.assertEqual(client.timeout, 3)
def test_access_token_get(self):
"""Test getting an access token via GET."""
client = oauth.Client(self.consumer, None)
resp, content = client.request(self._uri('request_token'), "GET")
self.assertEqual(int(resp['status']), 200)
def test_access_token_post(self):
"""Test getting an access token via POST."""
client = oauth.Client(self.consumer, None)
resp, content = client.request(self._uri('request_token'), "POST")
self.assertEqual(int(resp['status']), 200)
res = dict(parse_qsl(content))
self.assertTrue(b'oauth_token' in res)
self.assertTrue(b'oauth_token_secret' in res)
def test_multipart_post_does_not_alter_body(self, mockHttpRequest):
random_result = random.randint(1,100)
data = {
'rand-%d'%random.randint(1,100):random.randint(1,100),
}
content_type, body = self.create_simple_multipart_data(data)
client = oauth.Client(self.consumer, None)
uri = self._uri('two_legged')
def mockrequest(cl, ur, **kw):
self.assertTrue(cl is client)
self.assertTrue(ur is uri)
self.assertEqual(frozenset(kw.keys()), frozenset(['method', 'body', 'redirections', 'connection_type', 'headers']))
self.assertEqual(kw['body'], body)
self.assertEqual(kw['connection_type'], None)
self.assertEqual(kw['method'], 'POST')
self.assertEqual(kw['redirections'],
httplib2.DEFAULT_MAX_REDIRECTS)
self.assertTrue(isinstance(kw['headers'], dict))
return random_result
mockHttpRequest.side_effect = mockrequest
result = client.request(uri, 'POST',
headers={'Content-Type':content_type},
body=body)
self.assertEqual(result, random_result)
def test_url_with_query_string(self, mockHttpRequest):
uri = 'http://example.com/foo/bar/?show=thundercats&character=snarf'
client = oauth.Client(self.consumer, None)
random_result = random.randint(1,100)
def mockrequest(cl, ur, **kw):
self.assertTrue(cl is client)
self.assertEqual(frozenset(kw.keys()),
frozenset(['method', 'body', 'redirections',
'connection_type', 'headers']))
self.assertEqual(kw['body'], b'')
self.assertEqual(kw['connection_type'], None)
self.assertEqual(kw['method'], 'GET')
self.assertEqual(kw['redirections'],
httplib2.DEFAULT_MAX_REDIRECTS)
self.assertTrue(isinstance(kw['headers'], dict))
req = oauth.Request.from_consumer_and_token(self.consumer, None,
http_method='GET', http_url=uri, parameters={})
req.sign_request(oauth.SignatureMethod_HMAC_SHA1(),
self.consumer, None)
expected = parse_qsl(
urlparse(req.to_url()).query)
actual = parse_qsl(urlparse(ur).query)
self.assertEqual(len(expected), len(actual))
actual = dict(actual)
for key, value in expected:
if key not in ('oauth_signature',
'oauth_nonce', 'oauth_timestamp'):
self.assertEqual(actual[key], value)
return random_result
mockHttpRequest.side_effect = mockrequest
client.request(uri, 'GET')
def test_multiple_values_for_a_key(self, mockReqConstructor, mockHttpRequest):
client = oauth.Client(self.consumer, None)
request = oauth.Request("GET", "http://example.com/fetch.php", parameters={'multi': ['1', '2']})
mockReqConstructor.return_value = request
client.request('http://whatever', 'POST', body='multi=1&multi=2')
self.assertEqual(mockReqConstructor.call_count, 1)
self.assertEqual(mockReqConstructor.call_args[1]['parameters'], {'multi': ['1', '2']})
self.assertTrue('multi=1' in mockHttpRequest.call_args[1]['body'])
self.assertTrue('multi=2' in mockHttpRequest.call_args[1]['body'])
def accessToken(self):
"""Return the access token generated by the authenticating server.
If token is already in the session that one will be used.
Otherwise the token is fetched from the auth server.
"""
if self.session.access_token:
# return the token (TODO: does it expire?)
return self.session.access_token
if self.session.request_token:
# Exchange the request token with an authorization token.
token = self.session.request_token
self.session.request_token = None
# Build an authorized client
# OAuth1.0a put the verifier!
token.set_verifier(self.request.vars.oauth_verifier)
client = oauth.Client(self.consumer, token)
resp, content = client.request(self.access_token_url, "POST")
if str(resp['status']) != '200':
self.session.request_token = None
self.globals['redirect'](self.globals[
'URL'](f='user', args='logout'))
self.session.access_token = oauth.Token.from_string(content)
return self.session.access_token
self.session.access_token = None
return None
def __oauth_login(self, next):
'''This method redirects the user to the authenticating form
on authentication server if the authentication code
and the authentication token are not available to the
application yet.
Once the authentication code has been received this method is
called to set the access token into the session by calling
accessToken()
'''
if not self.accessToken():
# setup the client
client = oauth.Client(self.consumer, None, timeout=self.socket_timeout)
# Get a request token.
# oauth_callback *is REQUIRED* for OAuth1.0a
# putting it in the body seems to work.
callback_url = self.__redirect_uri(next)
data = urlencode(dict(oauth_callback=callback_url))
resp, content = client.request(self.token_url, "POST", body=data)
if resp['status'] != '200':
self.session.request_token = None
self.globals['redirect'](self.globals[
'URL'](f='user', args='logout'))
# Store the request token in session.
request_token = self.session.request_token = oauth.Token.from_string(content)
# Redirect the user to the authentication URL and pass the callback url.
data = urlencode(dict(oauth_token=request_token.key,
oauth_callback=callback_url))
auth_request_url = self.auth_url + '?' + data
HTTP = self.globals['HTTP']
raise HTTP(302,
"You are not authenticated: you are being redirected to the <a href='" + auth_request_url + "'> authentication server</a>",
Location=auth_request_url)
return None
def accessToken(self):
"""Return the access token generated by the authenticating server.
If token is already in the session that one will be used.
Otherwise the token is fetched from the auth server.
"""
if self.session.access_token:
# return the token (TODO: does it expire?)
return self.session.access_token
if self.session.request_token:
# Exchange the request token with an authorization token.
token = self.session.request_token
self.session.request_token = None
# Build an authorized client
# OAuth1.0a put the verifier!
token.set_verifier(self.request.vars.oauth_verifier)
client = oauth.Client(self.consumer, token)
resp, content = client.request(self.access_token_url, "POST")
if str(resp['status']) != '200':
self.session.request_token = None
self.globals['redirect'](self.globals[
'URL'](f='user', args='logout'))
self.session.access_token = oauth.Token.from_string(content)
return self.session.access_token
self.session.access_token = None
return None
def __oauth_login(self, next):
'''This method redirects the user to the authenticating form
on authentication server if the authentication code
and the authentication token are not available to the
application yet.
Once the authentication code has been received this method is
called to set the access token into the session by calling
accessToken()
'''
if not self.accessToken():
# setup the client
client = oauth.Client(self.consumer, None, timeout=self.socket_timeout)
# Get a request token.
# oauth_callback *is REQUIRED* for OAuth1.0a
# putting it in the body seems to work.
callback_url = self.__redirect_uri(next)
data = urlencode(dict(oauth_callback=callback_url))
resp, content = client.request(self.token_url, "POST", body=data)
if resp['status'] != '200':
self.session.request_token = None
self.globals['redirect'](self.globals[
'URL'](f='user', args='logout'))
# Store the request token in session.
request_token = self.session.request_token = oauth.Token.from_string(content)
# Redirect the user to the authentication URL and pass the callback url.
data = urlencode(dict(oauth_token=request_token.key,
oauth_callback=callback_url))
auth_request_url = self.auth_url + '?' + data
HTTP = self.globals['HTTP']
raise HTTP(302,
"You are not authenticated: you are being redirected to the <a href='" + auth_request_url + "'> authentication server</a>",
Location=auth_request_url)
return None
def get_initial_oauth_tokens(self):
url = 'https://api.twitter.com/oauth/request_token'
client = oauth2.Client(self.consumer)
resp, content = client.request(url, 'GET')
if resp.status != 200:
raise Error('{} from Twitter'.format(resp.status))
oauth_values = dict(parse_qsl(content.decode('utf-8')))
self.oauth_token = oauth_values.get('oauth_token')
self.oauth_token_secret = oauth_values.get('oauth_token_secret')
if not self.oauth_token or not self.oauth_token_secret:
raise Error('No oauth_token or oauth_token_secret from Twitter')
return (self.oauth_token, self.oauth_token_secret,)