python类DecodeError()的实例源码

GeneralController.py 文件源码 项目:cerberus-core 作者: ovh 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def logout(request):
    """ Log out the user identified by the ``X-Api-Token`` header.

    The token is decoded with ``settings.SECRET_KEY``, its ``data`` field is
    decrypted and parsed as JSON, and the referenced user's ``last_login``
    is reset to ``datetime.fromtimestamp(0)``.

    Raises ``BadRequest`` when the header is missing or when the token
    cannot be resolved to an existing user.
    """
    try:
        raw_token = request.environ['HTTP_X_API_TOKEN']
    except (KeyError, IndexError, TypeError):
        raise BadRequest('Missing HTTP X-Api-Token header')

    try:
        claims = jwt.decode(raw_token, settings.SECRET_KEY)
        decrypted = CRYPTO.decrypt(str(claims['data']))
        identity = json.loads(decrypted)
        account = User.objects.get(id=identity['id'])
        account.last_login = datetime.fromtimestamp(0)
        account.save()
    except (utils.CryptoException, KeyError, jwt.DecodeError,
            jwt.ExpiredSignature, User.DoesNotExist):
        raise BadRequest('Invalid token')

    return {'message': 'Logged out'}
tenant_manager.py 文件源码 项目:os-reststack-manager 作者: gemagomez 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def authenticate():
    """Populate ``g.user`` from the ``X-Auth-Token`` header, or abort with 401.

    Requests whose endpoint name contains ``tenant_provisioned`` skip token
    checking and are attributed to the ``phone_home`` user.
    """
    endpoint_name = str(request.endpoint)
    if 'tenant_provisioned' in endpoint_name:
        g.user = "phone_home"
        logger.info("Authentication bypassed: tenant_provisioned")
        return

    try:
        claims = jwt.decode(
            request.headers['X-Auth-Token'],
            credentials['tenant_secret'],
            algorithms=['HS256'],
        )
        g.user = claims['user']
    except KeyError:
        # Missing header, missing secret, or a token without a 'user' claim.
        logger.error("Error: key error.")
        abort(401)
    except jwt.DecodeError:
        logger.error("Error: decode error")
        abort(401)
auth.py 文件源码 项目:django-redis-pubsub 作者: andrewyoung1991 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def authjwt_method(token):
    """ Resolve a user from a JWT using the rest_framework_jwt helpers.

    Returns the user found by natural key, or ``None`` when the token is
    rejected, carries no username, or names a user that does not exist.
    """
    import jwt
    from rest_framework_jwt.authentication import (jwt_decode_handler,
                                                    jwt_get_username_from_payload)
    rejected_token_errors = (jwt.ExpiredSignature, jwt.DecodeError,
                             jwt.InvalidTokenError)
    try:
        claims = jwt_decode_handler(token)
    except rejected_token_errors:
        return None

    user_model = get_user_model()
    natural_key = jwt_get_username_from_payload(claims)
    if not natural_key:  # pragma: no cover
        return None

    try:
        return user_model.objects.get_by_natural_key(natural_key)
    except user_model.DoesNotExist:  # pragma: no cover
        return None
jwt_authenticator.py 文件源码 项目:django-open-volunteering-platform 作者: OpenVolunteeringPlatform 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def authenticate(self, request):
    """
    Authenticate the request from its JWT.

    Returns ``None`` when the request carries no JWT, otherwise a
    ``(user, token)`` tuple. Raises ``AuthenticationFailed`` when the token
    is expired, cannot be decoded, or is otherwise invalid.
    """
    token = self.get_jwt_value(request)
    if token is None:
        return None

    try:
        claims = jwt_decode_handler(token)
    except jwt.ExpiredSignature:
        raise exceptions.AuthenticationFailed(_('Signature has expired.'))
    except jwt.DecodeError:
        raise exceptions.AuthenticationFailed(_('Error decoding signature.'))
    except jwt.InvalidTokenError:
        raise exceptions.AuthenticationFailed()

    # Credentials are resolved against the channel attached to the request.
    return (self.authenticate_credentials(claims, request.channel), token)
