def logout(request):
    """Log out the user identified by the request's API token.

    The token is read from ``request.environ['HTTP_X_API_TOKEN']``, decoded
    with ``settings.SECRET_KEY``, and its ``data`` field is decrypted and
    parsed as JSON to obtain the user id. The matching user's ``last_login``
    is then reset to ``datetime.fromtimestamp(0)`` and saved.

    Returns:
        dict: ``{'message': 'Logged out'}`` on success.

    Raises:
        BadRequest: if the header is missing, or the token cannot be
            decoded, decrypted, parsed, or matched to a user.
    """
    try:
        token = request.environ['HTTP_X_API_TOKEN']
    except (KeyError, IndexError, TypeError):
        raise BadRequest('Missing HTTP X-Api-Token header')

    try:
        claims = jwt.decode(token, settings.SECRET_KEY)
        data = json.loads(CRYPTO.decrypt(str(claims['data'])))
        user = User.objects.get(id=data['id'])
    except (utils.CryptoException, KeyError, TypeError, ValueError,
            jwt.DecodeError, jwt.ExpiredSignature, User.DoesNotExist):
        # ValueError: the decrypted payload is not valid JSON.
        # TypeError: the parsed JSON is not a mapping (e.g. a list or number).
        # Both mean the token is unusable, so report them like any other
        # invalid token instead of letting them escape as server errors.
        raise BadRequest('Invalid token')

    # Only the token handling above can legitimately fail with "invalid
    # token"; persistence errors are left to propagate unchanged.
    user.last_login = datetime.fromtimestamp(0)
    user.save()
    return {'message': 'Logged out'}
python类DecodeError()的实例源码
def authenticate():
    """Authenticate the current request from its ``X-Auth-Token`` header.

    Requests whose endpoint name contains ``tenant_provisioned`` bypass
    authentication and run as the ``"phone_home"`` user. Every other request
    must carry an HS256 token signed with ``credentials['tenant_secret']``;
    its ``user`` claim is stored in ``g.user``.

    Aborts with HTTP 401 when the header or the ``user`` claim is missing,
    or when the token is rejected for any reason.
    """
    if re.search('tenant_provisioned', str(request.endpoint)):
        g.user = "phone_home"
        logger.info("Authentication bypassed: tenant_provisioned")
        return

    try:
        decoded = jwt.decode(
            request.headers['X-Auth-Token'],
            credentials['tenant_secret'],
            algorithms=['HS256'],
        )
        g.user = decoded['user']
    except KeyError:
        # Missing header or missing 'user' claim.
        logger.error("Error: key error.")
        abort(401)
    except jwt.DecodeError:
        logger.error("Error: decode error")
        abort(401)
    except jwt.InvalidTokenError:
        # Expired tokens and other validation failures are not DecodeError
        # subclasses; without this clause they escaped as unhandled errors
        # instead of a 401.
        logger.error("Error: invalid token")
        abort(401)
def authjwt_method(token):
    """Resolve a token to a user with the rest_framework_jwt helpers.

    Returns the user looked up by natural key, or ``None`` when the token
    cannot be decoded, yields no username, or no such user exists.
    """
    import jwt
    from rest_framework_jwt.authentication import (
        jwt_decode_handler,
        jwt_get_username_from_payload,
    )

    try:
        claims = jwt_decode_handler(token)
    except (jwt.ExpiredSignature, jwt.DecodeError, jwt.InvalidTokenError):
        return None

    user_model = get_user_model()
    name = jwt_get_username_from_payload(claims)
    if not name:  # pragma: no cover
        return None

    try:
        return user_model.objects.get_by_natural_key(name)
    except user_model.DoesNotExist:  # pragma: no cover
        return None
jwt_authenticator.py 文件源码
项目:django-open-volunteering-platform
作者: OpenVolunteeringPlatform
项目源码
文件源码
阅读 24
收藏 0
点赞 0
评论 0
def authenticate(self, request):
    """
    Authenticate a request from its JWT.

    Returns ``None`` when ``get_jwt_value`` finds no token; otherwise
    returns a ``(user, token)`` two-tuple. Raises ``AuthenticationFailed``
    when the token is expired, cannot be decoded, or is otherwise invalid.
    """
    token = self.get_jwt_value(request)
    if token is None:
        return None

    try:
        claims = jwt_decode_handler(token)
    except jwt.ExpiredSignature:
        raise exceptions.AuthenticationFailed(_('Signature has expired.'))
    except jwt.DecodeError:
        raise exceptions.AuthenticationFailed(_('Error decoding signature.'))
    except jwt.InvalidTokenError:
        raise exceptions.AuthenticationFailed()

    # The request's channel is forwarded to the credential lookup.
    return (self.authenticate_credentials(claims, request.channel), token)
