def __init__(self, client_key, client_secret,
             resource_owner_key, resource_owner_secret):
    '''__init__(client_key, client_secret,
                resource_owner_key, resource_owner_secret)
       Keep the four OAuth1 credentials on the instance, record the
       prod and dev base URLs, and build the OAuth1Session stored in
       self.session (signature sent via the AUTH_HEADER type).
       param: client_key
           description: first positional argument of OAuth1Session
       param: client_secret
           description: second positional argument of OAuth1Session
       param: resource_owner_key
           description: third positional argument of OAuth1Session
       param: resource_owner_secret
           description: fourth positional argument of OAuth1Session'''
    # Base URLs first; they do not depend on the credentials.
    self.base_url_prod = r'https://etws.etrade.com'
    self.base_url_dev = r'https://etwssandbox.etrade.com'

    # Credentials are kept on the instance for later use.
    self.client_key = client_key
    self.client_secret = client_secret
    self.resource_owner_key = resource_owner_key
    self.resource_owner_secret = resource_owner_secret

    credentials = (client_key, client_secret,
                   resource_owner_key, resource_owner_secret)
    self.session = OAuth1Session(*credentials,
                                 signature_type='AUTH_HEADER')
python类OAuth1Session()的实例源码
def __init__(self, client_key, client_secret,
             resource_owner_key, resource_owner_secret):
    '''__init__(client_key, client_secret,
                resource_owner_key, resource_owner_secret)
       Store the credentials, the prod/dev base URLs and a signed
       OAuth1Session (self.session) built from those credentials.
       param: client_key
           type: str
           description: etrade client key
       param: client_secret
           type: str
           description: etrade client secret
       param: resource_owner_key
           type: str
           description: OAuth authentication token key
       param: resource_owner_secret
           type: str
           description: OAuth authentication token secret'''
    # Base URLs for the two environments.
    self.base_url_prod = r'https://etws.etrade.com'
    self.base_url_dev = r'https://etwssandbox.etrade.com'

    self.client_key = client_key
    self.client_secret = client_secret
    self.resource_owner_key = resource_owner_key
    self.resource_owner_secret = resource_owner_secret

    # The session signs requests through the AUTH_HEADER signature type.
    credentials = (client_key, client_secret,
                   resource_owner_key, resource_owner_secret)
    self.session = OAuth1Session(*credentials,
                                 signature_type='AUTH_HEADER')
def __init__(self, client_key, client_secret,
             resource_owner_key, resource_owner_secret):
    '''__init__(client_key, client_secret,
                resource_owner_key, resource_owner_secret)
       Store the credentials, the renew/revoke access-token URLs and a
       signed OAuth1Session (self.session) built from those credentials.
       param: client_key
           type: str
           description: etrade client key
       param: client_secret
           type: str
           description: etrade client secret
       param: resource_owner_key
           type: str
           description: OAuth authentication token key
       param: resource_owner_secret
           type: str
           description: OAuth authentication token secret'''
    # Endpoints used to renew and to revoke an access token.
    self.renew_access_token_url = r'https://etws.etrade.com/oauth/renew_access_token'
    self.revoke_access_token_url = r'https://etws.etrade.com/oauth/revoke_access_token'

    self.client_key = client_key
    self.client_secret = client_secret
    self.resource_owner_key = resource_owner_key
    self.resource_owner_secret = resource_owner_secret

    # The session signs requests through the AUTH_HEADER signature type.
    credentials = (client_key, client_secret,
                   resource_owner_key, resource_owner_secret)
    self.session = OAuth1Session(*credentials,
                                 signature_type='AUTH_HEADER')
def __init__(self, client_key, client_secret,
             resource_owner_key, resource_owner_secret):
    '''__init__(client_key, client_secret,
                resource_owner_key, resource_owner_secret)
       Remember the credentials and both base URLs, then create the
       OAuth1Session kept in self.session.
       param: client_key
           type: str
           description: etrade client key
       param: client_secret
           type: str
           description: etrade client secret
       param: resource_owner_key
           type: str
           description: OAuth authentication token key
       param: resource_owner_secret
           type: str
           description: OAuth authentication token secret'''
    self.client_key, self.client_secret = client_key, client_secret
    self.resource_owner_key = resource_owner_key
    self.resource_owner_secret = resource_owner_secret

    # Base URLs for the prod and dev environments.
    self.base_url_prod = r'https://etws.etrade.com'
    self.base_url_dev = r'https://etwssandbox.etrade.com'

    # Signature is carried via the AUTH_HEADER signature type.
    self.session = OAuth1Session(client_key,
                                 client_secret,
                                 resource_owner_key,
                                 resource_owner_secret,
                                 signature_type='AUTH_HEADER')
