def accessToken(self):
    """Return the OAuth access token for the current session, if obtainable.

    A token already stored in ``self.session.access_token`` is returned
    as-is.  Failing that, a pending ``self.session.request_token`` is
    removed from the session, given the ``oauth_verifier`` request variable
    and used to POST to ``self.access_token_url``; the token parsed from
    the response body is stored in the session and returned.  On a non-200
    status ``self.globals['redirect']`` is called with the ``user/logout``
    URL before the body is parsed.

    Returns ``None`` (after resetting ``self.session.access_token`` to
    ``None``) when the session holds neither an access nor a request token.
    """
    session = self.session

    if session.access_token:
        # Reuse the stored token (TODO: does it expire?)
        return session.access_token

    if not session.request_token:
        # Nothing to exchange yet.
        session.access_token = None
        return None

    # Take the request token out of the session before exchanging it.
    pending = session.request_token
    session.request_token = None

    # OAuth1.0a: the verifier has to be attached to the request token.
    pending.set_verifier(self.request.vars.oauth_verifier)
    authorized_client = oauth.Client(self.consumer, pending)
    resp, content = authorized_client.request(self.access_token_url, "POST")

    if str(resp['status']) != '200':
        session.request_token = None
        redirect = self.globals['redirect']
        redirect(self.globals['URL'](f='user', args='logout'))

    session.access_token = oauth.Token.from_string(content)
    return session.access_token
python类Client()的实例源码
def __oauth_login(self, next):
    """Make sure the session has an access token, redirecting to log in if not.

    When ``self.accessToken()`` returns a token nothing else happens.
    Otherwise a request token is obtained by POSTing the callback URL to
    ``self.token_url``, stored in ``self.session.request_token``, and the
    user is sent to ``self.auth_url`` by raising ``self.globals['HTTP']``
    with status 302.  Once the user comes back, a later call reaches
    ``accessToken()`` again, which exchanges the stored request token.

    Args:
        next: passed to ``self.__redirect_uri`` to build the callback URL.

    Returns:
        None, when an access token is already available.

    Raises:
        self.globals['HTTP']: 302 redirect to the authentication server
            whenever no access token is available.
    """
    if self.accessToken():
        return None

    # setup the client
    client = oauth.Client(self.consumer, None, timeout=self.socket_timeout)

    # Get a request token.
    # oauth_callback *is REQUIRED* for OAuth1.0a;
    # putting it in the body seems to work.
    callback_url = self.__redirect_uri(next)
    data = urlencode(dict(oauth_callback=callback_url))
    resp, content = client.request(self.token_url, "POST", body=data)

    # Compare as str, the same way accessToken() does, so that an integer
    # status value is not mistaken for a failure.
    if str(resp['status']) != '200':
        self.session.request_token = None
        self.globals['redirect'](self.globals['URL'](f='user', args='logout'))

    # Store the request token in session.
    request_token = self.session.request_token = oauth.Token.from_string(content)

    # Redirect the user to the authentication URL and pass the callback url.
    data = urlencode(dict(oauth_token=request_token.key,
                          oauth_callback=callback_url))
    auth_request_url = self.auth_url + '?' + data

    HTTP = self.globals['HTTP']
    raise HTTP(302,
               "You are not authenticated: you are being redirected to the <a href='" + auth_request_url + "'> authentication server</a>",
               Location=auth_request_url)
def test_init_passes_kwargs_to_httplib2(self):
    """Keyword options handed to ``oauth.Client`` are visible on the client.

    Builds a client with ``cache``, ``timeout`` and
    ``disable_ssl_certificate_validation`` and checks that ``cache`` is set
    and ``timeout`` carries the given value.
    """
    consumer = oauth.Consumer('token', 'secret')

    # httplib2 options
    client = oauth.Client(
        consumer,
        None,
        cache='.cache',
        timeout=3,
        disable_ssl_certificate_validation=True,
    )

    self.assertIsNotNone(client.cache)
    self.assertEqual(client.timeout, 3)
