python类OAuth2Session()的实例源码

oauth2.py 文件源码 项目:python-eduvpn-client 作者: eduvpn 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def create_oauth_session(port, auto_refresh_url):
    """
    Build the OAuth2 session whose redirect URI points at the local callback.

    args:
        port (int): local port used in the ``http://127.0.0.1:<port>/callback``
            redirect URI
        auto_refresh_url: forwarded unchanged to OAuth2Session as
            ``auto_refresh_url``
    returns:
        OAuth2Session: session built with the ``client_id`` and ``scope``
            names defined outside this function
    """
    logger.info("Creating an oauth session, temporarily starting webserver on port {} for auth callback".format(port))
    return OAuth2Session(
        client_id,
        redirect_uri='http://127.0.0.1:%s/callback' % port,
        auto_refresh_url=auto_refresh_url,
        scope=scope,
    )
oauth2.py 文件源码 项目:st2-auth-backend-oauth2 作者: pidah 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def authenticate(self, username, password):
    """
    Check a username/password pair and report whether it is valid.

    Users listed in ``self._exclude_list`` are checked against the htpasswd
    file at ``self._file_path``. Everyone else is checked by asking
    ``self._token_url`` for a token with the supplied credentials.

    Returns True when authentication succeeds, False otherwise.
    """
    if username in self._exclude_list:
        verdict = HtpasswdFile(path=self._file_path).check_password(
            username, password)

        # check_password yields None / False / True; log the matching outcome.
        outcomes = (
            (None, 'User "%s" doesn\'t exist'),
            (False, 'Invalid password for user "%s"'),
            (True, 'Authentication for user "%s" successful'),
        )
        for expected, template in outcomes:
            if verdict is expected:
                LOG.info(template % (username))
                break

        return bool(verdict)

    # NOTE(review): presumably lets oauthlib talk to a non-HTTPS token URL,
    # and verify=False below skips TLS certificate checks - confirm both
    # are intended for production use.
    os.environ['OAUTHLIB_INSECURE_TRANSPORT'] = '1'
    legacy_client = LegacyApplicationClient(client_id=self._client_id)
    oauth = OAuth2Session(client=legacy_client)

    try:
        token = oauth.fetch_token(
            token_url=self._token_url,
            username=username,
            password=password,
            client_id=self._client_id,
            client_secret=self._client_secret,
            verify=False)
    except Exception as e:
        # Any failure while fetching the token counts as a failed login.
        LOG.info(
            'Authentication for user "{}" failed: {}'.format(
                username, str(e)))
        return False

    if token is None:
        return False

    LOG.info('Authentication for user "{}" successful'.format(username))
    return True
facebook.py 文件源码 项目:django-authlib 作者: matthiask 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __init__(self, request):
    """Store the request and create the OAuth2 session for this client.

    The session's redirect URI is ``request.build_absolute_uri('.')``.
    """
    self._request = request
    client_id, scope = self.client_id, self.scope
    redirect_uri = request.build_absolute_uri('.')
    self._session = OAuth2Session(
        client_id, scope=scope, redirect_uri=redirect_uri)
google.py 文件源码 项目:django-authlib 作者: matthiask 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def __init__(self, request):
    """Store the request and create the OAuth2 session for this client.

    The session's redirect URI is ``request.build_absolute_uri('.')``.
    """
    self._request = request
    client_id, scope = self.client_id, self.scope
    redirect_uri = request.build_absolute_uri('.')
    self._session = OAuth2Session(
        client_id, scope=scope, redirect_uri=redirect_uri)
api.py 文件源码 项目:MMM-fitbit 作者: SVendittelli 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def fetch_access_token(self, code, redirect_uri):
    """Step 2: exchange the code obtained in step 1 for an access token.

    The client id and secret are sent as ``username`` / ``password`` to
    ``self.access_token_url``. The token returned by ``fetch_token`` is kept
    on ``self.token`` for later API calls and is also returned.
    """
    oauth = OAuth2Session(self.client_id, redirect_uri=redirect_uri)
    token_request = {
        'username': self.client_id,
        'password': self.client_secret,
        'code': code,
    }
    self.token = oauth.fetch_token(self.access_token_url, **token_request)
    return self.token
oauth.py 文件源码 项目:doorman 作者: mwielgoszewski 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def get_authorize_url(self):
    """Return the provider authorization URL for this client.

    The ``state`` value produced alongside the URL is stored in
    ``session['_oauth_state']``.
    """
    oauth = OAuth2Session(
        self.client_id, scope=self.scope, redirect_uri=self.redirect_uri)
    url_and_state = oauth.authorization_url(
        self.base_url, access_type='online', approval_prompt='force')
    authorization_url, state = url_and_state

    session['_oauth_state'] = state
    return authorization_url