def authorize(self):
    """Exchange the stored request token for an access token.

    Returns True right away when the instance is already authorized or
    is not three-legged. Otherwise a new OAuth1Session is stored on
    self.oauth and the access token is fetched through _fetch_token.

    Returns:
        False when the token request is denied; otherwise whether an
        'oauth_token' value was present in the fetched tokens.
    """
    # Same short-circuit order as "authorized or not three_legged".
    exchange_needed = not self.authorized and self.three_legged
    if not exchange_needed:
        return True

    token_endpoint = self.API_ROOT + '/oauth/access_token'
    self.oauth = requests_oauthlib.OAuth1Session(
        self.consumer_key,
        self.consumer_secret,
        resource_owner_key=self.request_token,
        resource_owner_secret=self.request_token_secret)

    try:
        tokens = self._fetch_token(token_endpoint, self.oauth)
    except TokenRequestDenied:
        return False

    self.access_token = tokens.get('oauth_token')
    self.access_token_secret = tokens.get('oauth_token_secret')
    if self.access_token is None:
        return False
    return True
def init_stoken(self, stoken):
    """
    Read the stoken JSON file at path `stoken` into self.mytoken, check
    that each required key holds a value with the expected prefix, and
    build the OAuth1Session stored in self.oauth.

    Prints an error and exits with status -1 when a key is missing or
    its value has the wrong prefix.
    """
    with open(stoken) as handle:
        self.mytoken = json.load(handle)

    # Required keys and the prefix each value must start with.
    expected_prefixes = (('consumer_secret', 'CS_'),
                         ('access_token', 'AT_'),
                         ('consumer_key', 'CK_'),
                         ('access_secret', 'AS_'))
    try:
        for k, prefix in expected_prefixes:
            value = self.mytoken.get(k)
            # A missing key yields None, so startswith raises
            # AttributeError and is reported by the handler below.
            if value.startswith(prefix):
                continue
            print('Error parsing stoken file: bad value for:\n%s = %s\n' % (k, value))
            sys.exit(-1)
    except AttributeError:
        print('Error parsing stoken file: missing key for:\n%s\n' % (k))
        sys.exit(-1)

    # Build the OAuth1 session from the validated stoken values.
    token = self.mytoken
    self.oauth = OAuth1Session(client_key=token['consumer_key'],
                               client_secret=token['consumer_secret'],
                               resource_owner_key=token['access_token'],
                               resource_owner_secret=token['access_secret'],
                               realm='ADM')
def dep_prep(self, query, method, authsession=None, token=None, params=False):
    """
    Build a prepared HTTP request for a DEP API call with the common
    headers already set.

    Parameters:
        - query: API path appended to self.dep_api_url
        - method: HTTP method passed to Request (GET/PUT/POST)
        - authsession: optional OAuth1Session-like object; when truthy
          its prepare_request() is used (auth session token request)
        - token: auth session token placed in the X-ADM-Auth-Session
          header when no authsession is given
        - params: when truthy the Content-Type header is left unset,
          for calls that send a query string instead of JSON

    Prints a message and exits with status -1 when neither authsession
    nor token is supplied. Returns the prepared request.
    """
    request = Request(method, self.dep_api_url + query)
    prepared = None

    if authsession:
        # Requesting an auth session token: let the session prepare it.
        prepared = authsession.prepare_request(request)
    elif token:
        # Regular API call: the session token travels in a header.
        prepared = request.prepare()
        prepared.headers['X-ADM-Auth-Session'] = token
    else:
        # Nothing to authenticate with, so halt.
        print("No token found, we must exit now...")
        sys.exit(-1)

    headers = prepared.headers
    # Protocol version 2 is used because version 1 is deprecated.
    headers['X-Server-Protocol-Version'] = '2'
    if not params:
        headers['Content-Type'] = 'application/json;charset=UTF8'
    return prepared
def _connect(self):
    """Log the session creation and store a new OAuth1Session in self.client."""
    logging.info("creating http session")
    # Credentials are read from the instance and passed by keyword.
    credentials = dict(
        client_key=self.consumer_key,
        client_secret=self.consumer_secret,
        resource_owner_key=self.access_token,
        resource_owner_secret=self.access_token_secret,
    )
    self.client = OAuth1Session(**credentials)