def decode_jwt(token, options=None):
    """Decode and verify a JSON Web Token.

    Args:
        token: the encoded token.
        options: optional dict. Its ``secret`` and ``algorithms`` entries
            override the values otherwise read from ``CONF``.

    Returns:
        The decoded token payload.

    Raises:
        Exception: with message ``'Cannot decode token'`` when the token
            cannot be decoded.
    """
    options = options or {}
    secret = options.get('secret', CONF and CONF.secret)
    algorithms = options.get('algorithms', CONF and CONF.algorithms)
    try:
        # Pass ``algorithms`` by keyword: in PyJWT 1.x the third positional
        # parameter of ``decode`` is ``verify``, so a positional list was
        # silently treated as the verify flag and no algorithm allow-list
        # was applied. The keyword form is correct on 1.x and 2.x alike.
        data = jwt.decode(token, secret, algorithms=algorithms)
    except jwt.DecodeError:
        raise Exception('Cannot decode token')

    logging.debug('* Decoded data = {}'.format(data))
    return data
def authenticate(token):
    """
    Authenticate a user from the supplied token.

    The token is decoded with ``jwt_decode_handler`` and the resulting
    payload is handed to ``authenticate_credentials``, whose result is
    returned. An expired or undecodable token raises
    ``AuthenticationFailed``.

    Based on jwt_auth.JSONWebTokenAuthMixin.authenticate
    """
    try:
        claims = jwt_decode_handler(token)
    except jwt.ExpiredSignature:
        raise exceptions.AuthenticationFailed('Signature has expired.')
    except jwt.DecodeError:
        raise exceptions.AuthenticationFailed('Error decoding signature.')

    return authenticate_credentials(claims)
def token_required(secret_key):
    """Build a decorator that requires a valid token on the request.

    The wrapped view only runs when the ``Authorization`` header carries a
    token (second whitespace-separated part) that ``parse_token`` accepts
    with ``secret_key``. The token's ``sub`` claim is injected into the
    wrapped function's module globals under the name ``username``.

    Responds with HTTP 401 when the header is missing or malformed, or when
    the token is invalid or expired.
    """
    def token_required_decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            auth_header = request.headers.get('Authorization')
            if not auth_header:
                return Response(response="Missing authorization header", status=401)

            # A header without a second part (e.g. just "Bearer") used to
            # raise IndexError; treat it as an invalid token instead.
            parts = auth_header.split()
            if len(parts) < 2:
                return Response(response="Token is invalid", status=401)

            try:
                payload = parse_token(parts[1], secret_key)
            except jwt.DecodeError:
                return Response(response="Token is invalid", status=401)
            except jwt.ExpiredSignature:
                return Response(response="Token has expired", status=401)

            # Set username for decorated func. ``__globals__`` is available
            # on both Python 2.6+ and Python 3, unlike ``func_globals``.
            # NOTE(review): writing into module globals is shared across
            # concurrent requests — confirm this is acceptable here.
            f.__globals__["username"] = payload['sub']
            return f(*args, **kwargs)
        return decorated_function
    return token_required_decorator
def authenticate(self, auth_token):
    """Authenticate this websocket from a JWT.

    On success the socket records the token's ``username``, marks itself
    authenticated, resets its failure counter, replies ``AUTH OK`` and
    registers itself in ``WebSocket.sockets``. A token that cannot be
    decoded or has expired gets an ``AUTH FAILED`` reply.
    """
    try:
        claims = jwt.decode(auth_token, secret)
        user_name = claims['username']

        self.username = user_name
        self.authenticated = True
        self.auth_failures = 0
        self.send_json(action='AUTH OK')

        # The first websocket connecting on behalf of a user also
        # subscribes to that user's feed.
        already_connected = len(WebSocket.sockets[user_name])
        if already_connected == 0:
            WebSocket.subscribe(user_name)
        WebSocket.sockets[user_name].add(self)
    except (jwt.DecodeError, jwt.ExpiredSignatureError):
        self.send_json(action='AUTH FAILED')
