def test_decode_raises_for_invalid_token_error(self):
    """decode_token must turn a jwt.InvalidTokenError into werkzeug's Unauthorized."""
    # Make jwt.decode fail so that only the error translation is exercised.
    failing_decode = mock.patch('jwt.decode', side_effect=jwt.InvalidTokenError)
    with pytest.raises(werkzeug.exceptions.Unauthorized), failing_decode:
        self.auth.decode_token(None)
python类InvalidTokenError()的实例源码
def decode_token(self, token):
    """Decode a JWT using the settings stored in ``self.config``.

    ``JWT_SECRET_KEY`` and ``JWT_ALGORITHM`` are required configuration
    entries; ``JWT_LEEWAY`` is optional and defaults to 0.

    :param token: the encoded JWT to decode
    :return: the value returned by ``jwt.decode`` for the token
    :raise werkzeug.exceptions.Unauthorized: if the token is expired,
        cannot be decoded, or is otherwise invalid
    """
    secret_key = self.config['JWT_SECRET_KEY']
    accepted_algorithms = [self.config['JWT_ALGORITHM']]
    leeway = self.config.get('JWT_LEEWAY', 0)

    try:
        return jwt.decode(token, secret_key,
                          algorithms=accepted_algorithms, leeway=leeway)
    # The handlers are tried top to bottom; each failure kind gets its own
    # message, with jwt.InvalidTokenError tried last.
    except jwt.ExpiredSignatureError:
        raise werkzeug.exceptions.Unauthorized("JWT Error: Token is expired.")
    except jwt.DecodeError:
        raise werkzeug.exceptions.Unauthorized("JWT Error: Token could not be decoded.")
    except jwt.InvalidTokenError:
        raise werkzeug.exceptions.Unauthorized("JWT Error: Token is invalid.")
def test_decode_failure(self):
    """
    Checks that a token which cannot be decoded is logged for each issuer and surfaces as an InvalidTokenError
    """
    expected_info_messages = (
        "Token decode failed for issuer 'test-issuer-1'",
        "Token decode failed for issuer 'test-issuer-2'",
    )
    expected_error_message = "All combinations of JWT issuers and secret keys failed to validate the token."

    with mock.patch('edx_rest_framework_extensions.utils.logger') as patched_log:
        with self.assertRaises(jwt.InvalidTokenError):
            # Build a token from an issuer / signing key pair that the
            # valid issuers list is not expected to accept
            self.payload['iss'] = 'invalid-issuer'
            token = generate_jwt_token(self.payload, 'invalid-secret-key')
            # Decoding it against the valid issuers list must raise
            # InvalidTokenError
            utils.jwt_decode_handler(token)

        # These checks sit outside assertRaises so they still run after the
        # expected exception: every failed attempt must have been logged
        for message in expected_info_messages:
            patched_log.info.assert_any_call(message, exc_info=True)
        patched_log.error.assert_any_call(expected_error_message)
def decode_token(token):
    """Decode the access token from the Authorization header.

    :param token: the encoded JWT
    :return: the token's ``sub`` claim on success; otherwise a
        human-readable error string. Both outcomes are returned rather
        than raised, so callers have to tell them apart themselves.
    """
    try:
        # Always pass an explicit allow-list of algorithms: PyJWT 2.x
        # rejects signature-verified decoding without one, and older
        # releases would otherwise accept any supported algorithm.
        # NOTE(review): HS256 is assumed because it is PyJWT's default for
        # jwt.encode -- confirm against the code that issues these tokens.
        payload = jwt.decode(
            token, current_app.config.get('SECRET'), algorithms=['HS256'])
    except jwt.ExpiredSignatureError:
        return "Expired token. Please log in to get a new token"
    except jwt.InvalidTokenError:
        return "Invalid token. Please register or login"
    return payload['sub']
