def create_oauth_session(port, auto_refresh_url):
    """
    Build the OAuth2 session used for the local auth-callback flow.

    The redirect URI points at ``http://127.0.0.1:<port>/callback``. The
    client id and scope come from the ``client_id`` and ``scope`` names
    defined outside this function.

    args:
        port (int): port used in the callback redirect URI
        auto_refresh_url: passed through to OAuth2Session as ``auto_refresh_url``
    returns:
        OAuth2Session: a oauth2 session object
    """
    logger.info(
        "Creating an oauth session, temporarily starting webserver on port {} for auth callback".format(port)
    )
    return OAuth2Session(
        client_id,
        redirect_uri='http://127.0.0.1:%s/callback' % port,
        auto_refresh_url=auto_refresh_url,
        scope=scope,
    )
# Example source code using the Python class OAuth2Session()
def authenticate(self, username, password):
    """
    Check a username/password pair and report whether it is accepted.

    Users listed in ``self._exclude_list`` are checked against the htpasswd
    file at ``self._file_path``. For everyone else a token is requested from
    ``self._token_url`` with the given credentials, through an OAuth2Session
    built on a LegacyApplicationClient.

    Returns True when authentication succeeds, False otherwise.
    """
    if username in self._exclude_list:
        verdict = HtpasswdFile(path=self._file_path).check_password(
            username, password)
        # check_password yields None / False / True; each gets its own log line
        if verdict is None:
            LOG.info('User "%s" doesn\'t exist' % username)
        elif verdict is False:
            LOG.info('Invalid password for user "%s"' % username)
        elif verdict is True:
            LOG.info('Authentication for user "%s" successful' % username)
        return bool(verdict)

    # NOTE(review): this sets OAUTHLIB_INSECURE_TRANSPORT for the whole
    # process, and fetch_token below is called with verify=False.
    os.environ['OAUTHLIB_INSECURE_TRANSPORT'] = '1'
    legacy_client = LegacyApplicationClient(client_id=self._client_id)
    oauth = OAuth2Session(client=legacy_client)
    try:
        token = oauth.fetch_token(
            token_url=self._token_url,
            username=username,
            password=password,
            client_id=self._client_id,
            client_secret=self._client_secret,
            verify=False)
    except Exception as e:
        # Any failure while fetching the token counts as a failed login
        LOG.info(
            'Authentication for user "{}" failed: {}'.format(
                username, str(e)))
        return False

    if token is None:
        return False
    LOG.info('Authentication for user "{}" successful'.format(username))
    return True
def __init__(self, request):
    """Store the request and create the OAuth2 session for it.

    The session uses ``self.client_id`` and ``self.scope``; its redirect URI
    is ``request.build_absolute_uri('.')``.
    """
    self._request = request
    session_options = {
        'scope': self.scope,
        'redirect_uri': request.build_absolute_uri('.'),
    }
    self._session = OAuth2Session(self.client_id, **session_options)
def __init__(self, request):
    """Remember ``request`` and open an OAuth2 session on ``self._session``.

    Client id and scope are read from ``self.client_id`` and ``self.scope``;
    the redirect URI is ``request.build_absolute_uri('.')``.
    """
    self._request = request
    oauth = OAuth2Session(
        self.client_id,
        scope=self.scope,
        redirect_uri=request.build_absolute_uri('.'))
    self._session = oauth
def fetch_access_token(self, code, redirect_uri):
    """Step 2: exchange the authorization code from step 1 for a token.

    A new OAuth2Session is created for ``self.client_id`` and
    ``redirect_uri``. ``fetch_token`` is then called on
    ``self.access_token_url`` with the client id and client secret passed as
    ``username`` and ``password``. The token is saved on ``self.token`` and
    returned.
    """
    oauth = OAuth2Session(self.client_id, redirect_uri=redirect_uri)
    credentials = {
        'username': self.client_id,
        'password': self.client_secret,
    }
    self.token = oauth.fetch_token(
        self.access_token_url, code=code, **credentials)
    return self.token
def get_authorize_url(self):
    """Return the authorization URL and remember its state value.

    The state generated together with the URL is stored in ``session``
    under ``'_oauth_state'``.
    """
    oauth = OAuth2Session(
        self.client_id, scope=self.scope, redirect_uri=self.redirect_uri)
    url, oauth_state = oauth.authorization_url(
        self.base_url, access_type='online', approval_prompt='force')
    session['_oauth_state'] = oauth_state
    return url