def authenticate(self, request):
    """Authenticate a request from its JWT.

    Returns ``None`` when no token is present, otherwise ``(user, token)``.
    The payload is decoded with ``jwt_devices_decode_handler`` when
    ``api_settings.JWT_PERMANENT_TOKEN_AUTH`` is truthy and with
    ``jwt_decode_handler`` otherwise. Decoding failures raise
    ``AuthenticationFailed``.
    """
    token = self.get_jwt_value(request)
    if token is None:
        return None

    try:
        decode = (
            jwt_devices_decode_handler
            if api_settings.JWT_PERMANENT_TOKEN_AUTH
            else jwt_decode_handler
        )
        claims = decode(token)
    except jwt.ExpiredSignature:
        raise exceptions.AuthenticationFailed(_("Signature has expired."))
    except jwt.DecodeError:
        raise exceptions.AuthenticationFailed(_("Error decoding signature."))
    except jwt.InvalidTokenError:
        raise exceptions.AuthenticationFailed()

    return self.authenticate_credentials(claims), token
def get_certificate(self, kid):
    """Fetch the X.509 certificate published for key id ``kid``.

    Downloads the key set from ``self.jwks_url()``, takes the first ``x5c``
    entry of the key whose ``kid`` matches, wraps it in PEM markers and
    loads it as a certificate.

    Raises:
        DecodeError: if no key in the set has the requested ``kid``.
    """
    # Retrieve the published keys.
    jwks_response = self.request(self.jwks_url(), method='GET')
    jwks_response.raise_for_status()

    # Pick the first key matching the kid; a private sentinel marks
    # "not found".
    not_found = object()
    x5c = next(
        (entry['x5c'][0]
         for entry in jwks_response.json()['keys']
         if entry['kid'] == kid),
        not_found,
    )
    if x5c is not_found:
        raise DecodeError('Cannot find kid={}'.format(kid))

    pem = '-----BEGIN CERTIFICATE-----\n{}\n-----END CERTIFICATE-----'.format(x5c)
    return load_pem_x509_certificate(pem.encode(), default_backend())
def user_data(self, access_token, *args, **kwargs):
    """Return the decoded claims of the ``id_token`` in the OAuth response.

    The token's header is read to find the signing key id and algorithm,
    the matching certificate is fetched with ``get_certificate``, and the
    token is then decoded against that certificate's public key with the
    ``SOCIAL_AUTH_AZUREAD_OAUTH2_KEY`` setting as audience.

    Args:
        access_token: unused here; the id token comes from ``response``.
        **kwargs: must contain ``response``, the token endpoint response.

    Raises:
        AuthTokenError: if the certificate cannot be found or the token
            fails to decode or has expired.
    """
    response = kwargs.get('response')
    id_token = response.get('id_token')

    # Decode the JWT header as a JSON dict. JWT segments are base64url
    # encoded with the trailing '=' padding stripped, so the standard
    # alphabet decoder fails on '-'/'_' or on lengths that are not a
    # multiple of four. Restore the padding and use the URL-safe decoder.
    header_segment = id_token.split('.', 1)[0].encode('ascii')
    header_segment += b'=' * (-len(header_segment) % 4)
    jwt_header = json.loads(
        base64.urlsafe_b64decode(header_segment).decode()
    )

    # Get key id and algorithm.
    # NOTE(review): the algorithm comes from the unverified token header;
    # consider pinning the accepted algorithms instead.
    key_id = jwt_header['kid']
    algorithm = jwt_header['alg']

    try:
        # Retrieve the certificate for key_id.
        certificate = self.get_certificate(key_id)
        return jwt_decode(
            id_token,
            key=certificate.public_key(),
            algorithms=[algorithm],
            audience=self.setting('SOCIAL_AUTH_AZUREAD_OAUTH2_KEY')
        )
    except (DecodeError, ExpiredSignature) as error:
        raise AuthTokenError(self, error)