def test_access_token_get(self):
    """A GET against the ``request_token`` URI answers with status 200."""
    token_less_client = oauth.Client(self.consumer, None)
    response, _body = token_less_client.request(self._uri('request_token'), "GET")

    status_code = int(response['status'])
    self.assertEqual(status_code, 200)
def test_access_token_post(self):
    """A POST against the ``request_token`` URI returns 200 and both token fields.

    The response body is parsed as a query string and must contain the
    ``oauth_token`` and ``oauth_token_secret`` keys (as bytes).
    """
    token_less_client = oauth.Client(self.consumer, None)
    response, payload = token_less_client.request(self._uri('request_token'), "POST")
    self.assertEqual(int(response['status']), 200)

    fields = dict(parse_qsl(payload))
    for required_key in (b'oauth_token', b'oauth_token_secret'):
        self.assertTrue(required_key in fields)
def test_multipart_post_does_not_alter_body(self, mockHttpRequest):
    """A multipart POST body reaches the HTTP layer unchanged.

    ``mockHttpRequest`` is a mock whose ``side_effect`` is replaced by a
    checker; presumably it is injected by a patch decorator that is not
    visible here.  The checker verifies the keyword arguments the client
    forwards and returns a random sentinel, which ``client.request`` must
    hand back unchanged.
    """
    random_result = random.randint(1, 100)

    # One random form field: key first, then value (same draw order as before).
    field_name = 'rand-%d' % random.randint(1, 100)
    data = {field_name: random.randint(1, 100)}
    content_type, body = self.create_simple_multipart_data(data)

    client = oauth.Client(self.consumer, None)
    uri = self._uri('two_legged')

    def mockrequest(cl, ur, **kw):
        # Same client and URI objects must be passed straight through.
        self.assertTrue(cl is client)
        self.assertTrue(ur is uri)

        forwarded = frozenset(kw.keys())
        self.assertEqual(forwarded, frozenset(['method', 'body', 'redirections', 'connection_type', 'headers']))

        self.assertEqual(kw['body'], body)
        self.assertEqual(kw['connection_type'], None)
        self.assertEqual(kw['method'], 'POST')
        self.assertEqual(kw['redirections'],
                         httplib2.DEFAULT_MAX_REDIRECTS)
        self.assertTrue(isinstance(kw['headers'], dict))
        return random_result

    mockHttpRequest.side_effect = mockrequest

    request_headers = {'Content-Type': content_type}
    result = client.request(uri, 'POST', headers=request_headers, body=body)
    self.assertEqual(result, random_result)
def test_url_with_query_string(self, mockHttpRequest):
    """Query parameters already in the URI survive a GET through the client.

    ``mockHttpRequest`` is a mock whose ``side_effect`` is replaced by a
    checker; presumably it is injected by a patch decorator that is not
    visible here.  The checker compares the query string of the URL it
    receives with one produced by signing an equivalent ``oauth.Request``,
    ignoring the signature, nonce and timestamp parameters.
    """
    uri = 'http://example.com/foo/bar/?show=thundercats&character=snarf'
    client = oauth.Client(self.consumer, None)
    random_result = random.randint(1, 100)

    def mockrequest(cl, ur, **kw):
        self.assertTrue(cl is client)
        self.assertEqual(frozenset(kw.keys()),
                         frozenset(['method', 'body', 'redirections',
                                    'connection_type', 'headers']))
        self.assertEqual(kw['body'], b'')
        self.assertEqual(kw['connection_type'], None)
        self.assertEqual(kw['method'], 'GET')
        self.assertEqual(kw['redirections'],
                         httplib2.DEFAULT_MAX_REDIRECTS)
        self.assertTrue(isinstance(kw['headers'], dict))

        # Build the reference request and sign it the same way.
        reference = oauth.Request.from_consumer_and_token(
            self.consumer, None,
            http_method='GET', http_url=uri, parameters={})
        reference.sign_request(oauth.SignatureMethod_HMAC_SHA1(),
                               self.consumer, None)

        expected = parse_qsl(urlparse(reference.to_url()).query)
        actual = parse_qsl(urlparse(ur).query)
        self.assertEqual(len(expected), len(actual))

        actual = dict(actual)
        for name, wanted in expected:
            # These three are skipped, so they are not compared.
            if name in ('oauth_signature', 'oauth_nonce', 'oauth_timestamp'):
                continue
            self.assertEqual(actual[name], wanted)

        return random_result

    mockHttpRequest.side_effect = mockrequest
    client.request(uri, 'GET')
