def get_authorization_url(self, callback_uri):
session = OAuth1Session(
settings.OAUTH_CONSUMER_KEY,
client_secret=settings.OAUTH_CONSUMER_SECRET,
callback_uri=callback_uri,
)
try:
url = settings.API_HOST + settings.OAUTH_TOKEN_PATH
response = session.fetch_request_token(url)
except (ValueError, TokenRequestDenied, ConnectionError) as err:
raise AuthenticatorError(err)
else:
self.token = response.get('oauth_token')
self.secret = response.get('oauth_token_secret')
url = settings.API_HOST + settings.OAUTH_AUTHORIZATION_PATH
authorization_url = session.authorization_url(url)
LOGGER.log(logging.INFO, 'Initial token {}, secret {}'.format(
self.token, self.secret))
return authorization_url
python类OAuth1Session()的实例源码
def set_access_token(self, authorization_url):
session = OAuth1Session(
settings.OAUTH_CONSUMER_KEY,
settings.OAUTH_CONSUMER_SECRET,
resource_owner_key=self.token,
resource_owner_secret=self.secret,
)
session.parse_authorization_response(authorization_url)
url = settings.API_HOST + settings.OAUTH_ACCESS_TOKEN_PATH
try:
response = session.fetch_access_token(url)
except (TokenRequestDenied, ConnectionError) as err:
raise AuthenticatorError(err)
else:
self.token = response.get('oauth_token')
self.secret = response.get('oauth_token_secret')
LOGGER.log(logging.INFO, 'Updated token {}, secret {}'.format(
self.token, self.secret))
def __init__(self, consumer_key, consumer_secret, callback=None):
if type(consumer_key) == six.text_type:
consumer_key = consumer_key.encode('ascii')
if type(consumer_secret) == six.text_type:
consumer_secret = consumer_secret.encode('ascii')
self.consumer_key = consumer_key
self.consumer_secret = consumer_secret
self.access_token = None
self.access_token_secret = None
self.callback = callback
self.username = None
self.oauth = OAuth1Session(consumer_key,
client_secret=consumer_secret,
callback_uri=self.callback)
def get_access_token(self, verifier=None):
"""
After user has authorized the request token, get access token
with user supplied verifier.
"""
try:
url = self._get_oauth_url('access_token')
self.oauth = OAuth1Session(self.consumer_key,
client_secret=self.consumer_secret,
resource_owner_key=self.request_token['oauth_token'],
resource_owner_secret=self.request_token['oauth_token_secret'],
verifier=verifier, callback_uri=self.callback)
resp = self.oauth.fetch_access_token(url)
self.access_token = resp['oauth_token']
self.access_token_secret = resp['oauth_token_secret']
return self.access_token, self.access_token_secret
except Exception as e:
raise TweepError(e)
def get_authentication_url(self):
oauth = OAuth1Session(
self.client_id,
client_secret=self.client_secret,
)
token = oauth.fetch_request_token(self.request_token_url)
cache.set(
'oa-token-%s' % token['oauth_token'],
token,
timeout=3600)
self._request.session['oa_token'] = token['oauth_token']
authorization_url = oauth.authorization_url(
self.authorization_base_url,
)
return authorization_url
def __init__(self, consumer_key, consumer_secret, callback=None):
if type(consumer_key) == six.text_type:
consumer_key = consumer_key.encode('ascii')
if type(consumer_secret) == six.text_type:
consumer_secret = consumer_secret.encode('ascii')
self.consumer_key = consumer_key
self.consumer_secret = consumer_secret
self.access_token = None
self.access_token_secret = None
self.callback = callback
self.username = None
self.oauth = OAuth1Session(consumer_key,
client_secret=consumer_secret,
callback_uri=self.callback)
def get_access_token(self, verifier=None):
"""
After user has authorized the request token, get access token
with user supplied verifier.
"""
try:
url = self._get_oauth_url('access_token')
self.oauth = OAuth1Session(self.consumer_key,
client_secret=self.consumer_secret,
resource_owner_key=self.request_token['oauth_token'],
resource_owner_secret=self.request_token['oauth_token_secret'],
verifier=verifier, callback_uri=self.callback)
resp = self.oauth.fetch_access_token(url)
self.access_token = resp['oauth_token']
self.access_token_secret = resp['oauth_token_secret']
return self.access_token, self.access_token_secret
except Exception as e:
raise TweepError(e)
def __init__(self, consumer_key, consumer_secret, callback=None):
if type(consumer_key) == six.text_type:
consumer_key = consumer_key.encode('ascii')
if type(consumer_secret) == six.text_type:
consumer_secret = consumer_secret.encode('ascii')
self.consumer_key = consumer_key
self.consumer_secret = consumer_secret
self.access_token = None
self.access_token_secret = None
self.callback = callback
self.username = None
self.oauth = OAuth1Session(consumer_key,
client_secret=consumer_secret,
callback_uri=self.callback)
def get_access_token(self, verifier=None):
"""
After user has authorized the request token, get access token
with user supplied verifier.
"""
try:
url = self._get_oauth_url('access_token')
self.oauth = OAuth1Session(self.consumer_key,
client_secret=self.consumer_secret,
resource_owner_key=self.request_token['oauth_token'],
resource_owner_secret=self.request_token['oauth_token_secret'],
verifier=verifier, callback_uri=self.callback)
resp = self.oauth.fetch_access_token(url)
self.access_token = resp['oauth_token']
self.access_token_secret = resp['oauth_token_secret']
return self.access_token, self.access_token_secret
except Exception as e:
raise TweepError(e)
def __init__(self, consumer_key, consumer_secret, callback=None):
if type(consumer_key) == six.text_type:
consumer_key = consumer_key.encode('ascii')
if type(consumer_secret) == six.text_type:
consumer_secret = consumer_secret.encode('ascii')
self.consumer_key = consumer_key
self.consumer_secret = consumer_secret
self.access_token = None
self.access_token_secret = None
self.callback = callback
self.username = None
self.oauth = OAuth1Session(consumer_key,
client_secret=consumer_secret,
callback_uri=self.callback)
def get_access_token(self, verifier=None):
"""
After user has authorized the request token, get access token
with user supplied verifier.
"""
try:
url = self._get_oauth_url('access_token')
self.oauth = OAuth1Session(self.consumer_key,
client_secret=self.consumer_secret,
resource_owner_key=self.request_token['oauth_token'],
resource_owner_secret=self.request_token['oauth_token_secret'],
verifier=verifier, callback_uri=self.callback)
resp = self.oauth.fetch_access_token(url)
self.access_token = resp['oauth_token']
self.access_token_secret = resp['oauth_token_secret']
return self.access_token, self.access_token_secret
except Exception as e:
raise TweepError(e)
def __init__(self, consumer_key, consumer_secret, domain='https://www.schoology.com', three_legged=False,
request_token=None, request_token_secret=None, access_token=None, access_token_secret=None):
self.API_ROOT = 'https://api.schoology.com/v1'
self.DOMAIN_ROOT = domain
self.consumer_key = consumer_key
self.consumer_secret = consumer_secret
self.request_token = request_token
self.request_token_secret = request_token_secret
self.access_token = access_token
self.access_token_secret = access_token_secret
self.oauth = requests_oauthlib.OAuth1Session(self.consumer_key, self.consumer_secret)
self.three_legged = three_legged
def get_auth_token(self):
"""
Retrieves an auth_session_token using DEP server token data prepared as an
OAuth1Session() instance earlier on.
"""
# Retrieve session auth token
get_session = self.dep_prep('session', 'get', authsession=self.oauth)
response = self.oauth.send(get_session)
# Extract the auth session token from the JSON reply
token = response.json()['auth_session_token']
# The token happens to contain the UNIX timestamp of when it was generated
# so we save it for later reference.
timestamp = token[:10]
# Roll a human-readable timestamp as well.
ts_readable = datetime.datetime.fromtimestamp(
int(timestamp)).strftime(
'%Y-%m-%d %H:%M:%S')
print "Token generated at %s" % ts_readable
return token, timestamp
def __init__(self, consumer_key, consumer_secret, callback=None):
if type(consumer_key) == six.text_type:
consumer_key = consumer_key.encode('ascii')
if type(consumer_secret) == six.text_type:
consumer_secret = consumer_secret.encode('ascii')
self.consumer_key = consumer_key
self.consumer_secret = consumer_secret
self.access_token = None
self.access_token_secret = None
self.callback = callback
self.username = None
self.oauth = OAuth1Session(consumer_key,
client_secret=consumer_secret,
callback_uri=self.callback)
def get_access_token(self, verifier=None):
"""
After user has authorized the request token, get access token
with user supplied verifier.
"""
try:
url = self._get_oauth_url('access_token')
self.oauth = OAuth1Session(self.consumer_key,
client_secret=self.consumer_secret,
resource_owner_key=self.request_token['oauth_token'],
resource_owner_secret=self.request_token['oauth_token_secret'],
verifier=verifier, callback_uri=self.callback)
resp = self.oauth.fetch_access_token(url)
self.access_token = resp['oauth_token']
self.access_token_secret = resp['oauth_token_secret']
return self.access_token, self.access_token_secret
except Exception as e:
raise TweepError(e)
def get_trends(self, local):
"""
Method to get the trending hashtags.
:type local: str
:rtype: list of str
"""
session_string = "https://api.twitter.com/1.1/trends/place.json?id="
local_id = self.get_local_identifier()[local]
session_string += local_id
session = OAuth1Session(ConsumerKey,
ConsumerSecret,
AccessToken,
AccessTokenSecret)
response = session.get(session_string)
if response.__dict__['status_code'] == 200:
local_trends = json.loads(response.text)[0]["trends"]
hashtags = [trend["name"]
for trend in local_trends if trend["name"][0] == '#']
else:
hashtags = []
return hashtags
def authorize(filepath=None):
"""
Create an authorization object for use with the requests
library. Takes the path to a yaml-encoded file containing twitter API keys.
:param filepath:
:type filepath: str
:returns: OAuth1Session
"""
# Try to find the file if no path was given
# find_keyfile returns a generator and .next() gives the first match
try:
filepath = filepath or next(find_keyfile())
except StopIteration:
raise Exception("No Keyfile found - please place keys.yaml with your tokens in the project directory or pass a custom filepath to the authorize() function")
# Load credentials from keyfile
with open(filepath, 'r') as f:
keys = yaml.load(f)
# Create authentication object
auth_object = OAuth1Session(client_key=keys["client_key"],
client_secret=keys["client_secret"],
resource_owner_key=keys["resource_owner_key"],
resource_owner_secret=keys["resource_owner_secret"])
return auth_object
def doTweet(name, title, sturl):
CK = "" # Consumer Key
CS = "" # Consumer Secret
AT = "" # Access Token
AS = "" # Accesss Token Secert
# ????????UR
url = "https://api.twitter.com/1.1/statuses/update.json"
# ??????
params = {"status": datetime.now().strftime("?%Y/%m/%d %H:%M?\n") + name + "?????????????\n" + title + "\n\n" + sturl}
# OAuth??? POST method ???
twitter = OAuth1Session(CK, CS, AT, AS)
req = twitter.post(url, params=params)
# ????????
if req.status_code == 200:
print(datetime.now().strftime("[%Y/%m/%d %H:%M:%S]") + "[SUCCESS] Tweet success")
else:
print(datetime.now().strftime("[%Y/%m/%d %H:%M:%S]") + "[FAIL] Tweet fail Error: %d" % req.status_code)
# ???????tweet??????
def doTweetTime(name, title, sturl, time):
CK = "" # Consumer Key
CS = "" # Consumer Secret
AT = "" # Access Token
AS = "" # Accesss Token Secert
# ????????UR
url = "https://api.twitter.com/1.1/statuses/update.json"
# ??????
params = {"status": datetime.now().strftime("?%Y/%m/%d %H:%M") + " ????" + str(time) + "????\n" + name + "?????\n" + title + "\n\n" + sturl}
# OAuth??? POST method ???
twitter = OAuth1Session(CK, CS, AT, AS)
req = twitter.post(url, params=params)
# ????????
if req.status_code == 200:
print(datetime.now().strftime("[%Y/%m/%d %H:%M:%S]") + "[SUCCESS] Tweet success")
else:
print(datetime.now().strftime("[%Y/%m/%d %H:%M:%S]") + "[FAIL] Tweet fail Error: %d" % req.status_code)
def fetch_access_token(self, verifier, token=None):
"""Step 3: Given the verifier from fitbit, and optionally a token from
step 1 (not necessary if using the same FitbitOAuthClient object) calls
fitbit again and returns an access token object. Extract the needed
information from that and save it to use in future API calls.
"""
if token:
self.resource_owner_key = token.get('oauth_token')
self.resource_owner_secret = token.get('oauth_token_secret')
self.oauth = OAuth1Session(
self.client_key,
client_secret=self.client_secret,
resource_owner_key=self.resource_owner_key,
resource_owner_secret=self.resource_owner_secret,
verifier=verifier)
response = self.oauth.fetch_access_token(self.access_token_url)
self.user_id = response.get('encoded_user_id')
self.resource_owner_key = response.get('oauth_token')
self.resource_owner_secret = response.get('oauth_token_secret')
return response
def test_fetch_request_token(self):
# fetch_request_token needs to make a request and then build a token from the response
fb = Fitbit(**self.client_kwargs)
with mock.patch.object(OAuth1Session, 'fetch_request_token') as frt:
frt.return_value = {
'oauth_callback_confirmed': 'true',
'oauth_token': 'FAKE_OAUTH_TOKEN',
'oauth_token_secret': 'FAKE_OAUTH_TOKEN_SECRET'}
retval = fb.client.fetch_request_token()
self.assertEqual(1, frt.call_count)
# Got the right return value
self.assertEqual('true', retval.get('oauth_callback_confirmed'))
self.assertEqual('FAKE_OAUTH_TOKEN', retval.get('oauth_token'))
self.assertEqual('FAKE_OAUTH_TOKEN_SECRET',
retval.get('oauth_token_secret'))
def test_fetch_access_token(self):
kwargs = self.client_kwargs
kwargs['resource_owner_key'] = ''
kwargs['resource_owner_secret'] = ''
fb = Fitbit(**kwargs)
fake_verifier = "FAKEVERIFIER"
with mock.patch.object(OAuth1Session, 'fetch_access_token') as fat:
fat.return_value = {
'encoded_user_id': 'FAKE_USER_ID',
'oauth_token': 'FAKE_RETURNED_KEY',
'oauth_token_secret': 'FAKE_RETURNED_SECRET'
}
retval = fb.client.fetch_access_token(fake_verifier)
self.assertEqual("FAKE_RETURNED_KEY", retval['oauth_token'])
self.assertEqual("FAKE_RETURNED_SECRET", retval['oauth_token_secret'])
self.assertEqual('FAKE_USER_ID', fb.client.user_id)
def __init__(self, consumer_key, consumer_secret, callback=None):
if type(consumer_key) == six.text_type:
consumer_key = consumer_key.encode('ascii')
if type(consumer_secret) == six.text_type:
consumer_secret = consumer_secret.encode('ascii')
self.consumer_key = consumer_key
self.consumer_secret = consumer_secret
self.access_token = None
self.access_token_secret = None
self.callback = callback
self.username = None
self.oauth = OAuth1Session(consumer_key,
client_secret=consumer_secret,
callback_uri=self.callback)
def get_access_token(self, verifier=None):
"""
After user has authorized the request token, get access token
with user supplied verifier.
"""
try:
url = self._get_oauth_url('access_token')
self.oauth = OAuth1Session(self.consumer_key,
client_secret=self.consumer_secret,
resource_owner_key=self.request_token['oauth_token'],
resource_owner_secret=self.request_token['oauth_token_secret'],
verifier=verifier, callback_uri=self.callback)
resp = self.oauth.fetch_access_token(url)
self.access_token = resp['oauth_token']
self.access_token_secret = resp['oauth_token_secret']
return self.access_token, self.access_token_secret
except Exception as e:
raise TweepError(e)
def yelp_request(host, path, url_params=None):
"""Format a proper OAuth request to the API"""
url_params = url_params or {}
url = host + path
yelp = OAuth1Session(
CONSUMER_KEY,
client_secret=CONSUMER_SECRET,
resource_owner_key=TOKEN,
resource_owner_secret=TOKEN_SECRET)
print u'Querying Yelp API...'
response = yelp.get(url, params=url_params)
return response.json()
def get_session(self):
session = OAuth1Session(
settings.OAUTH_CONSUMER_KEY,
client_secret=settings.OAUTH_CONSUMER_SECRET,
resource_owner_key=self.token,
resource_owner_secret=self.secret,
)
return session
def __init__(self, client_key, client_secret=None, token=None, token_secret=None,
callback_uri=None, verifier=None, proxies=None, https=True):
super(Fanfou, self).__init__(OAuth1Session(client_key, client_secret=client_secret,
resource_owner_key=token, resource_owner_secret=token_secret,
callback_uri=callback_uri, verifier=verifier),
"https://api.fanfou.com" if https else "http://api.fanfou.com",
objlist=['search', 'blocks', 'users', 'account', 'saved_searches',
'photos', 'trends', 'followers', 'favorites', 'friendships',
'friends', 'statuses', 'direct_messages'],
postfix="json", proxies=proxies)
self.auth._client = HttpsAuth(self.auth.auth)
self.auth.auth = self.auth._client
def get(self, request, *args, **kwargs):
from requests_oauthlib import OAuth1Session
config = SocialConfig.get_solo()
oauth_token = request.GET.get('oauth_token')
oauth_verifier = request.GET.get('oauth_verifier')
oauth_client = OAuth1Session(
client_key=config.twitter_client_id,
client_secret=config.twitter_client_secret,
resource_owner_key=oauth_token,
resource_owner_secret=config.twitter_access_token_secret,
verifier=oauth_verifier
)
try:
answer = oauth_client.fetch_access_token('https://api.twitter.com/oauth/access_token')
except ValueError:
raise Http404
if answer and 'oauth_token' in answer:
SocialConfig.objects.update(twitter_access_token=answer['oauth_token'])
add_message(request, SUCCESS, _('Twitter access_token updated successfully!'))
return redirect('admin:social_networks_socialconfig_change')
else:
return HttpResponse(answer)
def get_user_data(self):
oauth = OAuth1Session(
self.client_id,
client_secret=self.client_secret,
)
oauth_response = oauth.parse_authorization_response(
self._request.build_absolute_uri(self._request.get_full_path()))
verifier = oauth_response.get('oauth_verifier')
resource_owner = cache.get(
'oa-token-%s' % self._request.session.pop('oa_token')
)
oauth = OAuth1Session(
self.client_id,
client_secret=self.client_secret,
resource_owner_key=resource_owner.get('oauth_token'),
resource_owner_secret=resource_owner.get('oauth_token_secret'),
verifier=verifier,
)
oauth_tokens = oauth.fetch_access_token(self.access_token_url)
resource_owner_key = oauth_tokens.get('oauth_token')
resource_owner_secret = oauth_tokens.get('oauth_token_secret')
oauth = OAuth1Session(
self.client_id,
client_secret=self.client_secret,
resource_owner_key=resource_owner_key,
resource_owner_secret=resource_owner_secret,
)
data = oauth.get(
'https://api.twitter.com/1.1/account/verify_credentials.json'
'?include_email=true',
).json()
return {
'email': data.get('email'),
'full_name': data.get('name'),
}
def _do_oauth(self):
request_token_url = self.host + "/oauth/request_token"
authorization_url = self.host + "/oauth/authorize"
access_token_url = self.host + "/oauth/access_token"
oauth_session = OAuth1Session(self.api_key[self.cur_api_key], client_secret=self.api_secret[self.cur_api_key], callback_uri="paste_this") # TODO: use 'oob'
oauth_session.fetch_request_token(request_token_url)
redirect_url = oauth_session.authorization_url(authorization_url)
print "Flickr needs user authentication"
print "--------------------------------"
print "Visit this site:"
# Flickr permissions:
# read - permission to read private information
# write - permission to add, edit and delete photo metadata (includes 'read')
# delete - permission to delete photos (includes 'write' and 'read')
print redirect_url+"&perms=write"
redirect_response = raw_input('Paste the FULL URL here:')
oauth_session.parse_authorization_response(redirect_response)
oauth_session.fetch_access_token(access_token_url)
with open(self.oauth_file, 'w') as f:
pickle.dump(oauth_session, f)
return oauth_session