python类InvalidTokenError()的实例源码

test_auth.py 文件源码 项目:stethoscope 作者: Netflix 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_decode_raises_for_invalid_token_error(self):
    with pytest.raises(werkzeug.exceptions.Unauthorized):
      with mock.patch('jwt.decode', side_effect=jwt.InvalidTokenError):
        self.auth.decode_token(None)
auth.py 文件源码 项目:stethoscope 作者: Netflix 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def decode_token(self, token):
    try:
      decoded = jwt.decode(token, self.config['JWT_SECRET_KEY'],
          algorithms=[self.config['JWT_ALGORITHM']], leeway=self.config.get('JWT_LEEWAY', 0))
    except jwt.ExpiredSignatureError:
      raise werkzeug.exceptions.Unauthorized("JWT Error: Token is expired.")
    except jwt.DecodeError:
      raise werkzeug.exceptions.Unauthorized("JWT Error: Token could not be decoded.")
    except jwt.InvalidTokenError:
      raise werkzeug.exceptions.Unauthorized("JWT Error: Token is invalid.")
    return decoded
test_utils.py 文件源码 项目:edx-drf-extensions 作者: edx 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_decode_failure(self):
        """
        Verifies the function logs decode failures, and raises an InvalidTokenError if the token cannot be decoded
        """

        # Create tokens using each invalid issuer and attempt to decode them against
        # the valid issuers list, which won't work
        with mock.patch('edx_rest_framework_extensions.utils.logger') as patched_log:
            with self.assertRaises(jwt.InvalidTokenError):
                self.payload['iss'] = 'invalid-issuer'
                signing_key = 'invalid-secret-key'
                # Generate a token using the invalid issuer data
                token = generate_jwt_token(self.payload, signing_key)
                # Attempt to decode the token against the entries in the valid issuers list,
                # which will fail with an InvalidTokenError
                utils.jwt_decode_handler(token)

            # Verify that the proper entries were written to the log file
            msg = "Token decode failed for issuer 'test-issuer-1'"
            patched_log.info.assert_any_call(msg, exc_info=True)

            msg = "Token decode failed for issuer 'test-issuer-2'"
            patched_log.info.assert_any_call(msg, exc_info=True)

            msg = "All combinations of JWT issuers and secret keys failed to validate the token."
            patched_log.error.assert_any_call(msg)
models.py 文件源码 项目:flask-rest-api 作者: gitgik 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def decode_token(token):
        """Decode the access token from the Authorization header."""
        try:
            payload = jwt.decode(token, current_app.config.get('SECRET'))
            return payload['sub']
        except jwt.ExpiredSignatureError:
            return "Expired token. Please log in to get a new token"
        except jwt.InvalidTokenError:
            return "Invalid token. Please register or login"
jwt_manager.py 文件源码 项目:flask-jwt-extended 作者: vimalloc 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def _set_error_handler_callbacks(self, app):
        """
        Sets the error handler callbacks used by this extension
        """
        @app.errorhandler(NoAuthorizationError)
        def handle_auth_error(e):
            return self._unauthorized_callback(str(e))

        @app.errorhandler(CSRFError)
        def handle_auth_error(e):
            return self._unauthorized_callback(str(e))

        @app.errorhandler(ExpiredSignatureError)
        def handle_expired_error(e):
            return self._expired_token_callback()

        @app.errorhandler(InvalidHeaderError)
        def handle_invalid_header_error(e):
            return self._invalid_token_callback(str(e))

        @app.errorhandler(InvalidTokenError)
        def handle_invalid_token_error(e):
            return self._invalid_token_callback(str(e))

        @app.errorhandler(JWTDecodeError)
        def handle_jwt_decode_error(e):
            return self._invalid_token_callback(str(e))

        @app.errorhandler(WrongTokenError)
        def handle_wrong_token_error(e):
            return self._invalid_token_callback(str(e))

        @app.errorhandler(RevokedTokenError)
        def handle_revoked_token_error(e):
            return self._revoked_token_callback()

        @app.errorhandler(FreshTokenRequired)
        def handle_fresh_token_required(e):
            return self._needs_fresh_token_callback()

        @app.errorhandler(UserLoadError)
        def handler_user_load_error(e):
            # The identity is already saved before this exception was raised,
            # otherwise a different exception would be raised, which is why we
            # can safely call get_jwt_identity() here
            identity = get_jwt_identity()
            return self._user_loader_error_callback(identity)

        @app.errorhandler(UserClaimsVerificationError)
        def handle_failed_user_claims_verification(e):
            return self._claims_verification_failed_callback()
jwt_agent.py 文件源码 项目:networking-vpp 作者: openstack 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def verify(self, signer_requirements, path, sgn_value):
        """Verify the authenticity of the incoming data

        :param signer_requirements: what we expect of the key; right now,
        the CN of the signer
        :param path: path where the JSON object was found
        :param sgn_value: The datastructure found at that path
        :return: confirmed original value from JWT token.
        :raise A JWTSigningFailed is raised if the verification fails.
        """

        if sgn_value is None:
            raise JWTSigningFailed(
                _("Invalid empty value at path %s") % (path))

        try:
            # Load the certificate and verify that it is both a suitable
            # certificate for this key and one we trust the origin of
            vcert_str = sgn_value.get("certificate", "")
            # ("" is an invalid key)

            # TODO(ijw): why?
            vcert_str = vcert_str.encode('ascii', 'ignore')
            # TODO(ijw): how does this fail?
            vcert_obj = load_pem_x509_certificate(
                vcert_str,
                default_backend())

            vpublic_key = vcert_obj.public_key()

            self._check_node_name(signer_requirements, vcert_obj)
            # TODO(ijw): what checks the cert is signed with the CA?
            self._verify_certificate(vcert_str)

            # Unpack the JWT to its raw data
            jwtok = sgn_value.get("jwt", "")
            # ("" is an invalid token)
            dval = jwt.decode(jwtok, vpublic_key, algorithm='RS256')

            # Check the ancillary tags of the raw data
            self._check_path(dval, path)
            # TODO(ijw): check delta

            # Get and return the originally provided value
            return dval["value"]

        except jwt.InvalidTokenError:
            raise JWTSigningFailed(_("InvalidTokenError: path :%s") % path)


问题


面经


文章

微信
公众号

扫码关注公众号