auth.py 文件源码 项目:nanosphere 作者: walkr 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def decode_jwt(token, options=None):
    """ Authenticate via JSON Web Token

    Options: `secret`, `algorithms`. Values missing from `options` fall
    back to the corresponding attributes of the module-level `CONF`.

    Returns the decoded token payload.
    Raises a plain `Exception` when the token cannot be decoded. """

    options = options or {}
    secret = options.get('secret', CONF and CONF.secret)
    algorithms = options.get('algorithms', CONF and CONF.algorithms)

    try:
        # `algorithms` must be passed by keyword: in PyJWT 1.x the third
        # positional parameter of `decode` is `verify`, so a positional
        # list was treated as a truthy verify flag and the algorithm
        # allow-list was never applied.
        data = jwt.decode(token, secret, algorithms=algorithms)
    except jwt.DecodeError:
        raise Exception('Cannot decode token')
    else:
        logging.debug('* Decoded data = {}'.format(data))
        return data
decorators.py 文件源码 项目:auction-backend 作者: luissalgadofreire 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def authenticate(token):
    """
    Decode the supplied token and return the user it identifies.

    Raises ``AuthenticationFailed`` when the token signature has expired
    or the token cannot be decoded.

    Based on jwt_auth.JSONWebTokenAuthMixin.authenticate
    """
    try:
        claims = jwt_decode_handler(token)
    except jwt.ExpiredSignature:
        raise exceptions.AuthenticationFailed('Signature has expired.')
    except jwt.DecodeError:
        raise exceptions.AuthenticationFailed('Error decoding signature.')

    return authenticate_credentials(claims)
security.py 文件源码 项目:pivportal 作者: starboarder2001 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def token_required(secret_key):
    """Build a decorator that rejects requests lacking a valid token.

    The token is taken from the second whitespace-separated word of the
    ``Authorization`` header and parsed with ``parse_token`` using
    ``secret_key``. On success the token's ``sub`` claim is written to the
    ``username`` global of the decorated function's module before the
    function is called. On failure a 401 ``Response`` is returned.
    """
    def token_required_decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            # ``__globals__`` exists on both Python 2.6+ and Python 3;
            # ``func_globals`` is Python 2 only.
            g = f.__globals__

            auth_header = request.headers.get('Authorization')
            if not auth_header:
                return Response(response="Missing authorization header", status=401)

            # A header with a scheme but no credentials used to raise
            # IndexError; treat it as an invalid token instead.
            parts = auth_header.split()
            if len(parts) < 2:
                return Response(response="Token is invalid", status=401)

            try:
                payload = parse_token(parts[1], secret_key)
            except jwt.DecodeError:
                return Response(response="Token is invalid", status=401)
            except jwt.ExpiredSignature:
                return Response(response="Token has expired", status=401)

            # Set username for decorated func
            # NOTE(review): this is a module-level global, so it is shared by
            # every caller of the decorated function - confirm that is intended.
            g["username"] = payload['sub']

            return f(*args, **kwargs)
        return decorated_function
    return token_required_decorator
websocket_server.py 文件源码 项目:baselayer 作者: cesium-ml 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def authenticate(self, auth_token):
        """Authenticate this websocket from a JWT and register it.

        On success the socket records the username, resets its failure
        counter, sends ``AUTH OK`` and joins the per-user socket set. On a
        decode or expiry error it sends ``AUTH FAILED``.
        """
        try:
            claims = jwt.decode(auth_token, secret)
            user = claims['username']

            self.username = user
            self.authenticated = True
            self.auth_failures = 0
            self.send_json(action='AUTH OK')

            # The first socket opened for a user also subscribes to that
            # user's feed.
            user_sockets = WebSocket.sockets[user]
            if len(user_sockets) == 0:
                WebSocket.subscribe(user)

            WebSocket.sockets[user].add(self)

        except (jwt.DecodeError, jwt.ExpiredSignatureError):
            self.send_json(action='AUTH FAILED')