def decode_token(token, secret='', ttl=DEFAULT_TTL, verify=True):
    """Decode a JSON Web Token and, when verifying, check its time window.

    Args:
        token: the encoded token (converted with ``str`` before decoding).
        secret: key passed to ``jwt.decode``.
        ttl: lifetime in seconds counted from the ``issuedAt`` claim.
        verify: forwarded to ``jwt.decode``; when falsy the ``issuedAt``
            checks are skipped as well.

    Returns:
        The decoded token payload.

    Raises:
        TokenInvalid: if decoding fails, ``issuedAt`` is missing, the token
            is issued in the future, or it is older than ``ttl``.
    """
    try:
        claims = jwt.decode(str(token), secret, verify=verify)
    except jwt.DecodeError as e:
        raise TokenInvalid("error decoding JSON Web Token", e)

    if not verify:
        return claims

    issued_raw = claims.get('issuedAt')
    if issued_raw is None:
        raise TokenInvalid("'issuedAt' is missing from token")

    issued_at = isodate.parse_datetime(issued_raw)
    expires_at = issued_at + datetime.timedelta(seconds=ttl)

    if issued_at > _now():
        raise TokenInvalid("token is not yet valid")
    if expires_at < _now():
        raise TokenInvalid("token has expired")
    return claims
def verify_token():
    """
    Check the request's ``Authorization`` token.

    Aborts with 403 when the token is blacklisted, expired or cannot be
    decoded; otherwise stores the token's ``id_user`` claim in
    ``g.current_user``.
    """
    # NOTE(review): a request without an Authorization header passes
    # through untouched — confirm anonymous access is intended.
    if 'Authorization' not in request.headers:
        return

    if request.headers['Authorization'] in cache.blacklisted_tokens:
        abort(403, 'Error: invalid token')

    try:
        claims = jwt.decode(request.headers['Authorization'], config.SECRET_KEY)
        g.current_user = claims['id_user']
    except jwt.ExpiredSignatureError:
        abort(403, 'Error: token expired')
    except jwt.DecodeError:
        abort(403, 'Error: invalid token')
def verify_token(cls, user, token):
    """Tell whether ``token`` was issued for ``user``.

    The token is decoded with a key built from the user's id and
    ``SECRET_CODE``. Returns ``True`` only when decoding succeeds and the
    ``username`` claim equals ``user.username``; expired or undecodable
    tokens (and an ``AttributeError`` while decoding) give ``False``.
    """
    try:
        claims = jwt.decode(token, str(user.id) + SECRET_CODE)
    except (jwt.ExpiredSignatureError, jwt.DecodeError, AttributeError):
        return False

    return True if claims["username"] == user.username else False
def decode_token(self, token):
    """Decode ``token`` with this object's secret and algorithm.

    Returns the decoded payload, or ``False`` when the token cannot be
    decoded.
    """
    try:
        claims = jwt.decode(token, key=self.secret, algorithms=self.algorithm)
    except jwt.DecodeError:
        return False
    else:
        return claims
def azure_ad_authorized():
    """Handle the Azure AD OAuth callback and sign the user in.

    Stores the returned ``id_token`` in the session, decodes it without
    signature verification, maps it to a user with
    ``retrieve_user_from_azure_ad`` and signs that user in. When the
    response is missing or the token cannot be decoded, a flash message is
    shown and the user is redirected.
    """
    response = azure_ad.authorized_response()
    # Function-call form works on both Python 2 and 3; the former print
    # statement was a syntax error on Python 3.
    # NOTE(review): this writes the whole OAuth response, including the
    # id token, to stdout — consider removing it or using a debug logger.
    print(response)

    if response is None:
        flask.flash('You denied the request to sign in.')
        # NOTE(review): ``util.get_next_url`` is passed without being
        # called — confirm it is not meant to be ``util.get_next_url()``.
        return flask.redirect(util.get_next_url)

    id_token = response['id_token']
    flask.session['oauth_token'] = (id_token, '')

    try:
        # verify=False: the token's signature is NOT checked here.
        decoded_id_token = jwt.decode(id_token, verify=False)
    except (jwt.DecodeError, jwt.ExpiredSignature):
        flask.flash('You denied the request to sign in.')
        return flask.redirect(util.get_next_url)

    user_db = retrieve_user_from_azure_ad(decoded_id_token)
    return auth.signin_user_db(user_db)