def _get_response(self, method, endpoint, params=None):
    """
    Send a request to the API, retrying once after a token refresh.

    :param method: valid HTTP method
    :type method: str
    :param endpoint: API endpoint
    :type endpoint: str
    :param params: extra parameters passed with the request
    :type params: dict
    :returns: API response
    :rtype: Response
    :raises MonzoAPIError: when the final response status is not OK
    """
    target = urljoin(self.api_url, endpoint)

    def _send():
        # Resolve the method on every call: self._session is replaced
        # after a token refresh.
        return getattr(self._session, method)(target, params=params)

    try:
        response = _send()
        # An HTTP 401 from the API could mean that the token is expired,
        # so it is handled the same way as TokenExpiredError.
        if response.status_code == 401:
            raise TokenExpiredError
    except TokenExpiredError:
        # For some reason 'requests-oauthlib' automatic token refreshing
        # doesn't work so we do it here semi-manually
        self._refresh_oath_token()
        self._session = OAuth2Session(
            client_id=self._client_id, token=self._token)
        response = _send()

    if response.status_code == requests.codes.ok:
        return response
    raise MonzoAPIError(
        "Something went wrong: {}".format(response.json())
    )
def oauth_session(request, state=None, token=None):
    """ Constructs the OAuth2 session object.

    The redirect URI is ``settings.DISCORD_REDIRECT_URI`` when that is set,
    otherwise the absolute URL of the ``discord_bind_callback`` view. The
    requested scope is ``email`` or ``identify`` (chosen by
    ``settings.DISCORD_EMAIL_SCOPE``) plus ``guilds.join``.

    :param request: current request, used to build the fallback redirect URI
    :param state: optional OAuth2 state passed to the session
    :param token: optional token dict passed to the session
    :returns: an OAuth2Session for ``settings.DISCORD_CLIENT_ID``
    """
    if settings.DISCORD_REDIRECT_URI is not None:
        redirect_uri = settings.DISCORD_REDIRECT_URI
    else:
        redirect_uri = request.build_absolute_uri(
            reverse('discord_bind_callback'))
    # Discord's basic-profile scope is spelled 'identify'; the previous
    # 'identity' is not a scope Discord recognises.
    scope = (['email', 'guilds.join'] if settings.DISCORD_EMAIL_SCOPE
             else ['identify', 'guilds.join'])
    return OAuth2Session(settings.DISCORD_CLIENT_ID,
                         redirect_uri=redirect_uri,
                         scope=scope,
                         token=token,
                         state=state)
def bulk_refresh(self):
    """
    Refreshes all refreshable tokens in the queryset.
    Deletes any tokens which fail to refresh.
    Deletes any tokens which are expired and cannot refresh.
    """
    # One session and one auth object are shared by every refresh call
    oauth = OAuth2Session(app_settings.ESI_SSO_CLIENT_ID)
    basic_auth = requests.auth.HTTPBasicAuth(
        app_settings.ESI_SSO_CLIENT_ID, app_settings.ESI_SSO_CLIENT_SECRET)

    refreshable = self.filter(refresh_token__isnull=False)
    for token in refreshable:
        try:
            token.refresh(session=oauth, auth=basic_auth)
        except TokenError:
            token.delete()

    unrefreshable = self.filter(refresh_token__isnull=True)
    unrefreshable.get_expired().delete()
def sso_redirect(request, scopes=None, return_to=None):
    """
    Generates a :model:`esi.CallbackRedirect` for the specified request.
    Redirects to EVE for login.
    Accepts a view or URL name as a redirect after SSO.

    :param request: current request; its session key identifies the redirect
    :param scopes: a single scope string or an iterable of scopes;
        ``None`` (the default) requests no scopes
    :param return_to: view or URL name to return to after SSO; when falsy
        the current full path of ``request`` is used
    :returns: a redirect to the SSO authorization URL
    """
    # A None default replaces the former mutable list default
    if scopes is None:
        scopes = []
    elif isinstance(scopes, string_types):
        scopes = [scopes]

    # ensure only one callback redirect model per session
    CallbackRedirect.objects.filter(session_key=request.session.session_key).delete()

    # ensure session installed in database
    if not request.session.exists(request.session.session_key):
        request.session.create()

    if return_to:
        url = reverse(return_to)
    else:
        url = request.get_full_path()

    oauth = OAuth2Session(app_settings.ESI_SSO_CLIENT_ID, redirect_uri=app_settings.ESI_SSO_CALLBACK_URL, scope=scopes)
    redirect_url, state = oauth.authorization_url(app_settings.ESI_OAUTH_LOGIN_URL)
    # Persist the state so it can be looked up again by session key
    CallbackRedirect.objects.create(session_key=request.session.session_key, state=state, url=url)
    return redirect(redirect_url)