monzo_api.py 文件源码 项目:pymonzo 作者: pawelad 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _get_response(self, method, endpoint, params=None):
    """
    Send one API request, retrying once after refreshing the token

    :param method: name of the session method to call (e.g. an HTTP verb)
    :type method: str
    :param endpoint: API endpoint, joined onto ``self.api_url``
    :type endpoint: str
    :param params: extra parameters passed with the request
    :type params: dict
    :returns: API response
    :rtype: Response
    :raises MonzoAPIError: when the final response status is not OK
    """
    url = urljoin(self.api_url, endpoint)

    def _send():
        # Looks up the method on the *current* session each time, so the
        # retry below goes through the freshly built session.
        return getattr(self._session, method)(url, params=params)

    try:
        response = _send()

        # A 401 from the API is treated like an expired token.
        if response.status_code == 401:
            raise TokenExpiredError
    except TokenExpiredError:
        # Automatic refreshing is not relied upon; refresh by hand and
        # rebuild the session around the new token before retrying.
        self._refresh_oath_token()
        self._session = OAuth2Session(
            client_id=self._client_id,
            token=self._token,
        )
        response = _send()

    if response.status_code == requests.codes.ok:
        return response

    raise MonzoAPIError(
        "Something went wrong: {}".format(response.json())
    )
views.py 文件源码 项目:django-discord-bind 作者: mrogaski 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def oauth_session(request, state=None, token=None):
    """ Constructs the OAuth2 session object.

    The redirect URI is ``settings.DISCORD_REDIRECT_URI`` when set, otherwise
    the absolute URL of the ``discord_bind_callback`` view. The requested
    scope is ``email`` or ``identify`` (depending on
    ``settings.DISCORD_EMAIL_SCOPE``) plus ``guilds.join``.

    ``state`` and ``token`` are passed straight through to OAuth2Session.
    """
    redirect_uri = settings.DISCORD_REDIRECT_URI
    if redirect_uri is None:
        redirect_uri = request.build_absolute_uri(
            reverse('discord_bind_callback'))

    # Discord names its basic-profile scope "identify"; the previous
    # spelling "identity" is not a scope Discord defines.
    profile_scope = 'email' if settings.DISCORD_EMAIL_SCOPE else 'identify'
    scope = [profile_scope, 'guilds.join']

    return OAuth2Session(settings.DISCORD_CLIENT_ID,
                         redirect_uri=redirect_uri,
                         scope=scope,
                         token=token,
                         state=state)
managers.py 文件源码 项目:adarnauth-esi 作者: Adarnof 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def bulk_refresh(self):
    """
    Refresh every token in the queryset that has a refresh token.

    Tokens whose refresh raises TokenError are deleted.
    Tokens without a refresh token that are expired are deleted as well.
    """
    client_id = app_settings.ESI_SSO_CLIENT_ID
    session = OAuth2Session(client_id)
    auth = requests.auth.HTTPBasicAuth(
        client_id, app_settings.ESI_SSO_CLIENT_SECRET)

    # One session and one auth object are shared by all refresh calls.
    refreshable = self.filter(refresh_token__isnull=False)
    for model in refreshable:
        try:
            model.refresh(session=session, auth=auth)
        except TokenError:
            model.delete()

    non_refreshable = self.filter(refresh_token__isnull=True)
    non_refreshable.get_expired().delete()
views.py 文件源码 项目:adarnauth-esi 作者: Adarnof 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def sso_redirect(request, scopes=None, return_to=None):
    """
    Generates a :model:`esi.CallbackRedirect` for the specified request.
    Redirects to EVE for login.
    Accepts a view or URL name as a redirect after SSO.

    ``scopes`` may be omitted (no scopes), a single scope string, or a
    sequence of scopes. ``return_to`` is resolved with ``reverse``; when it
    is not given the current request path is used instead.
    """
    # A fresh list per call: a mutable default would be shared between calls
    # and handed to the OAuth2Session, where any mutation would leak.
    if scopes is None:
        scopes = []
    elif isinstance(scopes, string_types):
        scopes = [scopes]

    # ensure only one callback redirect model per session
    CallbackRedirect.objects.filter(session_key=request.session.session_key).delete()

    # ensure session installed in database
    if not request.session.exists(request.session.session_key):
        request.session.create()

    if return_to:
        url = reverse(return_to)
    else:
        url = request.get_full_path()

    oauth = OAuth2Session(app_settings.ESI_SSO_CLIENT_ID, redirect_uri=app_settings.ESI_SSO_CALLBACK_URL, scope=scopes)
    redirect_url, state = oauth.authorization_url(app_settings.ESI_OAUTH_LOGIN_URL)

    # Remember where to send the user once the callback arrives with this state.
    CallbackRedirect.objects.create(session_key=request.session.session_key, state=state, url=url)

    return redirect(redirect_url)