def decode_token(token):
    """
    Decode and validate a token against the service certificate

    The certificate is read from ``options.ssl_cert`` when set, otherwise
    from ``LOCALHOST_CRT``. The payload's ``scope`` claim is replaced by a
    ``Scope`` object before the payload is returned.

    :param token: a JSON Web Token
    :returns: dict, the decoded payload
    :raises:
        jwt.DecodeError: Invalid token or not signed with our key
        jwt.ExpiredSignatureError: Token has expired
        jwt.InvalidAudienceError: Invalid "aud" claim
        jwt.InvalidIssuerError: Invalid "iss" claim
        jwt.MissingRequiredClaimError: Missing a required claim
    """
    cert_path = getattr(options, 'ssl_cert', None) or LOCALHOST_CRT
    # NOTE(review): the file is read in text mode; on Python 3 the
    # certificate loader presumably needs bytes — confirm target version.
    with open(cert_path) as cert_handle:
        pem_data = cert_handle.read()
        certificate = load_pem_x509_certificate(pem_data, default_backend())

    claims = jwt.decode(
        token,
        certificate.public_key(),
        audience=audience(),
        issuer=issuer(),
        algorithms=[ALGORITHM],
        verify=True,
    )

    if not claims.get('sub'):
        raise jwt.MissingRequiredClaimError('"sub" claim is required')

    claims['scope'] = Scope(claims['scope'])
    return claims
def test_decode_token_different_public_key():
    """A token checked against a different certificate must not decode."""
    generated = generate_token(CLIENT, SCOPE, 'grant_type')
    token = generated[0]
    # Swap the default certificate for the client one so the public key
    # used for decoding no longer matches the signing key.
    with patch.object(_token, 'LOCALHOST_CRT', koi.CLIENT_CRT), \
            pytest.raises(jwt.DecodeError):
        decode_token(token)
def test_invalid_assertion(self):
    """An undecodable assertion makes token generation raise DecodeError."""
    # NOTE(review): this is a generator test (it yields); it presumably
    # relies on a coroutine test decorator outside this view.
    self.request.body_arguments['assertion'] = ['invalid_token']
    delegate_grant = grants.AuthorizeDelegate(self.request)
    with pytest.raises(jwt.DecodeError):
        yield delegate_grant.generate_token()
def authenticate(self, request):
    """Authenticate a request from its Authorization header.

    The header must consist of the configured prefix followed by exactly
    one token. Returns ``(user, token)``; raises ``AuthenticationFailed``
    when the header is missing, has the wrong prefix or shape, or when the
    token is expired or cannot be decoded.
    """
    parts = get_authorization_header(request).split()
    expected_prefix = api_settings.JWT_AUTH_HEADER_PREFIX.lower()

    if not parts or smart_text(parts[0].lower()) != expected_prefix:
        raise exceptions.AuthenticationFailed()

    if len(parts) == 1:
        raise exceptions.AuthenticationFailed(
            _("Invalid Authorization header. No credentials provided."))
    if len(parts) > 2:
        raise exceptions.AuthenticationFailed(
            _("Invalid Authorization header. Credentials string should not contain spaces."))

    token = parts[1]
    try:
        claims = jwt_decode_handler(token)
    except jwt.ExpiredSignature:
        raise exceptions.AuthenticationFailed(_("Signature has expired."))
    except jwt.DecodeError:
        raise exceptions.AuthenticationFailed(_("Error decoding signature."))

    return (self.authenticate_credentials(claims), token)
def authenticate(self, request):
    """Authenticate a request from its JWT, honouring token invalidation.

    Returns ``(None, None)`` when no token is present, otherwise
    ``(user, token)``. After decoding, the payload is run through
    ``check_blacklist`` and, once the user is resolved, through
    ``check_changed_password_invalidated_old_token``. Decoding failures
    raise ``AuthenticationFailed``.
    """
    token = self.get_jwt_value(request)
    if token is None:
        return None, None

    try:
        claims = jwt_decode_handler(token)
    except jwt.ExpiredSignature:
        raise AuthenticationFailed(_("Signature has expired."))
    except jwt.DecodeError:
        raise AuthenticationFailed(_("Error decoding signature."))
    except jwt.InvalidTokenError:
        raise AuthenticationFailed()

    # Blacklist check comes before the user lookup.
    self.check_blacklist(claims)
    account = self.authenticate_credentials(claims)

    # Check performed with both the resolved user and the payload.
    self.check_changed_password_invalidated_old_token(account, claims)
    return account, token
def _check_payload(token):
    """Decode ``token`` and return its payload.

    An expired or undecodable token is reported as a
    ``forms.ValidationError``.
    """
    try:
        return jwt_decode_handler(token)
    except jwt.ExpiredSignature:
        raise forms.ValidationError(_("Signature has expired."))
    except jwt.DecodeError:
        raise forms.ValidationError(_("Error decoding signature."))
