def logout(request):
""" Logout a user
"""
try:
token = request.environ['HTTP_X_API_TOKEN']
except (KeyError, IndexError, TypeError):
raise BadRequest('Missing HTTP X-Api-Token header')
try:
data = jwt.decode(token, settings.SECRET_KEY)
data = json.loads(CRYPTO.decrypt(str(data['data'])))
user = User.objects.get(id=data['id'])
user.last_login = datetime.fromtimestamp(0)
user.save()
return {'message': 'Logged out'}
except (utils.CryptoException, KeyError, jwt.DecodeError,
jwt.ExpiredSignature, User.DoesNotExist):
raise BadRequest('Invalid token')
python类ExpiredSignature()的实例源码
def authjwt_method(token):
""" an authentication method using rest_framework_jwt
"""
import jwt
from rest_framework_jwt.authentication import (jwt_decode_handler,
jwt_get_username_from_payload)
try:
payload = jwt_decode_handler(token)
except (jwt.ExpiredSignature, jwt.DecodeError, jwt.InvalidTokenError):
return None
User = get_user_model()
username = jwt_get_username_from_payload(payload)
if not username: # pragma: no cover
return None
try:
user = User.objects.get_by_natural_key(username)
except User.DoesNotExist: # pragma: no cover
return None
return user
jwt_authenticator.py 文件源码
项目:django-open-volunteering-platform
作者: OpenVolunteeringPlatform
项目源码
文件源码
阅读 15
收藏 0
点赞 0
评论 0
def authenticate(self, request):
"""
Returns a two-tuple of `User` and token if a valid signature has been
supplied using JWT-based authentication. Otherwise returns `None`.
"""
jwt_value = self.get_jwt_value(request)
if jwt_value is None:
return None
try:
payload = jwt_decode_handler(jwt_value)
except jwt.ExpiredSignature:
msg = _('Signature has expired.')
raise exceptions.AuthenticationFailed(msg)
except jwt.DecodeError:
msg = _('Error decoding signature.')
raise exceptions.AuthenticationFailed(msg)
except jwt.InvalidTokenError:
raise exceptions.AuthenticationFailed()
user = self.authenticate_credentials(payload, request.channel)
return (user, jwt_value)
def authenticate(token):
"""
Tries to authenticate user based on the supplied token. It also checks
the token structure and validity.
Based on jwt_auth.JSONWebTokenAuthMixin.authenticate
"""
try:
payload = jwt_decode_handler(token)
except jwt.ExpiredSignature:
msg = 'Signature has expired.'
raise exceptions.AuthenticationFailed(msg)
except jwt.DecodeError:
msg = 'Error decoding signature.'
raise exceptions.AuthenticationFailed(msg)
user = authenticate_credentials(payload)
return user
def token_required(secret_key):
def token_required_decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
g = f.func_globals
if not request.headers.get('Authorization'):
return Response(response="Missing authorization header", status=401)
try:
payload = parse_token(request.headers.get('Authorization').split()[1], secret_key)
except jwt.DecodeError:
return Response(response="Token is invalid", status=401)
except jwt.ExpiredSignature:
return Response(response="Token has expired", status=401)
# Set username for decorated func
g["username"] = payload['sub']
return f(*args, **kwargs)
return decorated_function
return token_required_decorator
def authenticate(self, request):
jwt_value = self.get_jwt_value(request)
if jwt_value is None:
return None
try:
if api_settings.JWT_PERMANENT_TOKEN_AUTH:
payload = jwt_devices_decode_handler(jwt_value)
else:
payload = jwt_decode_handler(jwt_value)
except jwt.ExpiredSignature:
msg = _("Signature has expired.")
raise exceptions.AuthenticationFailed(msg)
except jwt.DecodeError:
msg = _("Error decoding signature.")
raise exceptions.AuthenticationFailed(msg)
except jwt.InvalidTokenError:
raise exceptions.AuthenticationFailed()
user = self.authenticate_credentials(payload)
return user, jwt_value
def user_data(self, access_token, *args, **kwargs):
response = kwargs.get('response')
id_token = response.get('id_token')
# decode the JWT header as JSON dict
jwt_header = json.loads(
base64.b64decode(id_token.split('.', 1)[0]).decode()
)
# get key id and algorithm
key_id = jwt_header['kid']
algorithm = jwt_header['alg']
try:
# retrieve certificate for key_id
certificate = self.get_certificate(key_id)
return jwt_decode(
id_token,
key=certificate.public_key(),
algorithms=algorithm,
audience=self.setting('SOCIAL_AUTH_AZUREAD_OAUTH2_KEY')
)
except (DecodeError, ExpiredSignature) as error:
raise AuthTokenError(self, error)
def azure_ad_authorized():
response = azure_ad.authorized_response()
print response
if response is None:
flask.flash('You denied the request to sign in.')
return flask.redirect(util.get_next_url)
id_token = response['id_token']
flask.session['oauth_token'] = (id_token, '')
try:
decoded_id_token = jwt.decode(id_token, verify=False)
except (jwt.DecodeError, jwt.ExpiredSignature):
flask.flash('You denied the request to sign in.')
return flask.redirect(util.get_next_url)
user_db = retrieve_user_from_azure_ad(decoded_id_token)
return auth.signin_user_db(user_db)
def authenticate(self, request):
auth = get_authorization_header(request).split()
auth_header_prefix = api_settings.JWT_AUTH_HEADER_PREFIX.lower()
if not auth or smart_text(auth[0].lower()) != auth_header_prefix:
raise exceptions.AuthenticationFailed()
if len(auth) == 1:
msg = _("Invalid Authorization header. No credentials provided.")
raise exceptions.AuthenticationFailed(msg)
elif len(auth) > 2:
msg = _("Invalid Authorization header. Credentials string should not contain spaces.")
raise exceptions.AuthenticationFailed(msg)
try:
payload = jwt_decode_handler(auth[1])
except jwt.ExpiredSignature:
msg = _("Signature has expired.")
raise exceptions.AuthenticationFailed(msg)
except jwt.DecodeError:
msg = _("Error decoding signature.")
raise exceptions.AuthenticationFailed(msg)
user = self.authenticate_credentials(payload)
return (user, auth[1])
def authenticate(self, request):
jwt_value = self.get_jwt_value(request)
if jwt_value is None:
return None, None
try:
payload = jwt_decode_handler(jwt_value)
except jwt.ExpiredSignature:
msg = _("Signature has expired.")
raise AuthenticationFailed(msg)
except jwt.DecodeError:
msg = _("Error decoding signature.")
raise AuthenticationFailed(msg)
except jwt.InvalidTokenError:
raise AuthenticationFailed()
# Check blacklist
self.check_blacklist(payload)
user = self.authenticate_credentials(payload)
# Check if password already change invalidated all old token
self.check_changed_password_invalidated_old_token(user, payload)
return user, jwt_value
def _check_payload(token):
# Check payload valid
try:
payload = jwt_decode_handler(token)
except jwt.ExpiredSignature:
msg = _("Signature has expired.")
raise forms.ValidationError(msg)
except jwt.DecodeError:
msg = _("Error decoding signature.")
raise forms.ValidationError(msg)
return payload
def get_jwt_value(self, request):
auth = get_authorization_header(request).split()
auth_header_prefix = api_settings.JWT_AUTH_HEADER_PREFIX.lower()
if not auth or smart_text(auth[0].lower()) != auth_header_prefix:
return None
if len(auth) == 1:
msg = _('Invalid Authorization header. No credentials provided.')
raise exceptions.AuthenticationFailed(msg)
elif len(auth) > 2:
msg = _('Invalid Authorization header. Credentials string '
'should contain no spaces.')
raise exceptions.AuthenticationFailed(msg)
jwt_value = auth[1]
try:
payload = jwt_decode_handler(jwt_value)
except jwt.ExpiredSignature:
msg = _('Signature has expired.')
raise exceptions.AuthenticationFailed(msg)
except jwt.DecodeError:
msg = _('Error decoding signature.')
raise exceptions.AuthenticationFailed(msg)
except jwt.InvalidTokenError:
raise exceptions.AuthenticationFailed()
return payload
def on_join(data):
if current_app.config["AUTH"] == Config.NONE:
user = User("Gandalf", superadmin=True)
else:
token = data.get('jwt')
if not token:
disconnect()
return
try:
payload = LoginService.parse_api_token_direct(token)
except DecodeError:
disconnect()
return
except ExpiredSignature:
disconnect()
return
user = User.query.filter_by(username=payload["username"]).scalar()
printers = user.get_accessible_printers()
for printer in printers:
join_room(str(printer.id))
datatype = {
'id': fields.Integer,
'name': fields.String,
'group': fields.List(
fields.Nested({
'name': fields.String
})
)
}
emit("printers", marshal(printers, datatype))
def login_required(f):
"""
Decorator function for routes
Checks Authorization header, token validity and injects user into flask global variable g
"""
@wraps(f)
def decorated_function(*args, **kwargs):
if current_app.config["AUTH"] == Config.NONE:
g.user = User("Gandalf", superadmin=True)
return f(*args, **kwargs)
if not request.headers.get('Authorization'):
return "Missing authorization header", 401
try:
payload = LoginService.parse_api_token(request)
except DecodeError:
return 'Token is invalid', 401
except ExpiredSignature:
return 'Token has expired', 401
g.user = User.query.filter_by(username=payload['username']).first()
return f(*args, **kwargs)
return decorated_function
def superadmin_required(f):
"""
Decorator function for routes
Checks Authorization header, token validity, superadmin permission and injects user into flask global variable g
"""
@wraps(f)
def decorated_function(*args, **kwargs):
if current_app.config["AUTH"] == Config.NONE:
g.user = User("Gandalf", superadmin=True)
return f(*args, **kwargs)
if not request.headers.get('Authorization'):
return "Missing authorization header", 401
try:
payload = LoginService.parse_api_token(request)
except DecodeError:
return 'Token is invalid', 401
except ExpiredSignature:
return 'Token has expired', 401
g.user = User.query.filter_by(username=payload['username']).first()
if g.user.superadmin is False:
return 'You are not superadmin', 401
return f(*args, **kwargs)
return decorated_function
def check(self, json):
"""Checking a JWT against passphrase and expiry"""
try:
payload = jwt.decode(json, self.secret, algorithms=['HS256'])
return payload['pgp'], True
# something has gone wrong
except jwt.DecodeError: # test
return "Invalid Token", False
except jwt.ExpiredSignature: # test
return "Expired Token", False
def login_required(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if not request.headers.get('Authorization'):
response = jsonify(message='Missing authorization header')
response.status_code = 401
return response
try:
payload = parse_token(request)
except DecodeError:
response = jsonify(message='Token is invalid')
response.status_code = 401
return response
except ExpiredSignature:
response = jsonify(message='Token has expired')
response.status_code = 401
return response
g.user_id = payload['sub']
return f(*args, **kwargs)
return decorated_function
# Helper functions, get currently logged in user
def decode(self, token):
try:
return jwt.decode(token,
self.secret,
algorithm=self.algorithm,
issuer=self.issuer)
except jwt.ExpiredSignature:
raise InvalidUsage("Token is expired")
except jwt.DecodeError:
raise InvalidUsage('Token signature is invalid')
except Exception:
raise Exception('Unable to parse authentication token.')
def check_auth(self):
auth = request.headers.get('Authorization', None)
message = ''
if not auth:
abort(401, message = 'Authorization header is expected')
parts = auth.split()
if parts[0].lower() != 'bearer':
message = 'Authorization header must start with Bearer'
elif len(parts) == 1:
message = 'Token not found'
elif len(parts) > 2:
message = 'Authorization header must be Bearer + \s + token'
if message:
abort(401, message = message)
token = parts[1]
try:
payload = jwt.decode(
token,
Security.get_jwt_skey(),
algorithms = ['HS256']
)
except jwt.ExpiredSignature:
message = 'token is expired'
except jwt.InvalidAudienceError:
message = 'incorrect audience'
except jwt.DecodeError:
message = 'token signature is invalid'
if message:
abort(401, message = message)
self.logger.debug('Access granted for %s!' % payload['user']['login'])
return payload
def user_data(self, access_token, *args, **kwargs):
"""Return user data by querying Microsoft service"""
try:
return self.get_json(
'https://graph.microsoft.com/v1.0/me',
headers={
'Content-Type': 'application/x-www-form-urlencoded',
'Accept': 'application/json',
'Authorization': 'Bearer ' + access_token
},
method='GET'
)
except (DecodeError, ExpiredSignature) as error:
raise AuthTokenError(self, error)
def user_data(self, access_token, *args, **kwargs):
response = kwargs.get('response')
id_token = response.get('id_token')
try:
decoded_id_token = jwt_decode(id_token, verify=False)
except (DecodeError, ExpiredSignature) as de:
raise AuthTokenError(self, de)
return decoded_id_token