authentication.py 文件源码 项目:drf-jwt-devices 作者: ArabellaTech 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def authenticate(self, request):
        """Authenticate the request from its JWT.

        Returns ``None`` when no JWT is present, otherwise ``(user, token)``.
        The devices decode handler is used when
        ``api_settings.JWT_PERMANENT_TOKEN_AUTH`` is truthy, the plain decode
        handler otherwise. Raises ``AuthenticationFailed`` for expired,
        undecodable or otherwise invalid tokens.
        """
        token = self.get_jwt_value(request)
        if token is None:
            return None

        try:
            decode = (jwt_devices_decode_handler
                      if api_settings.JWT_PERMANENT_TOKEN_AUTH
                      else jwt_decode_handler)
            claims = decode(token)
        except jwt.ExpiredSignature:
            raise exceptions.AuthenticationFailed(_("Signature has expired."))
        except jwt.DecodeError:
            raise exceptions.AuthenticationFailed(_("Error decoding signature."))
        except jwt.InvalidTokenError:
            raise exceptions.AuthenticationFailed()

        return self.authenticate_credentials(claims), token
azuread_tenant.py 文件源码 项目:social-core 作者: python-social-auth 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def get_certificate(self, kid):
        """Fetch the JWKS document and load the certificate for ``kid``.

        Uses the first ``x5c`` entry of the key whose ``kid`` matches, wraps
        it in PEM armour and loads it as an X.509 certificate. Raises
        ``DecodeError`` when no key matches.
        """
        jwks_response = self.request(self.jwks_url(), method='GET')
        jwks_response.raise_for_status()

        # Scan the published keys for the requested key id.
        encoded_cert = None
        found = False
        for entry in jwks_response.json()['keys']:
            if entry['kid'] == kid:
                encoded_cert = entry['x5c'][0]
                found = True
                break
        if not found:
            raise DecodeError('Cannot find kid={}'.format(kid))

        pem_template = ('-----BEGIN CERTIFICATE-----\n'
                        '{}\n'
                        '-----END CERTIFICATE-----')
        pem_text = pem_template.format(encoded_cert)

        return load_pem_x509_certificate(pem_text.encode(), default_backend())
azuread_tenant.py 文件源码 项目:social-core 作者: python-social-auth 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def user_data(self, access_token, *args, **kwargs):
        """Decode and verify the ``id_token`` from the auth response.

        The key id and algorithm are read from the (unverified) JWT header,
        the matching certificate is fetched with ``get_certificate``, and the
        token is decoded against that certificate's public key with the
        ``SOCIAL_AUTH_AZUREAD_OAUTH2_KEY`` setting as audience.

        Returns the decoded token payload.
        Raises ``AuthTokenError`` when decoding fails or the token is expired.
        """
        response = kwargs.get('response')
        id_token = response.get('id_token')

        # decode the JWT header as JSON dict
        # JWT segments are base64url-encoded with the '=' padding stripped
        # (RFC 7515), so restore the padding and use the URL-safe alphabet;
        # plain b64decode fails on such segments.
        header_segment = id_token.split('.', 1)[0].encode('ascii')
        header_segment += b'=' * (-len(header_segment) % 4)
        jwt_header = json.loads(
            base64.urlsafe_b64decode(header_segment).decode()
        )

        # get key id and algorithm
        key_id = jwt_header['kid']
        # NOTE(review): the algorithm comes from the unverified header;
        # consider pinning the accepted algorithms instead.
        algorithm = jwt_header['alg']

        try:
            # retrieve certificate for key_id
            certificate = self.get_certificate(key_id)

            return jwt_decode(
                id_token,
                key=certificate.public_key(),
                algorithms=algorithm,
                audience=self.setting('SOCIAL_AUTH_AZUREAD_OAUTH2_KEY')
            )
        except (DecodeError, ExpiredSignature) as error:
            raise AuthTokenError(self, error)
web_token.py 文件源码 项目:idealoom 作者: conversence 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def decode_token(token, secret='', ttl=DEFAULT_TTL, verify=True):
    """Decode a JSON Web Token and, when verifying, check its time window.

    With ``verify`` true the token must carry an ``issuedAt`` claim; the
    token is rejected when that time is in the future or more than ``ttl``
    seconds in the past. Returns the decoded payload.

    Raises ``TokenInvalid`` on any decoding or validity failure.
    """
    try:
        claims = jwt.decode(str(token), secret, verify=verify)
    except jwt.DecodeError as e:
        raise TokenInvalid("error decoding JSON Web Token", e)

    if not verify:
        return claims

    issued_raw = claims.get('issuedAt')
    if issued_raw is None:
        raise TokenInvalid("'issuedAt' is missing from token")

    issued_at = isodate.parse_datetime(issued_raw)
    expires_at = issued_at + datetime.timedelta(seconds=ttl)

    if issued_at > _now():
        raise TokenInvalid("token is not yet valid")
    if expires_at < _now():
        raise TokenInvalid("token has expired")

    return claims