search_twitter_accounts.py 文件源码
项目:DSTC6-End-to-End-Conversation-Modeling
作者: dialogtekgeek
项目源码
文件源码
阅读 21
收藏 0
点赞 0
评论 0
def Main(args):
    """Search for users matching the queries and write their screen names.

    Reads the four access keys from the 'AccessKeys' section of the config
    file named by args.config, opens an OAuth1Session with them, and runs a
    GETUsersSearch for the space-joined args.queries.

    Attributes read from args:
        config: path of the config file holding the access keys
        queries: sequence of query strings, joined with single spaces
        count: passed to the search as target_count
        dump: optional path; when set, the raw result is written there
            as JSON
        output: optional path; when set, screen names are written there
            one per line, otherwise they go to standard output
    """
    # get access keys from a config file
    config = ConfigParser()
    config.read(args.config)
    consumer_key = config.get('AccessKeys', 'ConsumerKey')
    consumer_secret = config.get('AccessKeys', 'ConsumerSecret')
    access_token = config.get('AccessKeys', 'AccessToken')
    access_token_secret = config.get('AccessKeys', 'AccessTokenSecret')

    # open a session
    session = OAuth1Session(consumer_key, consumer_secret,
                            access_token, access_token_secret)

    # collect users from the queries
    user_search = GETUsersSearch(session)
    user_search.setParams(' '.join(args.queries), target_count=args.count)
    user_search.waitReady()
    result = user_search.call()
    logger.info('obtained %d users' % len(result))

    if args.dump:
        logger.info('writing raw data to file %s' % args.dump)
        # Use a context manager so the dump file is flushed and closed
        # even if serialization fails part-way through.
        with open(args.dump, 'w') as dump_file:
            json.dump(result, dump_file, indent=2)

    if args.output:
        logger.info('writing screen names to file %s' % args.output)
        with open(args.output, 'w') as f:
            for user in result:
                f.write(user['screen_name'] + '\n')
    else:
        for user in result:
            sys.stdout.write(user['screen_name'] + '\n')
def __init__(self, client_key, client_secret, resource_owner_key=None,
             resource_owner_secret=None, user_id=None, callback_uri=None,
             *args, **kwargs):
    """
    Create a FitbitOauthClient object.

    Pass the first five parameters when they are available, to access
    user data. Pass only client_key and client_secret to access
    anonymous data and to begin setting up user authorization.

    When callback_uri is a URL, fitbit redirects the user there after
    access has been granted on the fitbit site; that is how a web app
    receives the verifier string. Without it, fitbit shows the verifier
    to the user, who then has to copy and paste it for us to read.

    Extra positional and keyword arguments are accepted and not used here.
    """
    self.session = requests.Session()
    self.client_key = client_key
    self.client_secret = client_secret
    self.resource_owner_key = resource_owner_key
    self.resource_owner_secret = resource_owner_secret
    # user_id is only stored when a truthy value was supplied.
    if user_id:
        self.user_id = user_id

    # Collect the keyword arguments for the OAuth1Session, leaving out
    # anything that was not provided.
    oauth_kwargs = {'client_secret': client_secret}
    if callback_uri:
        oauth_kwargs['callback_uri'] = callback_uri
    if self.resource_owner_key and self.resource_owner_secret:
        oauth_kwargs.update(
            resource_owner_key=self.resource_owner_key,
            resource_owner_secret=self.resource_owner_secret,
        )
    self.oauth = OAuth1Session(client_key, **oauth_kwargs)
def test_authorize_token_url(self):
    """authorize_token_url calls OAuth1Session.authorization_url once and returns its URL."""
    fitbit = Fitbit(**self.client_kwargs)
    # The patched method is configured up front to hand back a fake URL.
    with mock.patch.object(OAuth1Session, 'authorization_url',
                           return_value='FAKEURL') as fake_auth_url:
        returned = fitbit.client.authorize_token_url()
        self.assertEqual(1, fake_auth_url.call_count)
        self.assertEqual("FAKEURL", returned)
def oauth_request():
    """Start the fanfou OAuth flow and redirect the user to authorize.

    Fetches a request token, stores it in the session under 'req', and
    redirects to the authorization URL with the oauth_verify endpoint as
    the callback. On ValueError an error message is stored in the session
    and the user is redirected to the xauth endpoint instead.
    """
    try:
        oauth = OAuth1Session(const.CONSUMER_KEY, const.CONSUMER_SECRET)
        request_token = oauth.fetch_request_token(
            "http://fanfou.com/oauth/request_token")
        # url_root's last character is dropped before url_for's result
        # is appended.
        callback = request.url_root[:-1] + url_for(".oauth_verify")
        session['req'] = request_token
        authorize_url = oauth.authorization_url(
            "http://fanfou.com/oauth/authorize", oauth_callback=callback)
    except ValueError:
        session['error_msg'] = "???????????"
        return redirect(url_for('.xauth'))
    else:
        return redirect(authorize_url)