def verify_jwt(auth_header, secret):
    """Extract the JWT from an Authorization header and verify it.

    The header is expected to hold two whitespace-separated parts: a scheme
    (e.g. ``JWT``) and the token. The token is decoded as HS256 with
    ``secret``.

    Args:
        auth_header: raw header value; ``None``, empty or ``'null'`` count
            as missing.
        secret: key used to verify the signature.

    Returns:
        list: ``[payload, '']`` on success, ``[None, message]`` on any
        failure (missing header, malformed header, rejected token).
    """
    if not auth_header or auth_header == 'null':
        logging.warning("No Authorization header")
        return [None, "Unauthorized access: missing authentication"]

    # Separate the scheme from the token. Anything other than exactly two
    # parts used to raise ValueError on unpacking; report it through the
    # normal error return instead.
    parts = auth_header.split()
    if len(parts) != 2:
        logging.warning("Malformed Authorization header")
        return [None, "Unauthorized access: malformed authentication"]

    token = bytes(parts[1], 'utf-8')
    try:
        payload = jwt.decode(token, secret, algorithms=['HS256'], verify=True)
    except jwt.InvalidTokenError as err:
        # InvalidTokenError is the base of both DecodeError and
        # ExpiredSignatureError, and also covers the remaining validation
        # failures that previously escaped this function.
        return [None, str(err)]
    return [payload, '']
def get_jwt_value(self, request):
    """Read the token from the Authorization header and decode it.

    Returns ``None`` when the header is absent or uses another prefix;
    otherwise returns the decoded payload. A header with no credentials or
    with extra parts, and any decoding failure, raise
    ``AuthenticationFailed``.
    """
    parts = get_authorization_header(request).split()
    expected_prefix = api_settings.JWT_AUTH_HEADER_PREFIX.lower()

    if not parts or smart_text(parts[0].lower()) != expected_prefix:
        return None

    if len(parts) == 1:
        raise exceptions.AuthenticationFailed(
            _('Invalid Authorization header. No credentials provided.'))
    if len(parts) > 2:
        raise exceptions.AuthenticationFailed(
            _('Invalid Authorization header. Credentials string '
              'should contain no spaces.'))

    try:
        return jwt_decode_handler(parts[1])
    except jwt.ExpiredSignature:
        raise exceptions.AuthenticationFailed(_('Signature has expired.'))
    except jwt.DecodeError:
        raise exceptions.AuthenticationFailed(_('Error decoding signature.'))
    except jwt.InvalidTokenError:
        raise exceptions.AuthenticationFailed()
def test_public_key(self):
    """
    Deserialize with an explicitly supplied public key: the matching key
    works, garbage raises ValueError, and a valid but unrelated key raises
    DecodeError.
    """
    original = scitokens.SciToken(key = self._private_key)
    encoded = original.serialize(issuer = "local")

    # Matching key
    restored = scitokens.SciToken.deserialize(encoded, public_key = self._public_pem, insecure = True)
    self.assertIsInstance(restored, scitokens.SciToken)

    # Not a key at all
    with self.assertRaises(ValueError):
        scitokens.SciToken.deserialize(encoded, insecure=True, public_key = "asdf".encode())

    # Well-formed key from an unrelated key pair
    unrelated_private = generate_private_key(
        public_exponent=65537,
        key_size=2048,
        backend=default_backend()
    )
    unrelated_pem = unrelated_private.public_key().public_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PublicFormat.SubjectPublicKeyInfo
    )
    with self.assertRaises(DecodeError):
        scitokens.SciToken.deserialize(encoded, insecure=True, public_key = unrelated_pem)
def on_join(data):
    """Join the socket to one room per printer the user can access.

    With authentication disabled (``AUTH == Config.NONE``) a superadmin
    placeholder user is used. Otherwise ``data['jwt']`` must hold a token
    accepted by ``LoginService.parse_api_token_direct`` whose ``username``
    matches a stored user; in every other case the client is disconnected.

    After joining the rooms, the accessible printers are emitted to the
    client as a ``printers`` event.
    """
    if current_app.config["AUTH"] == Config.NONE:
        user = User("Gandalf", superadmin=True)
    else:
        token = data.get('jwt')
        if not token:
            disconnect()
            return
        try:
            payload = LoginService.parse_api_token_direct(token)
        except (DecodeError, ExpiredSignature):
            disconnect()
            return
        user = User.query.filter_by(username=payload["username"]).scalar()
        if user is None:
            # A valid token for a user that no longer exists used to crash
            # on the attribute access below; treat it as unauthenticated.
            disconnect()
            return

    printers = user.get_accessible_printers()
    for printer in printers:
        join_room(str(printer.id))

    datatype = {
        'id': fields.Integer,
        'name': fields.String,
        'group': fields.List(
            fields.Nested({
                'name': fields.String
            })
        )
    }
    emit("printers", marshal(printers, datatype))