middlewares.py 文件源码 项目:IntegraTI-API 作者: discentes-imd 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def verify_token():
    """
    Check the ``Authorization`` header, when present, and set ``g.current_user``.

    Aborts with 403 when the token is blacklisted, expired or cannot be
    decoded. Requests without the header pass through untouched.
    """
    if 'Authorization' not in request.headers:
        return

    presented = request.headers['Authorization']
    if presented in cache.blacklisted_tokens:
        abort(403, 'Error: invalid token')
    try:
        claims = jwt.decode(presented, config.SECRET_KEY)
        g.current_user = claims['id_user']
    except jwt.ExpiredSignatureError:
        abort(403, 'Error: token expired')
    except jwt.DecodeError:
        abort(403, 'Error: invalid token')
auth.py 文件源码 项目:xiaodi 作者: shenaishiren 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def verify_token(cls, user, token):
        """Return True when ``token`` decodes with the user's key and names that user.

        The key is the user's id (as a string) followed by ``SECRET_CODE``.
        Expired or undecodable tokens, and objects lacking the expected
        attributes, yield False.
        """
        try:
            claims = jwt.decode(token, str(user.id) + SECRET_CODE)
        except (jwt.ExpiredSignatureError, jwt.DecodeError, AttributeError):
            return False
        return claims["username"] == user.username
auth.py 文件源码 项目:arc 作者: lap00zza 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def decode_token(self, token):
        """Return the decoded payload of ``token``, or False if decoding fails."""
        try:
            claims = jwt.decode(token, key=self.secret, algorithms=self.algorithm)
        except jwt.DecodeError:
            return False
        return claims
azure_ad.py 文件源码 项目:vote4code 作者: welovecoding 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def azure_ad_authorized():
  """Handle the Azure AD OAuth callback and sign the user in.

  Stores the returned ``id_token`` in the session, decodes it without
  signature verification, and signs in the user derived from it. When the
  response is missing or the token cannot be decoded, flashes a message
  and redirects.
  """
  response = azure_ad.authorized_response()
  # Call form prints the same on Python 2 and is valid syntax on Python 3.
  # NOTE(review): this writes the raw OAuth response to stdout, which
  # presumably includes the id_token - consider removing it.
  print(response)
  if response is None:
    flask.flash('You denied the request to sign in.')
    # NOTE(review): `util.get_next_url` is passed without being called;
    # confirm whether `util.get_next_url()` was intended.
    return flask.redirect(util.get_next_url)
  id_token = response['id_token']
  flask.session['oauth_token'] = (id_token, '')
  try:
    decoded_id_token = jwt.decode(id_token, verify=False)
  except (jwt.DecodeError, jwt.ExpiredSignature):
    flask.flash('You denied the request to sign in.')
    return flask.redirect(util.get_next_url)
  user_db = retrieve_user_from_azure_ad(decoded_id_token)
  return auth.signin_user_db(user_db)
token.py 文件源码 项目:auth-srv 作者: openpermissions 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def decode_token(token):
    """
    Decode and validate a JSON Web Token against our certificate

    The certificate is read from ``options.ssl_cert`` when set, otherwise
    from ``LOCALHOST_CRT``. The payload's ``scope`` value is replaced by a
    ``Scope`` object before it is returned.

    :param token: a JSON Web Token
    :returns: dict, the decoded token payload
    :raises:
        jwt.DecodeError: Invalid token or not signed with our key
        jwt.ExpiredSignatureError: Token has expired
        jwt.InvalidAudienceError: Invalid "aud" claim
        jwt.InvalidIssuerError: Invalid "iss" claim
        jwt.MissingRequiredClaimError: Missing a required claim
    """
    cert_path = getattr(options, 'ssl_cert', None) or LOCALHOST_CRT
    with open(cert_path) as cert_fh:
        pem_data = cert_fh.read()
    certificate = load_pem_x509_certificate(pem_data, default_backend())
    public_key = certificate.public_key()

    claims = jwt.decode(
        token,
        public_key,
        audience=audience(),
        issuer=issuer(),
        algorithms=[ALGORITHM],
        verify=True,
    )

    # An empty or absent subject is treated the same as a missing claim.
    if not claims.get('sub'):
        raise jwt.MissingRequiredClaimError('"sub" claim is required')

    claims['scope'] = Scope(claims['scope'])

    return claims