def test_multiple_values_for_a_key(self, mockReqConstructor, mockHttpRequest):
    """A body with a repeated key keeps both values.

    ``mockReqConstructor`` and ``mockHttpRequest`` are mocks, presumably
    injected by patch decorators that are not visible here.  The request
    constructor must be called once with ``parameters`` holding both values
    of ``multi``, and the body given to the HTTP layer must contain both
    ``multi=1`` and ``multi=2``.
    """
    client = oauth.Client(self.consumer, None)

    canned_request = oauth.Request(
        "GET",
        "http://example.com/fetch.php",
        parameters={'multi': ['1', '2']},
    )
    mockReqConstructor.return_value = canned_request

    client.request('http://whatever', 'POST', body='multi=1&multi=2')

    self.assertEqual(mockReqConstructor.call_count, 1)
    constructor_kwargs = mockReqConstructor.call_args[1]
    self.assertEqual(constructor_kwargs['parameters'], {'multi': ['1', '2']})

    for pair in ('multi=1', 'multi=2'):
        self.assertTrue(pair in mockHttpRequest.call_args[1]['body'])
oauth10a_account.py 文件源码
项目:rekall-agent-server
作者: rekall-innovations
项目源码
文件源码
阅读 23
收藏 0
点赞 0
评论 0
def accessToken(self):
    """Return the OAuth access token for the current session, if obtainable.

    A token already stored in ``self.session.access_token`` is returned
    as-is.  Failing that, a pending ``self.session.request_token`` is
    removed from the session, given the ``oauth_verifier`` request variable
    and used to POST to ``self.access_token_url``; the token parsed from
    the response body is stored in the session and returned.  On a non-200
    status ``self.globals['redirect']`` is called with the ``user/logout``
    URL before the body is parsed.

    Returns ``None`` (after resetting ``self.session.access_token`` to
    ``None``) when the session holds neither an access nor a request token.
    """
    session = self.session

    if session.access_token:
        # Reuse the stored token (TODO: does it expire?)
        return session.access_token

    if not session.request_token:
        # Nothing to exchange yet.
        session.access_token = None
        return None

    # Take the request token out of the session before exchanging it.
    pending = session.request_token
    session.request_token = None

    # OAuth1.0a: the verifier has to be attached to the request token.
    pending.set_verifier(self.request.vars.oauth_verifier)
    authorized_client = oauth.Client(self.consumer, pending)
    resp, content = authorized_client.request(self.access_token_url, "POST")

    if str(resp['status']) != '200':
        session.request_token = None
        redirect = self.globals['redirect']
        redirect(self.globals['URL'](f='user', args='logout'))

    session.access_token = oauth.Token.from_string(content)
    return session.access_token