def _set_error_handler_callbacks(self, app):
    """
    Sets the error handler callbacks used by this extension

    Each exception type below is registered on ``app`` via
    ``app.errorhandler`` and forwarded to the matching callback stored on
    this object.

    :param app: the application object the handlers are registered on
    """
    @app.errorhandler(NoAuthorizationError)
    def handle_auth_error(e):
        return self._unauthorized_callback(str(e))

    # Named separately from handle_auth_error so the two handlers do not
    # shadow each other; both report through the unauthorized callback.
    @app.errorhandler(CSRFError)
    def handle_csrf_error(e):
        return self._unauthorized_callback(str(e))

    @app.errorhandler(ExpiredSignatureError)
    def handle_expired_error(e):
        return self._expired_token_callback()

    @app.errorhandler(InvalidHeaderError)
    def handle_invalid_header_error(e):
        return self._invalid_token_callback(str(e))

    @app.errorhandler(InvalidTokenError)
    def handle_invalid_token_error(e):
        return self._invalid_token_callback(str(e))

    @app.errorhandler(JWTDecodeError)
    def handle_jwt_decode_error(e):
        return self._invalid_token_callback(str(e))

    @app.errorhandler(WrongTokenError)
    def handle_wrong_token_error(e):
        return self._invalid_token_callback(str(e))

    @app.errorhandler(RevokedTokenError)
    def handle_revoked_token_error(e):
        return self._revoked_token_callback()

    @app.errorhandler(FreshTokenRequired)
    def handle_fresh_token_required(e):
        return self._needs_fresh_token_callback()

    @app.errorhandler(UserLoadError)
    def handle_user_load_error(e):
        # The identity is already saved before this exception was raised,
        # otherwise a different exception would be raised, which is why we
        # can safely call get_jwt_identity() here
        identity = get_jwt_identity()
        return self._user_loader_error_callback(identity)

    @app.errorhandler(UserClaimsVerificationError)
    def handle_failed_user_claims_verification(e):
        return self._claims_verification_failed_callback()
def verify(self, signer_requirements, path, sgn_value):
    """Verify the authenticity of the incoming data

    :param signer_requirements: what we expect of the key; right now,
                                the CN of the signer
    :param path: path where the JSON object was found
    :param sgn_value: The datastructure found at that path
    :return: confirmed original value from JWT token.
    :raise JWTSigningFailed: if sgn_value is None or the JWT is rejected
        as invalid. NOTE(review): the certificate loading and the
        _check_node_name / _verify_certificate / _check_path helpers may
        raise other exceptions; those are not translated here.
    """
    if sgn_value is None:
        raise JWTSigningFailed(
            _("Invalid empty value at path %s") % (path))

    try:
        # Load the certificate and verify that it is both a suitable
        # certificate for this key and one we trust the origin of
        vcert_str = sgn_value.get("certificate", "")
        # ("" is an invalid key)

        # TODO(ijw): why?
        vcert_str = vcert_str.encode('ascii', 'ignore')
        # TODO(ijw): how does this fail?
        vcert_obj = load_pem_x509_certificate(
            vcert_str,
            default_backend())
        vpublic_key = vcert_obj.public_key()

        self._check_node_name(signer_requirements, vcert_obj)

        # TODO(ijw): what checks the cert is signed with the CA?
        self._verify_certificate(vcert_str)

        # Unpack the JWT to its raw data
        jwtok = sgn_value.get("jwt", "")
        # ("" is an invalid token)

        # The keyword must be the plural ``algorithms`` with a list: a
        # singular ``algorithm=`` is not a jwt.decode parameter, so it
        # would not restrict which signing algorithm is accepted.
        dval = jwt.decode(jwtok, vpublic_key, algorithms=['RS256'])

        # Check the ancillary tags of the raw data
        self._check_path(dval, path)
        # TODO(ijw): check delta

        # Get and return the originally provided value
        return dval["value"]
    except jwt.InvalidTokenError:
        raise JWTSigningFailed(_("InvalidTokenError: path :%s") % path)