test_token.py 文件源码 项目:auth-srv 作者: openpermissions 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_decode_token_different_public_key():
    """Decoding against a different certificate must raise DecodeError."""
    token, _expiry = generate_token(CLIENT, SCOPE, 'grant_type')

    # Swap the certificate used for verification for the client's one.
    with patch.object(_token, 'LOCALHOST_CRT', koi.CLIENT_CRT), \
            pytest.raises(jwt.DecodeError):
        decode_token(token)
test_grant.py 文件源码 项目:auth-srv 作者: openpermissions 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def test_invalid_assertion(self):
        """A malformed assertion must make token generation raise DecodeError."""
        self.request.body_arguments['assertion'] = ['invalid_token']
        delegate_grant = grants.AuthorizeDelegate(self.request)

        with pytest.raises(jwt.DecodeError):
            yield delegate_grant.generate_token()
mixins.py 文件源码 项目:graphene-jwt-auth 作者: darwin4031 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def authenticate(self, request):
        """Authenticate the request from its Authorization header.

        The header must consist of the configured JWT prefix followed by a
        single token. Returns ``(user, token)``; raises
        ``AuthenticationFailed`` for a missing or wrong prefix, a malformed
        header, or an expired or undecodable token.
        """
        parts = get_authorization_header(request).split()
        expected_prefix = api_settings.JWT_AUTH_HEADER_PREFIX.lower()

        if not parts or smart_text(parts[0].lower()) != expected_prefix:
            raise exceptions.AuthenticationFailed()

        if len(parts) == 1:
            raise exceptions.AuthenticationFailed(
                _("Invalid Authorization header. No credentials provided."))
        if len(parts) > 2:
            raise exceptions.AuthenticationFailed(
                _("Invalid Authorization header. Credentials string should not contain spaces."))

        token = parts[1]
        try:
            claims = jwt_decode_handler(token)
        except jwt.ExpiredSignature:
            raise exceptions.AuthenticationFailed(_("Signature has expired."))
        except jwt.DecodeError:
            raise exceptions.AuthenticationFailed(_("Error decoding signature."))

        return (self.authenticate_credentials(claims), token)
authentication.py 文件源码 项目:graphene-jwt-auth 作者: darwin4031 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def authenticate(self, request):
        """Authenticate the request from its JWT.

        Returns ``(None, None)`` when no JWT is present, otherwise
        ``(user, token)`` after the blacklist and changed-password checks
        have run. Raises ``AuthenticationFailed`` for expired, undecodable
        or otherwise invalid tokens.
        """
        token = self.get_jwt_value(request)
        if token is None:
            return None, None

        try:
            claims = jwt_decode_handler(token)
        except jwt.ExpiredSignature:
            raise AuthenticationFailed(_("Signature has expired."))
        except jwt.DecodeError:
            raise AuthenticationFailed(_("Error decoding signature."))
        except jwt.InvalidTokenError:
            raise AuthenticationFailed()

        # The blacklist check runs before credentials are resolved.
        self.check_blacklist(claims)

        user = self.authenticate_credentials(claims)

        # Run the changed-password check for this user and token payload.
        self.check_changed_password_invalidated_old_token(user, claims)

        return user, token
forms.py 文件源码 项目:graphene-jwt-auth 作者: darwin4031 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def _check_payload(token):
        """Decode ``token`` and return its payload.

        Raises ``forms.ValidationError`` when the signature has expired or
        the token cannot be decoded.
        """
        try:
            return jwt_decode_handler(token)
        except jwt.ExpiredSignature:
            raise forms.ValidationError(_("Signature has expired."))
        except jwt.DecodeError:
            raise forms.ValidationError(_("Error decoding signature."))