def get_token_data(cls, access_token):
    """Return the decoded JSON from the token verify URL for ``access_token``.

    A GET request is sent to ``app_settings.ESI_TOKEN_VERIFY_URL`` through an
    OAuth2Session carrying the given access token.
    """
    oauth = OAuth2Session(
        app_settings.ESI_SSO_CLIENT_ID,
        token={'access_token': access_token},
    )
    response = oauth.request('get', app_settings.ESI_TOKEN_VERIFY_URL)
    return response.json()
def fetch_access_token(self, code, redirect_uri):
    """Step 2: trade the code obtained in step 1 for an access token.

    ``fetch_token`` is called on ``self.access_token_url`` through a fresh
    OAuth2Session, with the client id and client secret passed as
    ``username`` and ``password``. The token object is stored on
    ``self.token`` for later use and is also returned.
    """
    oauth = OAuth2Session(self.client_id, redirect_uri=redirect_uri)
    fetch_options = dict(
        username=self.client_id,
        password=self.client_secret,
        code=code,
    )
    self.token = oauth.fetch_token(self.access_token_url, **fetch_options)
    return self.token
def __init__(
    self,
    base_url,
    client_id,
    client_secret=None,
    scope=None,
    token=None,
    token_updater=None
):
    """
    Set up the API endpoints and the OAuth2 session.

    :param base_url: str Base URL for Mautic API E.g. `https://<your-domain>.mautic.net`
    :param client_id: str Mautic API Public Key
    :param client_secret: str Mautic API Secret Key - needed to autorefresh token
    :param scope: list|str (a string is split on commas)
    :param token: dict with access token data
    :param token_updater: function used for token autorefresh.
    """
    if scope and not isinstance(scope, (list, tuple)):
        scope = scope.split(',')

    self.base_url = base_url.strip(' /')
    # Build the endpoints from the normalised URL. Using the raw argument
    # would give '//oauth/...' when base_url has a trailing slash.
    self.access_token_url = self.base_url + '/oauth/v2/token'
    self.authorization_base_url = self.base_url + '/oauth/v2/authorize'

    # Auto-refresh is enabled only when both an updater callback and the
    # client secret are available.
    if token_updater is not None and client_secret is not None:
        kwargs = {
            'auto_refresh_url': self.access_token_url,
            'auto_refresh_kwargs': {
                'client_id': client_id,
                'client_secret': client_secret
            },
            'token_updater': token_updater
        }
    else:
        kwargs = {}

    self.session = OAuth2Session(
        client_id, scope=scope, token=token, **kwargs
    )
def index():
    """Step 1: User Authorization.

    Redirect the user/resource owner to the OAuth provider using an URL with
    a few key OAuth parameters. The generated state is kept in ``session``
    under ``'oauth_state'``.
    """
    oauth = OAuth2Session(client_id, redirect_uri=redirect_uri)
    url, oauth_state = oauth.authorization_url(
        authorization_base_url, grant_type='authorization_code')
    session['oauth_state'] = oauth_state
    return redirect(url)
# Step 2: User authorization, this happens on the provider.
def make_fake_response(text, *args, **kwargs):
    """Mock out the response object returned by requests_oauthlib.OAuth2Session.get(...).

    :param text: value exposed as ``.text``; also parsed as JSON to provide
        ``.json()`` unless a ``json`` keyword is supplied
    :param args: accepted and ignored
    :param kwargs: optional ``json`` (return value of ``.json()``) and
        ``status_code`` (value of ``.status_code``)
    :returns: a MagicMock named "response"
    """
    mm = MagicMock(name="response")
    mm.text = text
    if "json" in kwargs:
        mm.json.return_value = kwargs["json"]
    else:
        try:
            mm.json.return_value = json.loads(text)
        except (TypeError, ValueError) as ex:
            # A non-JSON body (e.g. a plain-text error page) must still be
            # mockable; the parse failure is raised when .json() is called.
            mm.json.side_effect = ex
    if "status_code" in kwargs:
        mm.status_code = kwargs["status_code"]
    return mm