oauth10a_account.py 文件源码
项目:rekall-agent-server
作者: rekall-innovations
项目源码
文件源码
阅读 19
收藏 0
点赞 0
评论 0
def __oauth_login(self, next):
    """Make sure the session has an access token, redirecting to log in if not.

    When ``self.accessToken()`` returns a token nothing else happens.
    Otherwise a request token is obtained by POSTing the callback URL to
    ``self.token_url``, stored in ``self.session.request_token``, and the
    user is sent to ``self.auth_url`` by raising ``self.globals['HTTP']``
    with status 302.  Once the user comes back, a later call reaches
    ``accessToken()`` again, which exchanges the stored request token.

    Args:
        next: passed to ``self.__redirect_uri`` to build the callback URL.

    Returns:
        None, when an access token is already available.

    Raises:
        self.globals['HTTP']: 302 redirect to the authentication server
            whenever no access token is available.
    """
    if self.accessToken():
        return None

    # setup the client
    client = oauth.Client(self.consumer, None, timeout=self.socket_timeout)

    # Get a request token.
    # oauth_callback *is REQUIRED* for OAuth1.0a;
    # putting it in the body seems to work.
    callback_url = self.__redirect_uri(next)
    data = urlencode(dict(oauth_callback=callback_url))
    resp, content = client.request(self.token_url, "POST", body=data)

    # Compare as str, the same way accessToken() does, so that an integer
    # status value is not mistaken for a failure.
    if str(resp['status']) != '200':
        self.session.request_token = None
        self.globals['redirect'](self.globals['URL'](f='user', args='logout'))

    # Store the request token in session.
    request_token = self.session.request_token = oauth.Token.from_string(content)

    # Redirect the user to the authentication URL and pass the callback url.
    data = urlencode(dict(oauth_token=request_token.key,
                          oauth_callback=callback_url))
    auth_request_url = self.auth_url + '?' + data

    HTTP = self.globals['HTTP']
    raise HTTP(302,
               "You are not authenticated: you are being redirected to the <a href='" + auth_request_url + "'> authentication server</a>",
               Location=auth_request_url)
def __init__(self,
             consumer_key,
             consumer_secret,
             user_token,
             user_secret,
             logger):
    """Create the OAuth client for this connection and store it on ``self.client``.

    Args:
        consumer_key: key used to build the ``oauth.Consumer``.
        consumer_secret: secret used to build the ``oauth.Consumer``.
        user_token: key of the ``oauth.Token`` used by the client.
        user_secret: secret of the ``oauth.Token`` used by the client.
        logger: receives the start-up message and is passed on to the
            parent initialiser.
    """
    # The secrets are credentials: log the key/token identifiers only and
    # mask the secret values so they never end up in log files.
    logger.info("OAuth:\nConsumer Key:{}\nConsumer Secret:{}\nUser Token:{}\nUser Secret:{}".format(
        consumer_key, "<redacted>", user_token, "<redacted>"))

    consumer = oauth.Consumer(consumer_key, consumer_secret)
    access_token = oauth.Token(key=user_token,
                               secret=user_secret)

    # A single client is built; the timeout comes from MediaConnection.timeout.
    self.client = oauth.Client(consumer, access_token, timeout=MediaConnection.timeout)
    super().__init__(logger)
def accessToken(self):
    """Return the OAuth access token for the current session, if obtainable.

    A token already stored in ``self.session.access_token`` is returned
    as-is.  Failing that, a pending ``self.session.request_token`` is
    removed from the session, given the ``oauth_verifier`` request variable
    and used to POST to ``self.access_token_url``; the token parsed from
    the response body is stored in the session and returned.  On a non-200
    status ``self.globals['redirect']`` is called with the ``user/logout``
    URL before the body is parsed.

    Returns ``None`` (after resetting ``self.session.access_token`` to
    ``None``) when the session holds neither an access nor a request token.
    """
    session = self.session

    if session.access_token:
        # Reuse the stored token (TODO: does it expire?)
        return session.access_token

    if not session.request_token:
        # Nothing to exchange yet.
        session.access_token = None
        return None

    # Take the request token out of the session before exchanging it.
    pending = session.request_token
    session.request_token = None

    # OAuth1.0a: the verifier has to be attached to the request token.
    pending.set_verifier(self.request.vars.oauth_verifier)
    authorized_client = oauth.Client(self.consumer, pending)
    resp, content = authorized_client.request(self.access_token_url, "POST")

    if str(resp['status']) != '200':
        session.request_token = None
        redirect = self.globals['redirect']
        redirect(self.globals['URL'](f='user', args='logout'))

    session.access_token = oauth.Token.from_string(content)
    return session.access_token