auth.py 文件源码 项目:varapp-backend-py 作者: varapp 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def verify_jwt(auth_header, secret):
    """Extract the jwt token from the header, verify its signature,
       its expiration time, and return the payload.

    The header is expected to hold exactly two whitespace-separated parts:
    a method word and the token itself.

    Returns a two-item list ``[payload, error]``: on success ``payload`` is
    the decoded token and ``error`` is ``''``; on failure ``payload`` is
    ``None`` and ``error`` describes the problem."""
    if not auth_header or auth_header == 'null':
        logging.warning("No Authorization header")
        return [None, "Unauthorized access: missing authentication"]

    # Unpacking the split directly raised ValueError for headers that do
    # not hold exactly two parts; report those as an error result instead.
    parts = auth_header.split()   # separate 'JWT' from the jwt itself
    if len(parts) != 2:
        logging.warning("Malformed Authorization header")
        return [None, "Unauthorized access: malformed Authorization header"]

    token = bytes(parts[1], 'utf-8')
    try:
        payload = jwt.decode(token, secret, algorithms=['HS256'], verify=True)
    except (jwt.ExpiredSignatureError, jwt.DecodeError) as err:
        return [None, str(err)]
    return [payload, '']
auth.py 文件源码 项目:drf-jwt-knox 作者: ssaavedra 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def get_jwt_value(self, request):
        """Extract the JWT from the Authorization header and decode it.

        Returns ``None`` when the header is absent or uses another prefix,
        otherwise the decoded payload. Raises ``AuthenticationFailed`` for a
        malformed header or an expired, undecodable or invalid token.
        """
        parts = get_authorization_header(request).split()
        expected_prefix = api_settings.JWT_AUTH_HEADER_PREFIX.lower()

        if not parts or smart_text(parts[0].lower()) != expected_prefix:
            return None

        if len(parts) == 1:
            raise exceptions.AuthenticationFailed(
                _('Invalid Authorization header. No credentials provided.'))
        if len(parts) > 2:
            raise exceptions.AuthenticationFailed(
                _('Invalid Authorization header. Credentials string '
                  'should contain no spaces.'))

        token = parts[1]

        try:
            return jwt_decode_handler(token)
        except jwt.ExpiredSignature:
            raise exceptions.AuthenticationFailed(_('Signature has expired.'))
        except jwt.DecodeError:
            raise exceptions.AuthenticationFailed(_('Error decoding signature.'))
        except jwt.InvalidTokenError:
            raise exceptions.AuthenticationFailed()
test_create_scitoken.py 文件源码 项目:scitokens 作者: scitokens 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_public_key(self):
        """
        Deserialize with an explicitly supplied public key: the matching key
        succeeds, garbage bytes raise ValueError, and a valid but unrelated
        key raises DecodeError.
        """

        original = scitokens.SciToken(key = self._private_key)
        serialized = original.serialize(issuer = "local")

        roundtripped = scitokens.SciToken.deserialize(
            serialized, public_key = self._public_pem, insecure = True)
        self.assertIsInstance(roundtripped, scitokens.SciToken)

        # Bytes that are not a key at all
        with self.assertRaises(ValueError):
            scitokens.SciToken.deserialize(
                serialized, insecure=True, public_key = "asdf".encode())

        # A freshly generated key pair, unrelated to the signing key
        unrelated_private = generate_private_key(
            public_exponent=65537,
            key_size=2048,
            backend=default_backend()
        )
        unrelated_pem = unrelated_private.public_key().public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo
        )
        with self.assertRaises(DecodeError):
            scitokens.SciToken.deserialize(
                serialized, insecure=True, public_key = unrelated_pem)
socketioService.py 文件源码 项目:OctoPrint-Dashboard 作者: meadowfrey 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def on_join(data):
    """Join the caller to a room per accessible printer and emit the printer list.

    With authentication disabled a superadmin "Gandalf" user is used.
    Otherwise the user is looked up from the ``jwt`` entry of ``data``; a
    missing, undecodable or expired token disconnects the client.
    """
    if current_app.config["AUTH"] == Config.NONE:
        user = User("Gandalf", superadmin=True)
    else:
        token = data.get('jwt')
        if not token:
            disconnect()
            return

        try:
            claims = LoginService.parse_api_token_direct(token)
        except (DecodeError, ExpiredSignature):
            disconnect()
            return
        user = User.query.filter_by(username=claims["username"]).scalar()

    accessible = user.get_accessible_printers()
    for entry in accessible:
        join_room(str(entry.id))

    # Marshalling schema for the emitted printer list.
    group_schema = {'name': fields.String}
    printer_schema = {
        'id': fields.Integer,
        'name': fields.String,
        'group': fields.List(fields.Nested(group_schema)),
    }
    emit("printers", marshal(accessible, printer_schema))