def oauth_verify():
    """Finish the fanfou OAuth flow, persist the tokens and log the user in.

    Steps:
      1. Exchange the request token kept in session['req'] for an access
         token, store it back in session['req'], and fetch the user's
         credentials from the fanfou API.
      2. Look up the FFAuth record whose 'uniqueID' matches the user, or
         create a new one when the lookup fails with error code 101, then
         save the user's name fields and tokens on it.
      3. Log the user in and redirect to the main index.

    Any failure in step 1, or a LeanCloudError in step 2, stores an error
    message in the session and redirects to the xauth endpoint.
    """
    try:
        req = session['req']
        o = OAuth1Session(const.CONSUMER_KEY, const.CONSUMER_SECRET,
                          req['oauth_token'],
                          req['oauth_token_secret'],
                          verifier=req['oauth_token'])
        ac = o.fetch_access_token("http://fanfou.com/oauth/access_token")
        session['req'] = ac
        user = o.get("http://api.fanfou.com/account/verify_credentials.json?mode=lite").json()
    except Exception:
        # Deliberately broad: any failure in the token exchange sends the
        # user back to the login page. Unlike a bare except, this lets
        # KeyboardInterrupt and SystemExit propagate.
        session['error_msg'] = "?????????"
        return redirect(url_for('.xauth'))

    try:
        try:
            ff_auth = FFAuth.query.equal_to('uniqueID', user['unique_id']).first()
        except LeanCloudError as err:
            # NOTE(review): 101 is presumably LeanCloud's "object not
            # found" code - confirm. Any other code is re-raised so the
            # outer handler reports it; previously ff_auth stayed unbound
            # and the next line failed with NameError.
            if err.code != 101:
                raise
            ff_auth = FFAuth()
        ff_auth.set('username', user['id'])
        ff_auth.set('nickname', user['name'])
        ff_auth.set('uniqueID', user['unique_id'])
        ff_auth.set('token', ac['oauth_token'])
        ff_auth.set('secret', ac['oauth_token_secret'])
        ff_auth.save()
    except LeanCloudError:
        session['error_msg'] = "????????"
        return redirect(url_for('.xauth'))

    login_user(ff_auth, True)
    return redirect(url_for('main.index'))
def get_access_token(consumer_key, consumer_secret):
    """Run the interactive Twitter PIN flow and print the access token.

    Fetches a temporary request token with an out-of-band ('oob')
    callback, opens the authorization URL in a browser, asks the user for
    the pincode, and exchanges it for an access token. The resulting key
    and secret are printed.

    Returns None in every case; when either token request raises
    ValueError, an error message is printed and the function returns
    early.
    """
    oauth_client = OAuth1Session(consumer_key, client_secret=consumer_secret,
                                 callback_uri='oob')
    print('Requesting temp token from Twitter')
    try:
        resp = oauth_client.fetch_request_token(REQUEST_TOKEN_URL)
    except ValueError as e:
        # The exception must be bound for the message below; without
        # "as e" this handler raised NameError instead of reporting.
        print('Invalid respond from Twitter requesting temp token: %s' % e)
        return

    url = oauth_client.authorization_url(AUTHORIZATION_URL)
    print('')
    print('I will try to start a browser to visit the following Twitter page')
    print('if a browser will not start, copy the URL to your browser')
    print('and retrieve the pincode to be used')
    print('in the next step to obtaining an Authentication Token:')
    print('')
    print(url)
    print('')
    webbrowser.open(url)
    pincode = input('Pincode? ')
    print('')
    print('Generating and signing request for an access token')
    print('')

    # A second session carries the temporary token and the user's pincode.
    oauth_client = OAuth1Session(consumer_key, client_secret=consumer_secret,
                                 resource_owner_key=resp.get('oauth_token'),
                                 resource_owner_secret=resp.get('oauth_token_secret'),
                                 verifier=pincode)
    try:
        resp = oauth_client.fetch_access_token(ACCESS_TOKEN_URL)
    except ValueError as e:
        print('Invalid respond from Twitter requesting access token: %s' % e)
        return

    print('Your Twitter Access Token key: %s' % resp.get('oauth_token'))
    print(' Access Token secret: %s' % resp.get('oauth_token_secret'))
    print('')