def __oauth_login(self, next):
    """Make sure the session has an access token, redirecting to log in if not.

    When ``self.accessToken()`` returns a token nothing else happens.
    Otherwise a request token is obtained by POSTing the callback URL to
    ``self.token_url``, stored in ``self.session.request_token``, and the
    user is sent to ``self.auth_url`` by raising ``self.globals['HTTP']``
    with status 302.  Once the user comes back, a later call reaches
    ``accessToken()`` again, which exchanges the stored request token.

    Args:
        next: passed to ``self.__redirect_uri`` to build the callback URL.

    Returns:
        None, when an access token is already available.

    Raises:
        self.globals['HTTP']: 302 redirect to the authentication server
            whenever no access token is available.
    """
    if self.accessToken():
        return None

    # setup the client
    client = oauth.Client(self.consumer, None, timeout=self.socket_timeout)

    # Get a request token.
    # oauth_callback *is REQUIRED* for OAuth1.0a;
    # putting it in the body seems to work.
    callback_url = self.__redirect_uri(next)
    data = urlencode(dict(oauth_callback=callback_url))
    resp, content = client.request(self.token_url, "POST", body=data)

    # Compare as str, the same way accessToken() does, so that an integer
    # status value is not mistaken for a failure.
    if str(resp['status']) != '200':
        self.session.request_token = None
        self.globals['redirect'](self.globals['URL'](f='user', args='logout'))

    # Store the request token in session.
    request_token = self.session.request_token = oauth.Token.from_string(content)

    # Redirect the user to the authentication URL and pass the callback url.
    data = urlencode(dict(oauth_token=request_token.key,
                          oauth_callback=callback_url))
    auth_request_url = self.auth_url + '?' + data

    HTTP = self.globals['HTTP']
    raise HTTP(302,
               "You are not authenticated: you are being redirected to the <a href='" + auth_request_url + "'> authentication server</a>",
               Location=auth_request_url)
def twitter_oauth1(user_id, token_nickname, country_filter):
    """Fetch a temporary request token, persist it and return the authorize URL.

    A request token is fetched from ``request_token_url`` and saved on the
    ``db.OAuthToken`` entity matching ``user_id``, ``token_nickname`` and
    ``db.APP_TWITTER``; a new entity is created when the query finds none.
    A truthy ``country_filter`` is upper-cased and appended to the entity's
    ``country_filters``.

    Returns:
        ``authorize_url`` with the request token's ``oauth_token`` as query
        parameter.

    Raises:
        Exception: if the request-token response status is not ``'200'``.
    """
    consumer = oauth.Consumer(auth.consumer_key, auth.consumer_secret)
    client = oauth.Client(consumer)

    # Step 1: Get a request token. This is a temporary token that is used for
    # having the user authorize an access token and to sign the request to
    # obtain said access token.
    resp, content = client.request(request_token_url, "GET")
    if resp['status'] != '200':
        raise Exception("Invalid response %s." % resp['status'])
    request_token = dict(urlparse.parse_qsl(content))

    existing = db.OAuthToken.query(
        db.OAuthToken.user_id == user_id,
        db.OAuthToken.token_nickname == token_nickname,
        db.OAuthToken.application == db.APP_TWITTER,
    ).fetch(1)

    # NOTE(review): indentation was lost in the source; this assumes the
    # identity fields are set only on a freshly created entity (a fetched
    # one already matches them through the query filters) -- confirm.
    if existing:
        auth_token = existing[0]
    else:
        auth_token = db.OAuthToken()
        auth_token.user_id = user_id
        auth_token.token_nickname = token_nickname
        auth_token.application = db.APP_TWITTER

    auth_token.temp_oauth_token = request_token['oauth_token']
    auth_token.temp_oauth_token_secret = request_token['oauth_token_secret']
    if country_filter:
        auth_token.country_filters += country_filter.upper()
    auth_token.put()

    # Step 2: Redirect to the provider. Since this is a CLI script we do not
    # redirect. In a web application you would redirect the user to the URL
    # below.
    return "%s?oauth_token=%s" % (authorize_url, request_token['oauth_token'])
