def decode_auth_token(auth_token):
"""
Validates the auth token
:param auth_token:
:return: integer|string
"""
try:
payload = jwt.decode(auth_token, app.config.get('SECRET_KEY'))
is_blacklisted_token = BlacklistToken.check_blacklist(auth_token)
if is_blacklisted_token:
return 'Token blacklisted. Please log in again.'
else:
return payload['sub']
except jwt.ExpiredSignatureError:
return 'Signature expired. Please log in again.'
except jwt.InvalidTokenError:
return 'Invalid token. Please log in again.'
python类ExpiredSignatureError()的实例源码
def decode_auth_token(token):
"""
Decoding the token to get the payload and then return the user Id in 'sub'
:param token: Auth Token
:return:
"""
try:
payload = jwt.decode(token, app.config['SECRET_KEY'], algorithms='HS256')
is_token_blacklisted = BlackListToken.check_blacklist(token)
if is_token_blacklisted:
return 'Token was Blacklisted, Please login In'
return payload['sub']
except jwt.ExpiredSignatureError:
return 'Signature expired, Please sign in again'
except jwt.InvalidTokenError:
return 'Invalid token. Please sign in again'
def authenticate(self, auth_token):
try:
token_payload = jwt.decode(auth_token, secret)
username = token_payload['username']
self.username = username
self.authenticated = True
self.auth_failures = 0
self.send_json(action='AUTH OK')
# If we are the first websocket connecting on behalf of
# a given user, subscribe to the feed for that user
if len(WebSocket.sockets[username]) == 0:
WebSocket.subscribe(username)
WebSocket.sockets[username].add(self)
except jwt.DecodeError:
self.send_json(action='AUTH FAILED')
except jwt.ExpiredSignatureError:
self.send_json(action='AUTH FAILED')
def is_authenticated(req):
auth_token = req.headers.get('Authentication-Token')
if auth_token:
try:
auth_token_payload = jwt.decode(auth_token, os.environ["JWT_SECRET"])
user_id = int(auth_token_payload['user_id'])
user = User.query.filter_by(id=user_id, is_active=True).first()
if user:
return user
else:
return False
except jwt.ExpiredSignatureError:
return False
else:
return False
authenticated_endpoint.py 文件源码
项目:opserv-backend
作者: OpServ-Monitoring
项目源码
文件源码
阅读 21
收藏 0
点赞 0
评论 0
def get_current_user(self):
"""
Overrides the built-in function to get the user id based on a JWT-token.
If there is no token included in the Authorization header None is returned.
In case the token is not valid an corresponding exception is raised.
:return: The user id as a string or None if the uid couldn't be extracted
:raises jwt.InvalidIssuedAtError: Raised if the IAT-claim is less than the last time the user password changed
:raises jwt.ExpiredSignatureError: Raised if the EXT-claim is less than the current UNIX time
:raises jwt.InvalidTokenError: Raised if the token is invalid for reasons other than the above mentioned
"""
auth_header = self.request.headers.get('Authorization')
if auth_header is not None and auth_header.startswith("Bearer "):
encoded_jwt_token = auth_header[7:]
payload = TokenGenerator.decode_token(encoded_jwt_token)
# TODO Should we check if payload["iat"] < last password change?
if False:
raise jwt.InvalidIssuedAtError
return payload["uid"]
return None
authenticated_endpoint.py 文件源码
项目:opserv-backend
作者: OpServ-Monitoring
项目源码
文件源码
阅读 20
收藏 0
点赞 0
评论 0
def prepare(self):
"""
Ensures that the caller is a validated user
"""
super().prepare()
try:
if self.current_user is None:
self.send_error(401) # TODO Add details
except jwt.InvalidIssuedAtError:
log.error("The IAT-claim of the passed token is less than the last time the user password changed")
self.send_error(401, summary="invalidated token")
except jwt.ExpiredSignatureError:
log.error("The EXT-claim of the passed token is less than the current UNIX time")
self.send_error(401, summary="expired token")
except jwt.InvalidTokenError:
log.error("The passed token could not be validated")
self.send_error(401, summary="invalid token")
def _user_from_refresh_token(self, jwtstr: str, key_pemstr: str, expected_issuer: Optional[str]=None,
expected_audience: Optional[str]=None) -> Optional[MNUser]:
_log.debug("Received refresh token: %s", jwtstr)
try:
token = jwt.decode(jwtstr, key_pemstr, algorithms=["RS256"], leeway=10,
issuer=expected_issuer, audience=expected_audience)
except (jwt.ExpiredSignatureError, jwt.InvalidAlgorithmError,
jwt.InvalidIssuerError, jwt.InvalidTokenError) as e:
_log.warning("Rejected refresh token because of %s", str(e))
return None
if "sub" not in token:
_log.error("BUG? Valid refresh token without user in subject. %s", jwtstr)
return None
try:
user = MNUser.objects.get(pk=token["sub"]) # type: MNUser
except MNUser.DoesNotExist:
_log.warning("No such user from valid JWT. %s", jwtstr)
return None
return user
def verify_token():
"""
Verify if the token is valid, not expired and not blacklisted
"""
if 'Authorization' in request.headers:
if request.headers['Authorization'] in cache.blacklisted_tokens:
abort(403, 'Error: invalid token')
try:
payload = jwt.decode(request.headers['Authorization'], config.SECRET_KEY)
g.current_user = payload['id_user']
except jwt.ExpiredSignatureError:
abort(403, 'Error: token expired')
except jwt.DecodeError:
abort(403, 'Error: invalid token')
def decode_auth_token(auth_token):
"""Decodes the auth token - :param auth_token: - :return: integer|string"""
try:
payload = jwt.decode(
auth_token, current_app.config.get('SECRET_KEY'))
return payload['sub']
except jwt.ExpiredSignatureError:
return 'Signature expired. Please log in again.'
except jwt.InvalidTokenError:
return 'Invalid token. Please log in again.'
def verify_jwt(signed_token):
unvalidated_token = jwt.decode(signed_token, verify=False)
hipchat_room = HipChatRoom.query.filter(HipChatRoom.hipchat_oauth_id == unvalidated_token['iss']).first_or_404()
try:
jwt.decode(signed_token, hipchat_room.hipchat_oauth_secret)
except jwt.exceptions.DecodeError:
flask.flash('Unable to decode the Java Web Token provided', 'error')
flask.abort(401)
except jwt.ExpiredSignatureError:
flask.flash('The provided Java Web Token is expired: try refreshing the page', 'error')
flask.abort(401)
else:
return hipchat_room
def verify_token(cls, user, token):
try:
data = jwt.decode(token, str(user.id) + SECRET_CODE)
except (jwt.ExpiredSignatureError, jwt.DecodeError, AttributeError):
return False
else:
if data["username"] == user.username:
return True
else:
return False
def decode_token(token):
"""
Get the organisation ID from a token
:param token: a JSON Web Token
:returns: str, organisation ID
:raises:
jwt.DecodeError: Invalid token or not signed with our key
jwt.ExpiredSignatureError: Token has expired
jwt.InvalidAudienceError: Invalid "aud" claim
jwt.InvalidIssuerError: Invalid "iss" claim
jwt.MissingRequiredClaimError: Missing a required claim
"""
cert_file = getattr(options, 'ssl_cert', None) or LOCALHOST_CRT
with open(cert_file) as f:
cert = load_pem_x509_certificate(f.read(), default_backend())
public_key = cert.public_key()
payload = jwt.decode(token,
public_key,
audience=audience(),
issuer=issuer(),
algorithms=[ALGORITHM],
verify=True)
if not payload.get('sub'):
raise jwt.MissingRequiredClaimError('"sub" claim is required')
payload['scope'] = Scope(payload['scope'])
return payload
def test_decode_expired_token():
in_the_past = NOW - timedelta(minutes=EXPIRY * 2 + 1)
with patch.object(_token.datetime, 'utcnow', return_value=in_the_past):
token, expiry = generate_token(CLIENT, SCOPE, 'grant_type')
with pytest.raises(jwt.ExpiredSignatureError):
decode_token(token)
def verify_jwt(auth_header, secret):
"""Extract the jwt token from the header, verify its signature,
its expiration time, and return the payload."""
if not auth_header or auth_header == 'null':
logging.warning("No Authorization header")
return [None, "Unauthorized access: missing authentication"]
method,token = auth_header.split() # separate 'JWT' from the jwt itself
token = bytes(token, 'utf-8')
try:
payload = jwt.decode(token, secret, algorithms=['HS256'], verify=True)
except (jwt.ExpiredSignatureError, jwt.DecodeError) as err:
return [None, str(err)]
return [payload, '']
def test_expired_token(app):
with app.test_request_context():
delta = timedelta(minutes=-5)
access_token = create_access_token('username', expires_delta=delta)
refresh_token = create_refresh_token('username', expires_delta=delta)
with pytest.raises(ExpiredSignatureError):
decode_token(access_token)
with pytest.raises(ExpiredSignatureError):
decode_token(refresh_token)
def verify_token(self, token):
try:
decode = jwt.decode(token, self.token_config.get('secret'), algorithm=self.token_config.get('algorithm'))
except jwt.ExpiredSignatureError:
raise AuthException('Time is up', 401, AuthException.TOKEN_TIME_UP)
except:
raise AuthException('Token is not valid', 401, AuthException.TOKEN_ERROR)
return decode
def check_token(key, access_token):
import jwt
"""
Simple request to check if session has expired (TBD)
:return: Token status
"""
try:
contents = jwt.decode(access_token, key, True, algorithms='RS256', audience='adapter')
# options={'verify_aud': False})
print('contents', contents)
return True
except jwt.ExpiredSignatureError:
print('Signature has expired')
return False
def check_token():
data = request.get_json()
auth_token = data['token']
try:
auth_token_payload = jwt.decode(auth_token, os.environ["JWT_SECRET"])
expiration_time = auth_token_payload['exp']
is_valid = datetime.now() + timedelta(minutes=30) <= datetime.fromtimestamp(expiration_time)
except jwt.ExpiredSignatureError:
is_valid = False
return make_response(jsonify({'isValid': is_valid})), 200
def is_token_valid(token):
"""
Check if token is valid.
"""
try:
jwt.decode(token, 'secret', algorithms=JWTHelper.JWT_ALGORITHM)
return True, "Valid"
except jwt.ExpiredSignatureError:
return False, "Token Expired"
except jwt.InvalidTokenError:
return False, "Token is Invalid"
def decode_token(self, token):
try:
decoded = jwt.decode(token, self.config['JWT_SECRET_KEY'],
algorithms=[self.config['JWT_ALGORITHM']], leeway=self.config.get('JWT_LEEWAY', 0))
except jwt.ExpiredSignatureError:
raise werkzeug.exceptions.Unauthorized("JWT Error: Token is expired.")
except jwt.DecodeError:
raise werkzeug.exceptions.Unauthorized("JWT Error: Token could not be decoded.")
except jwt.InvalidTokenError:
raise werkzeug.exceptions.Unauthorized("JWT Error: Token is invalid.")
return decoded
def decode_token(token):
"""Decode the access token from the Authorization header."""
try:
payload = jwt.decode(token, current_app.config.get('SECRET'))
return payload['sub']
except jwt.ExpiredSignatureError:
return "Expired token. Please log in to get a new token"
except jwt.InvalidTokenError:
return "Invalid token. Please register or login"
def _set_error_handler_callbacks(self, app):
"""
Sets the error handler callbacks used by this extension
"""
@app.errorhandler(NoAuthorizationError)
def handle_auth_error(e):
return self._unauthorized_callback(str(e))
@app.errorhandler(CSRFError)
def handle_auth_error(e):
return self._unauthorized_callback(str(e))
@app.errorhandler(ExpiredSignatureError)
def handle_expired_error(e):
return self._expired_token_callback()
@app.errorhandler(InvalidHeaderError)
def handle_invalid_header_error(e):
return self._invalid_token_callback(str(e))
@app.errorhandler(InvalidTokenError)
def handle_invalid_token_error(e):
return self._invalid_token_callback(str(e))
@app.errorhandler(JWTDecodeError)
def handle_jwt_decode_error(e):
return self._invalid_token_callback(str(e))
@app.errorhandler(WrongTokenError)
def handle_wrong_token_error(e):
return self._invalid_token_callback(str(e))
@app.errorhandler(RevokedTokenError)
def handle_revoked_token_error(e):
return self._revoked_token_callback()
@app.errorhandler(FreshTokenRequired)
def handle_fresh_token_required(e):
return self._needs_fresh_token_callback()
@app.errorhandler(UserLoadError)
def handler_user_load_error(e):
# The identity is already saved before this exception was raised,
# otherwise a different exception would be raised, which is why we
# can safely call get_jwt_identity() here
identity = get_jwt_identity()
return self._user_loader_error_callback(identity)
@app.errorhandler(UserClaimsVerificationError)
def handle_failed_user_claims_verification(e):
return self._claims_verification_failed_callback()
def check_token_status(self):
"""
Simple request to check if session has expired (TBD)
:return: Token status
"""
if self.access_token is None:
try:
token_file = self.platform['credentials']['token_file']
token_path = os.path.join(
self.workspace.workspace_root,
self.workspace.platforms_dir,
token_file)
# Construct the POST login request
with open(token_path, "r") as _file:
self.access_token = _file.read
except:
return True
print('Public_key=', self.platform_public_key)
if self.platform_public_key is None:
return True
# Some old PyJWT versions crash with public key binary string, instead add
# self.platform_public_key.decode('utf-8')
try:
print('access_token=', self.access_token)
print('platform_public_key=', self.platform_public_key.decode('utf-8'))
decoded = jwt.decode(self.access_token, self.platform_public_key.decode('utf-8'),
True, algorithms='RS256', audience='adapter')
# options={'verify_aud': False})
print('contents', decoded)
try:
self.username = decoded['preferred_username']
return True
except:
return True
except jwt.DecodeError:
print('Token cannot be decoded because it failed validation')
return False
except jwt.ExpiredSignatureError:
print('Signature has expired')
return False
except jwt.InvalidIssuerError:
return False
except jwt.InvalidIssuedAtError:
return False
def check_token_status():
"""
Simple request to check if session has expired (TBD)
:return: Token status
"""
import jwt
access_token = ''
platform_public_key = b''
print('Public_key=', platform_public_key)
if platform_public_key is None:
return True
# Some old PyJWT versions crash with public key binary string, instead add
# self.platform_public_key.decode('utf-8')
try:
print('access_token=', access_token)
print('platform_public_key=', platform_public_key.decode('utf-8'))
decoded = jwt.decode(access_token, platform_public_key.decode('utf-8'),
True, algorithms='RS256', audience='adapter')
# options={'verify_aud': False})
print('contents', decoded)
try:
username = decoded['preferred_username']
return True
except:
return True
except jwt.DecodeError:
print('Token cannot be decoded because it failed validation')
return False
except jwt.ExpiredSignatureError:
print('Signature has expired')
return False
except jwt.InvalidIssuerError:
return False
except jwt.InvalidIssuedAtError:
return False
# generate_keypair()
# token = user_login('user04', '1234')
# print(token)
# key = get_platform_public_key()
# check_token(key, token)
# sign()
# result = check_token_status()
# print("RESULT=", result)