def get_request_token(self):
    '''get_request_token() -> auth url
       Open a new OAuth1Session, fetch a request token and build the
       authorization url the user has to visit.
       The oauth_* values below are handled by requests_oauthlib; they
       are listed only to make the API clearer.
       param: oauth_consumer_key
           type: str
           description: the value used by the consumer to identify
                        itself to the service provider.
       param: oauth_timestamp
           type: int
           description: the date and time of the request, in epoch time.
                        must be accurate within five minutes.
       param: oauth_nonce
           type: str
           description: a nonce, as described in the authorization guide;
                        roughly, an arbitrary or random value that cannot
                        be used again with the same timestamp.
       param: oauth_signature_method
           type: str
           description: the signature method used by the consumer to sign
                        the request. the only supported value is
                        'HMAC-SHA1'.
       param: oauth_signature
           type: str
           description: signature generated with the shared secret and
                        token secret using the specified
                        oauth_signature_method as described in OAuth
                        documentation.
       param: oauth_callback
           type: str
           description: callback information, as described elsewhere.
                        must always be set to 'oob' whether using a
                        callback or not
       rtype: str
       description: Etrade authorization url'''
    # Set up the session and keep it on the instance.
    oauth = OAuth1Session(self.consumer_key,
                          self.consumer_secret,
                          callback_uri=self.callback_url,
                          signature_type='AUTH_HEADER')
    self.session = oauth

    # Fetch the request token.
    oauth.fetch_request_token(self.req_token_url)

    # Parse the session's authorization url to get the oauth_token.
    parsed = oauth.parse_authorization_response(
        oauth.authorization_url(self.auth_token_url))
    request_token = parsed['oauth_token']
    self.resource_owner_key = request_token

    # etrade format: url?key&token
    url_parts = (self.auth_token_url, self.consumer_key, request_token)
    self.verifier_url = '%s?key=%s&token=%s' % url_parts
    logger.debug(self.verifier_url)
    return self.verifier_url
def get_access_token(consumer_key, consumer_secret):
    """Run the interactive Twitter PIN flow and return the access token.

    Fetches a temporary request token with an out-of-band ('oob')
    callback, opens the authorization URL in a browser, asks the user for
    the pincode, and exchanges it for an access token.

    Returns:
        A tuple (oauth_token, oauth_token_secret) taken from the access
        token response.

    Raises:
        ValueError: when either token request raises ValueError; the
            message names the step that failed.
    """
    REQUEST_TOKEN_URL = 'https://api.twitter.com/oauth/request_token'
    ACCESS_TOKEN_URL = 'https://api.twitter.com/oauth/access_token'
    AUTHORIZATION_URL = 'https://api.twitter.com/oauth/authorize'

    oauth_client = OAuth1Session(consumer_key, client_secret=consumer_secret,
                                 callback_uri='oob')
    print('\nRequesting temp token from Twitter...\n')
    try:
        resp = oauth_client.fetch_request_token(REQUEST_TOKEN_URL)
    except ValueError as e:
        raise ValueError(
            'Invalid response from Twitter requesting temp token: {0}'.format(
                e))

    url = oauth_client.authorization_url(AUTHORIZATION_URL)
    print('I will try to start a browser to visit the following Twitter page '
          'if a browser will not start, copy the URL to your browser '
          'and retrieve the pincode to be used '
          'in the next step to obtaining an Authentication Token: \n'
          '\n\t{0}'.format(url))
    webbrowser.open(url)
    # NOTE(review): raw_input is the Python 2 builtin; this block is kept
    # Python 2 compatible as written.
    pincode = raw_input('\nEnter your pincode? ')

    print('\nGenerating and signing request for an access token...\n')
    # A second session carries the temporary token and the user's pincode.
    oauth_client = OAuth1Session(consumer_key, client_secret=consumer_secret,
                                 resource_owner_key=resp.get('oauth_token'),
                                 resource_owner_secret=resp.get(
                                     'oauth_token_secret'),
                                 verifier=pincode)
    try:
        resp = oauth_client.fetch_access_token(ACCESS_TOKEN_URL)
    except ValueError as e:
        # This step fetches the access token, so the message says so.
        msg = ('Invalid response from Twitter requesting '
               'access token: {0}').format(e)
        raise ValueError(msg)

    return resp.get('oauth_token'), resp.get('oauth_token_secret')