# user comes to:
# /sign-in-with-twitter/?
# oauth_token=NPcudxy0yU5T3tBzho7iCotZ3cnetKwcTIRlX0iwRl0&
# oauth_verifier=uw7NjWHT6OJ1MpJOXsHfNxoAhPKpgI8BlYDhxEjIBY
def request_2legged(url, http_method="GET"):
    """Send a request to *url* through a client built from ``_consumer()`` only.

    No token is attached.  The request uses form-encoded content type and a
    body of ``country=<api_settings.country>``.

    Args:
        url: address to request.
        http_method: HTTP method name, ``"GET"`` by default.

    Returns:
        The ``(response, content)`` pair produced by the client.
    """
    consumer_only_client = oauth.Client(_consumer())
    form_headers = {"Content-Type": "application/x-www-form-urlencoded"}
    country_body = "country=%s" % api_settings.country

    response, content = consumer_only_client.request(
        url,
        headers=form_headers,
        body=country_body,
        method=http_method,
    )
    return response, content
def request_access_token(token):
    """Request ``ACCESS_TOKEN_URL`` with *token* and parse the reply into a token.

    Args:
        token: token attached to the OAuth client for this request.

    Returns:
        Whatever ``_token_from_response_content`` builds from the response
        body; the response headers are not inspected.
    """
    token_client = oauth.Client(_consumer(), token=token)
    form_headers = {"Content-Type": "application/x-www-form-urlencoded"}
    _response, body = token_client.request(ACCESS_TOKEN_URL, headers=form_headers)
    return _token_from_response_content(body)
def request_3legged(url, access_token, http_method="GET", body=''):
    """Execute a request on a customer's behalf using their access token.

    Args:
        url: address to request.
        access_token: token authorized by the customer.
        http_method: HTTP method name, ``"GET"`` by default.
        body: request body, empty by default.

    Returns:
        The unmodified result of the client's ``request`` call.
    """
    customer_client = oauth.Client(_consumer(), token=access_token)
    form_headers = {"Content-Type": "application/x-www-form-urlencoded"}
    return customer_client.request(
        url,
        headers=form_headers,
        method=http_method,
        body=body,
    )
def request_2legged(url, http_method="GET"):
    """Send a request to *url* through a client built from ``_consumer()`` only.

    No token is attached.  The request uses form-encoded content type and a
    body of ``country=<api_settings.country>``.

    Args:
        url: address to request.
        http_method: HTTP method name, ``"GET"`` by default.

    Returns:
        The ``(response, content)`` pair produced by the client.
    """
    consumer_only_client = oauth.Client(_consumer())
    form_headers = {"Content-Type": "application/x-www-form-urlencoded"}
    country_body = "country=%s" % api_settings.country

    response, content = consumer_only_client.request(
        url,
        headers=form_headers,
        body=country_body,
        method=http_method,
    )
    return response, content
def request_access_token(token):
    """Request ``ACCESS_TOKEN_URL`` with *token* and parse the reply into a token.

    Args:
        token: token attached to the OAuth client for this request.

    Returns:
        Whatever ``_token_from_response_content`` builds from the response
        body; the response headers are not inspected.
    """
    token_client = oauth.Client(_consumer(), token=token)
    form_headers = {"Content-Type": "application/x-www-form-urlencoded"}
    _response, body = token_client.request(ACCESS_TOKEN_URL, headers=form_headers)
    return _token_from_response_content(body)
def oauthReq(self, url, http_method="GET", post_body=None, http_headers=None):
    """Perform an OAuth request using credentials from ``self.parseConfig()``.

    The consumer and token are built from the ``consumer_key``,
    ``consumer_secret``, ``access_token`` and ``access_token_secret``
    entries of the parsed configuration.

    Args:
        url: address to request.
        http_method: HTTP method name, ``"GET"`` by default.
        post_body: request body; a falsy value is sent as ``''``.
        http_headers: headers passed through to the client unchanged.

    Returns:
        The response content; the response headers are discarded.
    """
    settings = self.parseConfig()

    consumer = oauth.Consumer(
        key=settings.get('consumer_key'),
        secret=settings.get('consumer_secret'),
    )
    token = oauth.Token(
        key=settings.get('access_token'),
        secret=settings.get('access_token_secret'),
    )

    signed_client = oauth.Client(consumer, token)
    _resp, content = signed_client.request(
        url,
        method=http_method,
        body=post_body or '',
        headers=http_headers,
    )
    return content