decorators.py 文件源码 项目:OctoPrint-Dashboard 作者: meadowfrey 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def login_required(f):
    """
    Route decorator that requires a valid API token.

    Sets ``g.user`` to the user named in the token before calling the
    route. With authentication disabled a superadmin "Gandalf" user is
    used. Returns a 401 response when the Authorization header is missing
    or the token is invalid or expired.
    """

    @wraps(f)
    def wrapper(*args, **kwargs):
        auth_disabled = current_app.config["AUTH"] == Config.NONE
        if auth_disabled:
            g.user = User("Gandalf", superadmin=True)
            return f(*args, **kwargs)

        if not request.headers.get('Authorization'):
            return "Missing authorization header", 401

        try:
            claims = LoginService.parse_api_token(request)
        except DecodeError:
            return 'Token is invalid', 401
        except ExpiredSignature:
            return 'Token has expired', 401

        g.user = User.query.filter_by(username=claims['username']).first()
        return f(*args, **kwargs)

    return wrapper
decorators.py 文件源码 项目:OctoPrint-Dashboard 作者: meadowfrey 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def superadmin_required(f):
    """
    Route decorator that requires a valid API token from a superadmin.

    Sets ``g.user`` to the user named in the token before calling the
    route. With authentication disabled a superadmin "Gandalf" user is
    used. Returns a 401 response when the Authorization header is missing,
    the token is invalid or expired, or the user's ``superadmin`` flag is
    ``False``.
    """

    @wraps(f)
    def wrapper(*args, **kwargs):
        auth_disabled = current_app.config["AUTH"] == Config.NONE
        if auth_disabled:
            g.user = User("Gandalf", superadmin=True)
            return f(*args, **kwargs)

        if not request.headers.get('Authorization'):
            return "Missing authorization header", 401

        try:
            claims = LoginService.parse_api_token(request)
        except DecodeError:
            return 'Token is invalid', 401
        except ExpiredSignature:
            return 'Token has expired', 401

        g.user = User.query.filter_by(username=claims['username']).first()
        # Identity check against False is kept as in the original test.
        if g.user.superadmin is False:
            return 'You are not superadmin', 401

        return f(*args, **kwargs)

    return wrapper
jwt.py 文件源码 项目:paws 作者: funkybob 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def dispatch(self, request, **kwargs):
        """Decode the Bearer token onto ``request.jwt`` before dispatching.

        Requests without a ``Bearer `` Authorization header are handed to
        ``handle_no_token``; tokens that fail to decode are handed to
        ``handle_invalid_token``. Otherwise ``handle_valid_token`` runs and
        the parent class's ``dispatch`` result is returned.
        """
        auth_header = request.headers.get('Authorization', '')
        if not auth_header.startswith('Bearer '):
            return self.handle_no_token(request, **kwargs)

        _scheme, credentials = auth_header.split(' ', 1)
        token = credentials.strip()
        try:
            decode_kwargs = self.get_jwt_decode_kwargs(request, **kwargs)
            request.jwt = jwt.decode(token, **decode_kwargs)
        except jwt.DecodeError as err:
            log.info('Invalid token: %s', err)
            return self.handle_invalid_token(request, err, **kwargs)

        self.handle_valid_token(request, **kwargs)

        return super().dispatch(request, **kwargs)
token.py 文件源码 项目:congredi 作者: toxik-io 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def check(self, json):
        """Check a JWT against ``self.secret`` using HS256.

        Returns ``(pgp_claim, True)`` on success, or an error message paired
        with ``False`` when the token is invalid or expired.
        """
        try:
            claims = jwt.decode(json, self.secret, algorithms=['HS256'])
            pgp_value = claims['pgp']
        # something has gone wrong
        except jwt.DecodeError:  # test
            return "Invalid Token", False
        except jwt.ExpiredSignature:  # test
            return "Expired Token", False
        return pgp_value, True


问题


面经


文章

微信
公众号

扫码关注公众号