# authcontroller.py file source
# Project: NZ-ORCID-Hub
# Author: Royal-Society-of-New-Zealand
# Project source
# File source
# Views: 18
# Favorites: 0
# Likes: 0
# Comments: 0
def profile():
    """Fetch a protected resource using an OAuth 2 token.

    Looks up the current user's ORCID token and requests the user's
    ``/person`` record with it. Redirects to "link" when no token is stored
    or the API answers 401 (the stored token is deleted in that case),
    redirects to "login" on any other lookup error, and otherwise renders
    ``profile.html``.
    """
    user = current_user
    app.logger.info(f"For {user} trying to display profile by getting ORCID token")
    try:
        orcid_token = OrcidToken.get(user_id=user.id, org=user.organisation)
    except OrcidToken.DoesNotExist:
        app.logger.info(f"For {user} we dont have ocrditoken so redirecting back to link page")
        return redirect(url_for("link"))
    except Exception as ex:
        # TODO: need to handle this
        app.logger.exception("Failed to retrieve ORCID token form DB.")
        flash("Unhandled Exception occured: %s" % ex, "danger")
        return redirect(url_for("login"))

    # Both handlers above return, so the token lookup succeeded from here on.
    api = OAuth2Session(
        user.organisation.orcid_client_id,
        token={"access_token": orcid_token.access_token},
    )
    person_url = ORCID_API_BASE + user.orcid + "/person"
    # TODO: utilize asyncio/aiohttp to run it concurrently
    person_response = api.get(person_url, headers=HEADERS)
    app.logger.info("For %r logging response code %r, While fetching profile info", user,
                    person_response.status_code)

    if person_response.status_code == 401:
        # The stored token was rejected: drop it and send the user to "link"
        orcid_token.delete_instance()
        app.logger.info("%r has removed his organisation from trusted list", user)
        return redirect(url_for("link"))

    same_orcid_users = User.select().where(User.orcid == user.orcid)
    users_orcid = OrcidToken.select().where(OrcidToken.user.in_(same_orcid_users))
    app.logger.info("For %r everything is fine, So displaying profile page", user)
    return render_template(
        "profile.html", user=user, users_orcid=users_orcid, profile_url=ORCID_BASE_URL)
def callback(request):
    """Finish the OAuth2 flow and return a JSON summary.

    Exchanges the authorization response for a token, stores the token and
    the ``patient`` value from userinfo in the session, and returns a
    JsonResponse with the token fields, the userinfo document and a few
    related URLs.
    """
    stored = request.session
    payload = OrderedDict()
    oauth = OAuth2Session(stored['client_id'],
                          redirect_uri=stored['redirect_uri'])

    # Default to https when the configured hostname carries no scheme
    host = settings.HOSTNAME_URL
    if not host.startswith(("http://", "https://")):
        host = "https://%s" % (host)
    auth_uri = host + request.get_full_path()

    token = oauth.fetch_token(stored['token_uri'],
                              client_secret=stored['client_secret'],
                              authorization_response=auth_uri)
    stored['token'] = token

    # The scope value is joined into one space-separated string;
    # every other token field is copied unchanged.
    payload['token_response'] = OrderedDict(
        (key, ' '.join(value) if key == "scope" else value)
        for key, value in token.items()
    )

    userinfo = oauth.get(stored['userinfo_uri']).json()
    payload[stored['userinfo_uri']] = userinfo
    stored['patient'] = userinfo['patient']

    payload['oidc_discovery_uri'] = host + reverse('openid-configuration')
    payload['fhir_metadata_uri'] = host + reverse('fhir_conformance_metadata')
    payload['test_page'] = host + reverse('test_links')
    return JsonResponse(payload)
def test_userinfo(request):
    """Fetch the userinfo document with the session's token and return it as JSON."""
    stored = request.session
    client = OAuth2Session(stored['client_id'], token=stored['token'])
    endpoint = "%s/connect/userinfo" % (stored['resource_uri'])
    return JsonResponse(client.get(endpoint).json())
def test_coverage(request):
    """Fetch the Coverage resource with the session's token and return it as JSON.

    ``safe=False`` is passed to JsonResponse for this payload.
    """
    stored = request.session
    client = OAuth2Session(stored['client_id'], token=stored['token'])
    endpoint = "%s/protected/bluebutton/fhir/v1/Coverage/?_format=json" % (
        stored['resource_uri'])
    payload = client.get(endpoint).json()
    return JsonResponse(payload, safe=False)
def test_patient(request):
    """Fetch the session's Patient resource with the stored token and return it as JSON."""
    stored = request.session
    client = OAuth2Session(stored['client_id'], token=stored['token'])
    endpoint = "%s/protected/bluebutton/fhir/v1/Patient/%s?_format=json" % (
        stored['resource_uri'], stored['patient'])
    payload = client.get(endpoint).json()
    return JsonResponse(payload)