def login_required(f):
    """
    Decorator function for routes

    Checks the Authorization header and token validity, and injects the
    matching user into the flask global variable ``g``. With authentication
    disabled (``AUTH == Config.NONE``) a superadmin placeholder user is
    injected instead. Responds with 401 when the header is missing, the
    token is invalid or expired, or no user matches the token's username.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if current_app.config["AUTH"] == Config.NONE:
            g.user = User("Gandalf", superadmin=True)
            return f(*args, **kwargs)

        if not request.headers.get('Authorization'):
            return "Missing authorization header", 401

        try:
            payload = LoginService.parse_api_token(request)
        except DecodeError:
            return 'Token is invalid', 401
        except ExpiredSignature:
            return 'Token has expired', 401

        user = User.query.filter_by(username=payload['username']).first()
        if user is None:
            # Without this check a valid token for an unknown user let the
            # route run with ``g.user`` set to None.
            return 'Token is invalid', 401

        g.user = user
        return f(*args, **kwargs)
    return decorated_function
def superadmin_required(f):
    """
    Decorator function for routes

    Checks the Authorization header, token validity and superadmin
    permission, and injects the matching user into the flask global
    variable ``g``. With authentication disabled (``AUTH == Config.NONE``)
    a superadmin placeholder user is injected instead. Responds with 401
    when the header is missing, the token is invalid or expired, no user
    matches the token's username, or the user is not a superadmin.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if current_app.config["AUTH"] == Config.NONE:
            g.user = User("Gandalf", superadmin=True)
            return f(*args, **kwargs)

        if not request.headers.get('Authorization'):
            return "Missing authorization header", 401

        try:
            payload = LoginService.parse_api_token(request)
        except DecodeError:
            return 'Token is invalid', 401
        except ExpiredSignature:
            return 'Token has expired', 401

        user = User.query.filter_by(username=payload['username']).first()
        if user is None:
            # Previously this fell through to an AttributeError on None.
            return 'Token is invalid', 401

        g.user = user
        # Truthiness test instead of ``is False``: an unset (None) flag
        # must not be treated as superadmin.
        if not user.superadmin:
            return 'You are not superadmin', 401
        return f(*args, **kwargs)
    return decorated_function
def dispatch(self, request, **kwargs):
    """Require a valid ``Bearer`` token before dispatching the request.

    A request without a ``Bearer`` Authorization header is passed to
    ``handle_no_token``. Otherwise the token is decoded with the keyword
    arguments from ``get_jwt_decode_kwargs`` and stored on ``request.jwt``;
    a rejected token is passed to ``handle_invalid_token``. On success
    ``handle_valid_token`` runs and the parent ``dispatch`` result is
    returned.
    """
    hdr = request.headers.get('Authorization', '')
    if not hdr.startswith('Bearer '):
        return self.handle_no_token(request, **kwargs)

    bearer = hdr.split(' ', 1)[1].strip()
    try:
        request.jwt = jwt.decode(bearer, **self.get_jwt_decode_kwargs(request, **kwargs))
    except jwt.InvalidTokenError as err:
        # InvalidTokenError is the base class of DecodeError and also
        # covers expired tokens and failed claim checks, which previously
        # bypassed handle_invalid_token and escaped as unhandled errors.
        log.info('Invalid token: %s', err)
        return self.handle_invalid_token(request, err, **kwargs)

    self.handle_valid_token(request, **kwargs)
    return super().dispatch(request, **kwargs)
def check(self, json):
    """Check a JWT against the stored secret and its expiry.

    Returns a ``(value, ok)`` pair: the token's ``pgp`` claim and ``True``
    when the token decodes as HS256, or an error message and ``False``
    when it is invalid or expired.
    """
    try:
        claims = jwt.decode(json, self.secret, algorithms=['HS256'])
    except jwt.DecodeError:  # test
        return "Invalid Token", False
    except jwt.ExpiredSignature:  # test
        return "Expired Token", False
    else:
        return claims['pgp'], True