def get_google_auth(state=None, token=None):
if token:
return OAuth2Session(Auth.CLIENT_ID, token=token)
if state:
return OAuth2Session(
Auth.CLIENT_ID,
state=state,
redirect_uri=Auth.REDIRECT_URI,
scope=['email']
)
oauth = OAuth2Session(
Auth.CLIENT_ID,
redirect_uri=Auth.REDIRECT_URI,
scope=['email']
)
return oauth
python类OAuth2Session()的实例源码
def get_google_auth(state=None, token=None):
"""
get_google_auth creates an oauth object for google oauth
Thanks http://bitwiser.in/2015/09/09/add-google-login-in-flask.html
"""
# if token from server is available, just use it
# we can now fetch user info from google
if token:
return OAuth2Session(Auth.CLIENT_ID, token=token)
# if state is set (& token is not), create an OAuth session to fetch token
if state:
return OAuth2Session(
Auth.CLIENT_ID,
state=state,
redirect_uri=Auth.REDIRECT_URI)
# neither token nor state is set
# start a new oauth session
oauth = OAuth2Session(
Auth.CLIENT_ID,
redirect_uri=Auth.REDIRECT_URI,
scope=Auth.SCOPE)
return oauth
def oauth_from_token(meta):
"""
Recreate a oauth2 object from a token
args:
token (dict): a oauth2 token object
token_updater (func): a function that is triggered upon a token update
returns:
OAuth2Session: an auth2 session
"""
def inner(new_token):
meta.update_token(new_token)
return OAuth2Session(token=meta.token, auto_refresh_url=meta.token_endpoint, scope=scope, token_updater=inner,
client_id=client_id)
def _get_token(self):
"""Get an API token.
Raises:
AuthenticationError: if getting token fails.
"""
client = BackendApplicationClient(client_id=CLIENT_ID)
oauth = OAuth2Session(client=client)
# Retry auth if error (to get around intermittent failures)
latest_exception = None
for i in range(3):
try:
token = oauth.fetch_token(
token_url=AUTH_URL, client_id=CLIENT_ID, client_secret=CLIENT_SECRET)
self.__token = token["access_token"]
self.__session = oauth
self._me = None
return
except (AccessDeniedError, InvalidClientError, MissingTokenError) as e:
latest_exception = e
continue
raise AuthenticationError("Failed to get authentication token: {0}".format(latest_exception))
def __init__(self, client_id, client_secret, user=None, storage=None,
redirect_uri=None, token=None, token_callback=None, **kwargs):
self.client_id = client_id
self.client_secret = client_secret
self.user = user
self.storage = storage
self.token = token or {}
self.token_callback = token_callback
if self.token:
# We already have a token, pass it along.
self.oauthsession = OAuth2Session(
token=self.token, **kwargs)
else:
# We have yet to obtain a token, so we have only the client ID etc.
# needed to call `authorization_url()` and get a token.
self.oauthsession = OAuth2Session(
self.client_id, redirect_uri=redirect_uri,
scope=self.SCOPES, **kwargs)
def __init__(self, client_id, client_secret, user=None, storage=None,
redirect_uri=None, token=None, token_callback=None, **kwargs):
self.client_id = client_id
self.client_secret = client_secret
self.user = user
self.storage = storage
self.token = token or {}
self.token_callback = token_callback
if self.token:
# We already have a token, pass it along.
self.oauthsession = OAuth2Session(
token=self.token, **kwargs)
else:
# We have yet to obtain a token, so we have only the client ID etc.
# needed to call `authorization_url()` and get a token.
self.oauthsession = OAuth2Session(
self.client_id, redirect_uri=redirect_uri,
scope=self.SCOPES, **kwargs)
def __init__(self, client_id, client_secret, user=None, storage=None,
redirect_uri=None, token=None, token_callback=None, **kwargs):
self.client_id = client_id
self.client_secret = client_secret
self.user = user
self.storage = storage
self.token = token or {}
self.token_callback = token_callback
if self.token:
# We already have a token, pass it along.
self.oauthsession = OAuth2Session(
token=self.token, **kwargs)
else:
# We have yet to obtain a token, so we have only the client ID etc.
# needed to call `authorization_url()` and get a token.
self.oauthsession = OAuth2Session(
self.client_id, redirect_uri=redirect_uri,
scope=self.SCOPES, **kwargs)
def __init__(self, client_id, client_secret,
access_token=None, refresh_token=None,
*args, **kwargs):
"""
Create a FitbitOauth2Client object. Specify the first 7 parameters if
you have them to access user data. Specify just the first 2 parameters
to start the setup for user authorization (as an example see gather_key_oauth2.py)
- client_id, client_secret are in the app configuration page
https://dev.fitbit.com/apps
- access_token, refresh_token are obtained after the user grants permission
"""
self.session = requests.Session()
self.client_id = client_id
self.client_secret = client_secret
self.token = {
'access_token': access_token,
'refresh_token': refresh_token
}
self.oauth = OAuth2Session(client_id)
def _get_oauth_token(self):
"""
Get Monzo access token via OAuth2 `authorization code` grant type.
Official docs:
https://monzo.com/docs/#acquire-an-access-token
:returns: OAuth 2 access token
:rtype: dict
"""
url = urljoin(self.api_url, '/oauth2/token')
oauth = OAuth2Session(
client_id=self._client_id,
redirect_uri=config.PYMONZO_REDIRECT_URI,
)
token = oauth.fetch_token(
token_url=url,
code=self._auth_code,
client_secret=self._client_secret,
)
return token
def refresh(self, session=None, auth=None):
"""
Refreshes the token.
:param session: :class:`requests_oauthlib.OAuth2Session` for refreshing token with.
:param auth: :class:`requests.auth.HTTPBasicAuth`
"""
if self.can_refresh:
if not session:
session = OAuth2Session(app_settings.ESI_SSO_CLIENT_ID)
if not auth:
auth = HTTPBasicAuth(app_settings.ESI_SSO_CLIENT_ID, app_settings.ESI_SSO_CLIENT_SECRET)
try:
self.access_token = \
session.refresh_token(app_settings.ESI_TOKEN_URL, refresh_token=self.refresh_token, auth=auth)[
'access_token']
self.created = timezone.now()
self.save()
except (InvalidGrantError, MissingTokenError):
raise TokenInvalidError()
except InvalidClientError:
raise ImproperlyConfigured('Verify ESI_SSO_CLIENT_ID and ESI_SSO_CLIENT_SECRET settings.')
else:
raise NotRefreshableTokenError()
def __init__(self, client_id, client_secret,
access_token=None, refresh_token=None,
*args, **kwargs):
"""
Create a FitbitOauth2Client object. Specify the first 7 parameters if
you have them to access user data. Specify just the first 2 parameters
to start the setup for user authorization (as an example see gather_key_oauth2.py)
- client_id, client_secret are in the app configuration page
https://dev.fitbit.com/apps
- access_token, refresh_token are obtained after the user grants permission
"""
self.session = requests.Session()
self.client_id = client_id
self.client_secret = client_secret
self.token = {
'access_token': access_token,
'refresh_token': refresh_token
}
self.oauth = OAuth2Session(client_id)
def callback():
""" Step 3: Retrieving an access token.
The user has been redirected back from the provider to your registered
callback URL. With this redirection comes an authorization code included
in the redirect URL. We will use that to obtain an access token.
"""
mautic = OAuth2Session(client_id, redirect_uri=redirect_uri,
state=session['oauth_state'])
token = mautic.fetch_token(token_url, client_secret=client_secret, authorization_response=request.url)
# We use the session as a simple DB for this example.
session['oauth_token'] = token
update_token_tempfile(token) # store token in /tmp/mautic_creds.json
return redirect(url_for('.menu'))
def refresh():
token = __get_session_token__()
refresh_token_arg = request.args.get('refresh_token')
if refresh_token_arg != None:
refresh_token = refresh_token_arg
else:
try:
refresh_token = token['refresh_token']
except KeyError:
refresh_token = ''
questradeAPI = OAuth2Session(client_id, token=token)
token = questradeAPI.refresh_token(token_url, refresh_token=refresh_token)
__set_session_token__(token)
return redirect(url_for('.token'))
def cmd_info(message, parameters, recursion=0):
await client.send_typing(message.channel)
async for msg in client.logs_from(message.channel, limit=25):
try:
if msg.attachments:
img = msg.attachments[0]['url']
while not tokenAvailable:
asyncio.get_event_loop().create_task(tokenTimedOut())
await asyncio.sleep(1)
api = OAuth2Session(aestheticsApiClientID, token=aesthToken)
params = {'url': img, 'num_keywords': 10}
keywords = api.get('https://api.everypixel.com/v1/keywords', params=params).json()
quality = api.get('https://api.everypixel.com/v1/quality', params=params).json()
if 'error' in params.keys() or 'error' in quality.keys():
await client.send_message(message.channel, "**Error:** API limit, you'll either have to wait like 5 mins so I get more requests or manually upload the image to https://everypixel.com/aesthetics")
return
e = discord.Embed(colour=0x4DB15A)
tags = [ tag['keyword'] for tag in keywords['keywords'] ]
e.description = "```fix\nScore = {}% Awesome Image!```\n```md\n#{}```".format('{0:.1f}'.format(float(quality['quality']['score'])*100.0), "\n#".join(tags))
await client.send_message(message.channel, embed=e)
return
except Exception as e:
raise e
def get_google_auth(state=None, token=None):
if token:
return OAuth2Session(Auth.CLIENT_ID, token=token)
if state:
return OAuth2Session(
Auth.CLIENT_ID,
state=state,
redirect_uri=Auth.REDIRECT_URI,
scope=['email']
)
oauth = OAuth2Session(
Auth.CLIENT_ID,
redirect_uri=Auth.REDIRECT_URI,
scope=['email']
)
return oauth
helpers.py 文件源码
项目:google-auth-library-python-oauthlib
作者: GoogleCloudPlatform
项目源码
文件源码
阅读 18
收藏 0
点赞 0
评论 0
def session_from_client_secrets_file(client_secrets_file, scopes, **kwargs):
"""Creates a :class:`requests_oauthlib.OAuth2Session` instance from a
Google-format client secrets file.
Args:
client_secrets_file (str): The path to the `client secrets`_ .json
file.
scopes (Sequence[str]): The list of scopes to request during the
flow.
kwargs: Any additional parameters passed to
:class:`requests_oauthlib.OAuth2Session`
Returns:
Tuple[requests_oauthlib.OAuth2Session, Mapping[str, Any]]: The new
oauthlib session and the validated client configuration.
.. _client secrets:
https://developers.google.com/api-client-library/python/guide
/aaa_client_secrets
"""
with open(client_secrets_file, 'r') as json_file:
client_config = json.load(json_file)
return session_from_client_config(client_config, scopes, **kwargs)
def get_redirect_url(self, *args, **kwargs):
client = OAuth2Session(
client_id=settings.OAUTH['client_id'],
redirect_uri=settings.OAUTH['redirect_domain'] + reverse('oauth_return_view'),
scope=['view_profile'],
)
# extract next url
next_url = self.request.GET.get('next', None)
# save it to the session
if next_url and is_safe_url(next_url):
self.request.session['login_next_url'] = next_url
# the user is then redirected to the auth provider with the right parameters
url, state = client.authorization_url(settings.OAUTH['authorization_url'])
# the nonce is saved inside the session
self.request.session['oauth2_nonce'] = state
return url
def __init__(self, client_id=None, client_secret=None, token_updater=None, **kwargs):
if not (client_secret and client_id):
raise InvalidUsage('You must supply a client_id and client_secret')
if kwargs.get('session') or kwargs.get('user'):
warnings.warn('pysnow.OAuthClient manages sessions internally, '
'provided user / password credentials or sessions will be ignored.')
# Forcibly set session, user and password
kwargs['session'] = OAuth2Session(client=LegacyApplicationClient(client_id=client_id))
kwargs['user'] = None
kwargs['password'] = None
super(OAuthClient, self).__init__(**kwargs)
self.token_updater = token_updater
self.client_id = client_id
self.client_secret = client_secret
self.token_url = "%s/oauth_token.do" % self.base_url
def _get_oauth_session(self):
"""Creates a new OAuth session
:return:
- OAuth2Session object
"""
return self._get_session(
OAuth2Session(
client_id=self.client_id,
token=self.token,
token_updater=self.token_updater,
auto_refresh_url=self.token_url,
auto_refresh_kwargs={
"client_id": self.client_id,
"client_secret": self.client_secret
}
)
)
def login(self, **kwargs):
try:
oauth2_session = requests_oauthlib.OAuth2Session(client_id=self.get_config_value('CLIENT_ID'),
redirect_uri=self.callback_url(**kwargs),
scope=self.get_config_value('SCOPES'),
state=flask.session[self.state_session_key])
token = oauth2_session.fetch_token(self.get_config_value('TOKEN_URL'),
code=flask.request.args['code'], client_secret=self.get_config_value('CLIENT_SECRET'))
except Exception as exc:
flask.current_app.logger.exception("exception while retrieving token")
self.login_failure_func(exc)
try:
profile = self.get_profile(oauth2_session)
except Exception as exc:
flask.current_app.logger.exception("exception while retrieving profile")
self.login_failure_func(exc)
# flask.current_app.logger.debug("profile: {!s}".format(pprint.pformat(profile)))
return self.login_success_func(token, profile, **kwargs)
def callback(request):
"""
Step 2 of OAuth: fetch the token.
"""
try:
oauth_state = request.session['oauth_state']
except KeyError:
return HttpResponseBadRequest('Missing oauth state.')
github = OAuth2Session(settings.GITHUB_CLIENT_ID, state=oauth_state)
token = github.fetch_token(
settings.GITHUB_TOKEN_URL,
client_secret=settings.GITHUB_CLIENT_SECRET,
authorization_response=request.build_absolute_uri()
)
try:
OAuthToken.objects.create(user=request.user, value=token['access_token'])
except (KeyError, TypeError):
return HttpResponseBadRequest('Cannot read access_token.')
Repository.add_user_to_known_repositories(request.user)
return redirect("home")
def __init__(self, client_id , client_secret,
access_token=None, refresh_token=None,
*args, **kwargs):
"""
Create a FitbitOauth2Client object. Specify the first 7 parameters if
you have them to access user data. Specify just the first 2 parameters
to start the setup for user authorization (as an example see gather_key_oauth2.py)
- client_id, client_secret are in the app configuration page
https://dev.fitbit.com/apps
- access_token, refresh_token are obtained after the user grants permission
"""
self.session = requests.Session()
self.client_id = client_id
self.client_secret = client_secret
self.token = {'access_token' : access_token,
'refresh_token': refresh_token}
self.oauth = OAuth2Session(client_id)
def test_auto_refresh_token_exception(self):
# test of auto_refresh with tokenExpired exception
# 1. first call to _request causes a TokenExpired
# 2. the token_refresh call is faked
# 3. the second call to _request returns a valid value
kwargs = self.client_kwargs
kwargs['access_token'] = 'fake_access_token'
kwargs['refresh_token'] = 'fake_refresh_token'
fb = Fitbit(**kwargs)
with mock.patch.object(FitbitOauth2Client, '_request') as r:
r.side_effect = [TokenExpiredError, fake_response(200,'correct_response')]
with mock.patch.object(OAuth2Session, 'refresh_token') as rt:
rt.return_value = {
'access_token': 'fake_return_access_token',
'refresh_token': 'fake_return_refresh_token'
}
retval = fb.client.make_request(Fitbit.API_ENDPOINT + '/1/user/-/profile.json')
self.assertEqual("correct_response", retval.text)
self.assertEqual("fake_return_access_token", fb.client.token['access_token'])
self.assertEqual("fake_return_refresh_token", fb.client.token['refresh_token'])
self.assertEqual(1, rt.call_count)
self.assertEqual(2, r.call_count)
def callback():
""" Step 3: Retrieving an access token.
The user has been redirected back from the provider to your registered
callback URL. With this redirection comes an authorization code included
in the redirect URL. We will use that to obtain an access token.
NOTE: your server name must be correctly configured in order for this to
work, do this by adding the headers at your http layer, in particular:
X_FORWARDED_HOST, X_FORWARDED_PROTO so that bottle can render the correct
url and links for you.
"""
oauth2session = OAuth2Session(settings.SLACK_OAUTH['client_id'],
state=request.GET['state'])
#PII - update privacy policy if oauth2 token is stored.
token = oauth2session.fetch_token(
settings.SLACK_OAUTH['token_url'],
client_secret=settings.SLACK_OAUTH['client_secret'],
authorization_response=request.url
)
# we don't need the token, we just need the user to have installed the app
# in the future, if we need tokens, we'll get them.
redirect('/?added_to_slack=true')
def createDAsession():
'''Create a DeviantArt session using The Client Credentials access by Backend Application Flow from oauthlib.oauth2 and OAuth2Session from Requests-OAuthlib '''
global client_id, client_secret, deviantart_client, deviantart_session, token
deviantart_session = OAuth2Session(client = deviantart_client)
token=get_token()
deviantart_session = OAuth2Session(client_id, token=token)
def createDAsession():
'''Create a DeviantArt session using The Client Credentials access by Backend Application Flow and OAuth2Session from oauthlib.oauth2 / Requests-OAuthlib '''
global deviantart_client, deviantart_session, token
deviantart_session = OAuth2Session(client = deviantart_client)
token=get_token()
deviantart_session = OAuth2Session(client_id, token=token)
def createDAsession():
'''Create a DeviantArt session using The Client Credentials access by Backend Application Flow and OAuth2Session from oauthlib.oauth2 / Requests-OAuthlib '''
global deviantart_client, deviantart_session, token
deviantart_session = OAuth2Session(client = deviantart_client)
token=get_token()
deviantart_session = OAuth2Session(client_id, token=token)
def createDAsession():
'''Create a DeviantArt session using The Client Credentials access by Backend Application Flow and OAuth2Session from oauthlib.oauth2 / Requests-OAuthlib '''
global deviantart_client, deviantart_session, token
deviantart_session = OAuth2Session(client = deviantart_client)
token=get_token()
deviantart_session = OAuth2Session(client_id, token=token)
def create_session():
""" Creates/resets and OAuth 2 session, with the specified data. """
global session
global settings
try:
client_id = settings['client_id']
redirect_uri = settings['redirect_uri']
except KeyError as e:
raise KeyError("The OAuth2 settings dictionary is missing the {} entry. "
"Please add it to the QOpenScienceFramework.connection.settings "
"dicationary before trying to create a new session".format(e))
# Set up requests_oauthlib object
mobile_app_client = MobileApplicationClient(client_id)
# Create an OAuth2 session for the OSF
session = requests_oauthlib.OAuth2Session(
client_id,
mobile_app_client,
scope=scope,
redirect_uri=redirect_uri,
)
return session
# Generate correct URLs
def is_authorized():
""" Convenience function simply returning OAuth2Session.authorized.
Returns
-------
bool
True is the user is authorized, False if not
"""
check_for_active_session()
return session.authorized