models.py 文件源码 项目:adarnauth-esi 作者: Adarnof 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def get_token_data(cls, access_token):
    """Return the decoded JSON from the token verify URL for ``access_token``.

    Issues a GET to ``app_settings.ESI_TOKEN_VERIFY_URL`` through an
    OAuth2Session carrying the given access token.
    """
    token = {'access_token': access_token}
    session = OAuth2Session(app_settings.ESI_SSO_CLIENT_ID, token=token)
    response = session.request('get', app_settings.ESI_TOKEN_VERIFY_URL)
    return response.json()
api.py 文件源码 项目:Fitbit 作者: jaromirsalamon 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def fetch_access_token(self, code, redirect_uri):
    """Step 2: exchange the code obtained in step 1 for an access token.

    The client id and secret are sent as ``username`` / ``password`` to
    ``self.access_token_url``. The token returned by ``fetch_token`` is kept
    on ``self.token`` for later API calls and is also returned.
    """
    oauth = OAuth2Session(self.client_id, redirect_uri=redirect_uri)
    token_request = {
        'username': self.client_id,
        'password': self.client_secret,
        'code': code,
    }
    self.token = oauth.fetch_token(self.access_token_url, **token_request)
    return self.token
api.py 文件源码 项目:python-mautic 作者: divio 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(
        self,
        base_url,
        client_id,
        client_secret=None,
        scope=None,
        token=None,
        token_updater=None
    ):
        """
        :param base_url: str Base URL for Mautic API E.g. `https://<your-domain>.mautic.net`
            Surrounding spaces and slashes are stripped before use.
        :param client_id: str Mautic API Public Key
        :param client_secret: str Mautic API Secret Key - needed to autorefresh token
        :param scope: list|str A string is split on commas into a list.
        :param token: dict with access token data
        :param token_updater: function used for token autorefresh.
            Autorefresh is only configured when both this and
            ``client_secret`` are given.
        """
        if scope and not isinstance(scope, (list, tuple)):
            scope = scope.split(',')
        self.base_url = base_url.strip(' /')

        # Build the endpoints from the normalised URL; using the raw argument
        # would turn a trailing slash into "//oauth/..." in both URLs.
        self.access_token_url = self.base_url + '/oauth/v2/token'
        self.authorization_base_url = self.base_url + '/oauth/v2/authorize'

        if token_updater is not None and client_secret is not None:
            kwargs = {
                'auto_refresh_url': self.access_token_url,
                'auto_refresh_kwargs': {
                    'client_id': client_id,
                    'client_secret': client_secret
                },
                'token_updater': token_updater
            }
        else:
            kwargs = {}

        self.session = OAuth2Session(
            client_id, scope=scope, token=token, **kwargs
        )
oauth2_app.py 文件源码 项目:python-mautic 作者: divio 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def index():
    """Step 1: User Authorization.

    Build the provider's authorization URL, store the accompanying state in
    ``session['oauth_state']`` and redirect the user/resource owner there.
    """
    oauth = OAuth2Session(client_id, redirect_uri=redirect_uri)
    url_and_state = oauth.authorization_url(
        authorization_base_url, grant_type='authorization_code')
    authorization_url, state = url_and_state

    session['oauth_state'] = state
    return redirect(authorization_url)


# Step 2: User authorization, this happens on the provider.
test_orcid.py 文件源码 项目:NZ-ORCID-Hub 作者: Royal-Society-of-New-Zealand 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def make_fake_response(text, *args, **kwargs):
    """Build a MagicMock standing in for an OAuth2Session.get(...) response.

    ``text`` becomes the mock's ``.text``. ``.json()`` returns the ``json``
    keyword argument when supplied, otherwise ``text`` parsed as JSON.
    ``status_code`` is set only when passed as a keyword argument.
    """
    fake = MagicMock(name="response")
    fake.text = text
    # The parse only happens when no explicit payload was handed in.
    fake.json.return_value = (
        kwargs["json"] if "json" in kwargs else json.loads(text))
    if "status_code" in kwargs:
        fake.status_code = kwargs["status_code"]
    return fake
authcontroller.py 文件源码 项目:NZ-ORCID-Hub 作者: Royal-Society-of-New-Zealand 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def profile():
    """Fetch a protected resource using an OAuth 2 token.

    Looks up the current user's ORCID token; without one the user is sent to
    the "link" page, and any other lookup failure sends them to "login".
    With a token, the user's ``/person`` record is requested: a 401 response
    deletes the stored token and redirects to "link", anything else renders
    ``profile.html``.
    """
    user = current_user
    app.logger.info(f"For {user} trying to display profile by getting ORCID token")
    try:
        orcid_token = OrcidToken.get(user_id=user.id, org=user.organisation)
    except OrcidToken.DoesNotExist:
        app.logger.info(f"For {user} we dont have ocrditoken so redirecting back to link page")
        return redirect(url_for("link"))
    except Exception as ex:
        # TODO: need to handle this
        app.logger.exception("Failed to retrieve ORCID token form DB.")
        flash("Unhandled Exception occured: %s" % ex, "danger")
        return redirect(url_for("login"))

    # Both handlers above return, so from here on a token is available.
    orcid_client_id = user.organisation.orcid_client_id
    token = {"access_token": orcid_token.access_token}
    client = OAuth2Session(orcid_client_id, token=token)

    # TODO: utilize asyncio/aiohttp to run it concurrently
    person_url = ORCID_API_BASE + user.orcid + "/person"
    resp_person = client.get(person_url, headers=HEADERS)
    app.logger.info("For %r logging response code %r, While fetching profile info", user,
                    resp_person.status_code)

    if resp_person.status_code == 401:
        # The stored token was rejected; drop it and ask the user to link again.
        orcid_token.delete_instance()
        app.logger.info("%r has removed his organisation from trusted list", user)
        return redirect(url_for("link"))

    same_orcid_users = User.select().where(User.orcid == user.orcid)
    users_orcid = OrcidToken.select().where(OrcidToken.user.in_(same_orcid_users))
    app.logger.info("For %r everything is fine, So displaying profile page", user)
    return render_template(
        "profile.html", user=user, users_orcid=users_orcid, profile_url=ORCID_BASE_URL)
views.py 文件源码 项目:bluebutton-web-server 作者: CMSgov 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def callback(request):
    """Finish the OAuth2 flow and return a JSON summary of the result.

    Exchanges the authorization response for a token using the client
    settings kept in ``request.session``, stores the token and the
    ``patient`` value from the userinfo response back into the session, and
    returns the token, the userinfo payload and a few related URLs.
    """
    response = OrderedDict()

    session_data = request.session
    oas = OAuth2Session(session_data['client_id'],
                        redirect_uri=session_data['redirect_uri'])

    # Default to https when the configured host carries no scheme.
    host = settings.HOSTNAME_URL
    if not host.startswith(("http://", "https://")):
        host = "https://%s" % (host)

    callback_url = host + request.get_full_path()
    token = oas.fetch_token(session_data['token_uri'],
                            client_secret=session_data['client_secret'],
                            authorization_response=callback_url)
    session_data['token'] = token

    # Copy the token as-is, except "scope", which is joined into one string.
    response['token_response'] = OrderedDict(
        (key, ' '.join(value) if key == "scope" else value)
        for key, value in token.items()
    )

    userinfo_uri = session_data['userinfo_uri']
    userinfo = oas.get(userinfo_uri).json()
    response[userinfo_uri] = userinfo
    session_data['patient'] = userinfo['patient']

    response['oidc_discovery_uri'] = host + reverse('openid-configuration')
    response['fhir_metadata_uri'] = host + reverse('fhir_conformance_metadata')
    response['test_page'] = host + reverse('test_links')

    return JsonResponse(response)
views.py 文件源码 项目:bluebutton-web-server 作者: CMSgov 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def test_userinfo(request):
    """Return ``<resource_uri>/connect/userinfo`` as a JSON response.

    The request is made with the client id and token stored in the session.
    """
    session_data = request.session
    oas = OAuth2Session(session_data['client_id'], token=session_data['token'])
    userinfo_uri = "%s/connect/userinfo" % (session_data['resource_uri'])
    return JsonResponse(oas.get(userinfo_uri).json())
views.py 文件源码 项目:bluebutton-web-server 作者: CMSgov 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_coverage(request):
    """Return the Coverage resource listing as a JSON response.

    The request is made with the client id and token stored in the session;
    ``safe=False`` lets JsonResponse serialise a non-dict payload.
    """
    session_data = request.session
    oas = OAuth2Session(session_data['client_id'], token=session_data['token'])
    coverage_uri = "%s/protected/bluebutton/fhir/v1/Coverage/?_format=json" % (
        session_data['resource_uri'])
    return JsonResponse(oas.get(coverage_uri).json(), safe=False)
views.py 文件源码 项目:bluebutton-web-server 作者: CMSgov 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_patient(request):
    """Return the Patient resource for the session's patient as JSON.

    The request is made with the client id and token stored in the session.
    """
    session_data = request.session
    oas = OAuth2Session(session_data['client_id'], token=session_data['token'])
    patient_uri = "%s/protected/bluebutton/fhir/v1/Patient/%s?_format=json" % (
        session_data['resource_uri'], session_data['patient'])
    return JsonResponse(oas.get(patient_uri).json())


问题


面经


文章

微信
公众号

扫码关注公众号