def authenticate(self, request):
    """
    Authenticate the request from its JSON Web Token.

    Returns ``None`` when ``self.get_jwt_value`` finds no token, otherwise a
    ``(user, jwt_value)`` pair where ``user`` comes from
    ``self.authenticate_credentials``.

    Raises ``exceptions.AuthenticationFailed`` when the token has expired,
    cannot be decoded, or is rejected as invalid by the decoder.
    """
    jwt_value = self.get_jwt_value(request)
    if jwt_value is None:
        return None

    try:
        # With JWT_PERMANENT_TOKEN_AUTH enabled the devices decode handler is
        # used in place of the default one.
        decode = (
            jwt_devices_decode_handler
            if api_settings.JWT_PERMANENT_TOKEN_AUTH
            else jwt_decode_handler
        )
        payload = decode(jwt_value)
    except jwt.ExpiredSignature:
        raise exceptions.AuthenticationFailed(_("Signature has expired."))
    except jwt.DecodeError:
        raise exceptions.AuthenticationFailed(_("Error decoding signature."))
    except jwt.InvalidTokenError:
        raise exceptions.AuthenticationFailed()

    return self.authenticate_credentials(payload), jwt_value
python类AuthenticationFailed()的实例源码
def authenticate(self, request):
    """
    Authenticate a request that carries a ``Bearer`` access token.

    Returns ``None`` when the Authorization header is absent or uses another
    scheme; otherwise returns ``(token.person.role, token)``.

    Raises ``exceptions.AuthenticationFailed`` when the header has no
    credentials, has more than two parts, or when ``AccessToken.get_token``
    rejects the token (or the token bytes cannot be decoded).
    """
    header_parts = get_authorization_header(request).split()
    if not header_parts:
        return None
    if header_parts[0].lower() != b'bearer':
        return None

    if len(header_parts) == 1:
        raise exceptions.AuthenticationFailed(
            _('Invalid basic header. No credentials provided.'))
    if len(header_parts) > 2:
        raise exceptions.AuthenticationFailed(
            _('Invalid basic header. Credentials string should not contain spaces.'))

    try:
        token = AccessToken.get_token(header_parts[1].decode())
    except (InvalidTokenException, UnicodeDecodeError):
        raise exceptions.AuthenticationFailed(_('Token invalide.'))

    # Attach the token to the role object that is handed back as the user.
    token.person.role.token = token
    return token.person.role, token
def authenticate_credentials(self, userid, password):
    """
    Authenticate an API key passed as ``userid``.

    Returns ``None`` when a password is supplied (this method only handles
    key-only credentials); otherwise ``(AnonymousUser(), key)``.

    Raises ``AuthenticationFailed`` when the key is unknown or disabled.
    """
    if password:
        return None

    try:
        api_key = ApiKey.objects.get_from_cache(key=userid)
    except ApiKey.DoesNotExist:
        raise AuthenticationFailed('API key is not valid')

    if not api_key.is_active:
        raise AuthenticationFailed('Key is disabled')

    # Record which key was used in the raven tags context.
    raven.tags_context({'api_key': userid})

    return AnonymousUser(), api_key
def authenticate_credentials(self, userid, password):
    """
    Authenticate a project key: ``userid`` is the public key, ``password``
    the secret key.

    Returns ``None`` when the public key is unknown or the secret does not
    match; otherwise ``(AnonymousUser(), project_key)``.

    Raises ``AuthenticationFailed`` when the key is disabled or its roles do
    not include API access.
    """
    try:
        project_key = ProjectKey.objects.get_from_cache(public_key=userid)
    except ProjectKey.DoesNotExist:
        return None

    secret_matches = constant_time_compare(project_key.secret_key, password)
    if not secret_matches:
        return None

    if not project_key.is_active:
        raise AuthenticationFailed('Key is disabled')
    if not project_key.roles.api:
        raise AuthenticationFailed('Key does not allow API access')

    return AnonymousUser(), project_key
def authenticate(self, request):
    """
    Authenticate a request that carries a ``Bearer`` token.

    Returns ``None`` (after logging a warning) when ``self.get_user_info_url``
    yields nothing, and ``None`` when the Authorization header is absent or
    uses another scheme. Otherwise returns whatever
    ``self.authenticate_credentials`` returns for the decoded token.

    Raises ``exceptions.AuthenticationFailed`` when the header carries no
    credentials, has more than two parts, or the token is not valid UTF-8.
    """
    if not self.get_user_info_url():
        logger.warning('The setting OAUTH2_USER_INFO_URL is invalid!')
        return None

    auth = get_authorization_header(request).split()
    if not auth or auth[0].lower() != b'bearer':
        return None

    if len(auth) == 1:
        raise exceptions.AuthenticationFailed('Invalid token header. No credentials provided.')
    if len(auth) > 2:
        raise exceptions.AuthenticationFailed('Invalid token header. Token string should not contain spaces.')

    try:
        token = auth[1].decode('utf8')
    except UnicodeError:
        # A malformed header is a client error, not a server error: report it
        # as an authentication failure instead of letting the decode error
        # propagate.
        raise exceptions.AuthenticationFailed(
            'Invalid token header. Token string should not contain invalid characters.')

    return self.authenticate_credentials(token)
def authenticate_credentials(self, payload):
    """
    Get or create the user named in the JWT payload and sync mapped claims.

    The username is taken from the ``preferred_username`` claim, falling back
    to ``username``. For every ``claim -> attribute`` pair returned by
    ``self.get_jwt_claim_attribute_map()``, a non-``None`` claim value that
    differs from the user's current attribute is copied onto the user; the
    user is saved only if something changed.

    Returns the user object.

    Raises ``exceptions.AuthenticationFailed`` when no usable username is
    present or when retrieving/updating the user fails.
    """
    username = payload.get('preferred_username') or payload.get('username')
    # ``not username`` also rejects an empty-string claim, which would
    # otherwise create a user with a blank username.
    if not username:
        raise exceptions.AuthenticationFailed('JWT must include a preferred_username or username claim!')

    try:
        user, __ = get_user_model().objects.get_or_create(username=username)
        attributes_updated = False
        for claim, attr in self.get_jwt_claim_attribute_map().items():
            payload_value = payload.get(claim)
            if getattr(user, attr) != payload_value and payload_value is not None:
                setattr(user, attr, payload_value)
                attributes_updated = True
        if attributes_updated:
            user.save()
    except Exception:
        # ``Exception`` rather than a bare ``except`` so that SystemExit and
        # KeyboardInterrupt are not converted into authentication failures.
        msg = 'User retrieval failed.'
        logger.exception(msg)
        raise exceptions.AuthenticationFailed(msg)

    return user
def authenticate_credentials(self, key):
    """
    Authenticate a token key, using the cache to skip the database lookup.

    On a cache hit returns ``(cached_user, key)`` — note the second element is
    the raw key string. On a miss the token is loaded from ``self.model``,
    validated, cached, and ``(token.user, token)`` is returned.

    Raises ``exceptions.AuthenticationFailed`` when the token does not exist
    or is older than 30 days, and ``exceptions.PermissionDenied`` when the
    token's user is inactive.
    """
    token_cache = 'token_' + key
    cache_user = cache.get(token_cache)
    if cache_user:
        return (cache_user, key)

    try:
        token = self.model.objects.get(key=key)
    except self.model.DoesNotExist:
        raise exceptions.AuthenticationFailed('User does not exist.')

    if not token.user.is_active:
        raise exceptions.PermissionDenied('The user is forbidden.')

    utc_now = timezone.now()
    expires_at = token.created + timezone.timedelta(hours=24 * 30)
    if expires_at < utc_now:
        raise exceptions.AuthenticationFailed('Token has been expired.')

    # Cache for at most 7 days, but never beyond the token's own expiry:
    # a cache hit bypasses the expiry check above, so a longer TTL would keep
    # an expired token usable.
    max_cache_seconds = 24 * 7 * 60 * 60
    remaining_seconds = int((expires_at - utc_now).total_seconds())
    cache.set(token_cache, token.user, min(max_cache_seconds, remaining_seconds))

    return (token.user, token)
def my_exception_handler(exc, context):
    """
    Wrap the default exception handler and tag the response with an
    ``error_code``.

    The default ``exception_handler`` builds the response; when it returns
    one, ``response.data['error_code']`` is set to 2 for
    ``AuthenticationFailed``, 3 for ``PermissionDenied`` and 1 for anything
    else. A ``None`` response is passed through untouched.
    """
    response = exception_handler(exc, context)
    if response is None:
        return response

    if isinstance(exc, exceptions.AuthenticationFailed):
        error_code = 2
    elif isinstance(exc, exceptions.PermissionDenied):
        error_code = 3
    else:
        error_code = 1

    response.data['error_code'] = error_code
    return response
def validate(self, data):
    """
    Check the submitted ``username`` / ``password`` pair.

    Returns ``data`` unchanged when a user with that username exists and the
    password matches.

    Raises ``exceptions.AuthenticationFailed`` when the username is missing,
    no such user exists, or the password check fails.
    """
    username = data.get("username", None)
    password = data.get("password", None)

    if not username:
        raise exceptions.AuthenticationFailed('A username or email is required to login.')

    # ``first()`` returns None when nothing matches, so a single query covers
    # both the existence check and the fetch.
    user_obj = User.objects.filter(username=username).first()
    if user_obj is None:
        raise exceptions.AuthenticationFailed("Incorrect username")

    if not user_obj.check_password(password):
        raise exceptions.AuthenticationFailed('Incorrect password. Please try again.')

    return data
def authenticate(self, request):
    """
    Authenticate a request that carries a ``Token`` Authorization header.

    Returns ``None`` when the header is absent or uses another scheme;
    otherwise returns the result of ``self.authenticate_credentials`` for the
    decoded token.

    Raises ``exceptions.AuthenticationFailed`` when the header has no
    credentials, has more than two parts, or the token cannot be decoded.
    """
    header_parts = authentication.get_authorization_header(request).split()
    if not header_parts:
        return None
    if header_parts[0].lower() != b'token':
        return None

    part_count = len(header_parts)
    if part_count == 1:
        raise exceptions.AuthenticationFailed(
            _('Invalid token header. No credentials provided.'))
    if part_count > 2:
        raise exceptions.AuthenticationFailed(
            _('Invalid token header. Token string should not contain spaces.'))

    try:
        token = header_parts[1].decode()
    except UnicodeError:
        raise exceptions.AuthenticationFailed(
            _('Invalid token header. Token string should not contain invalid characters.'))

    return self.authenticate_credentials(token)
def authenticate(self, request):
    """
    Authenticate a request from a JWT in the Authorization header.

    The header scheme must match ``self.authenticate_header(request)``
    (case-insensitively); if it does not, or the header is absent, ``None`` is
    returned. Otherwise the token is decoded with ``decode_jwt_token`` and the
    payload is passed to ``self.authenticate_credentials``.

    Raises ``exceptions.AuthenticationFailed`` for a malformed header, an
    undecodable token string, or any JWT error raised by the decoder.
    """
    header_parts = get_authorization_header(request).split()
    expected_scheme = self.authenticate_header(request=request)

    if not header_parts:
        return None
    if smart_text(header_parts[0].lower()) != expected_scheme.lower():
        return None

    if len(header_parts) == 1:
        raise exceptions.AuthenticationFailed(
            _('Invalid token header. No credentials provided.'))
    if len(header_parts) > 2:
        raise exceptions.AuthenticationFailed(
            _('Invalid token header. Token string should not contain spaces.'))

    try:
        token = header_parts[1].decode()
    except UnicodeError:
        raise exceptions.AuthenticationFailed(
            _('Invalid token header. Token string should not contain invalid characters.'))

    # The specific handlers come before InvalidTokenError so that each gets
    # its own message; the last clause is the generic fallback.
    try:
        payload = decode_jwt_token(token=token)
    except jwt.exceptions.ExpiredSignature:
        raise exceptions.AuthenticationFailed(_('Signature has expired.'))
    except jwt.exceptions.DecodeError:
        raise exceptions.AuthenticationFailed(_('Error decoding signature.'))
    except jwt.exceptions.InvalidKeyError:
        raise exceptions.AuthenticationFailed(_('Unauthorized token signing key.'))
    except jwt.exceptions.InvalidTokenError:
        raise exceptions.AuthenticationFailed()

    return self.authenticate_credentials(payload=payload)
def authenticate_credentials(self, key) -> Tuple[User, Token]:
    """
    Authenticate a cashdesk session by its API token.

    Returns ``(session.user, session.api_token)`` for an active session whose
    cashdesk matches the one detected for ``self.request``.

    Raises ``exceptions.AuthenticationFailed`` when no session has this
    token, the session is no longer active, or the request comes from a
    different cashdesk.
    """
    try:
        session = CashdeskSession.objects.get(api_token=key)
    except CashdeskSession.DoesNotExist:
        raise exceptions.AuthenticationFailed('Invalid token.')

    if not session.is_active():
        raise exceptions.AuthenticationFailed('Your session has ended.')

    if session.cashdesk != detect_cashdesk(self.request):
        template = _('Your token is valid for a different cashdesk. Your IP is: {}')
        raise exceptions.AuthenticationFailed(
            template.format(get_ip_address(self.request)))

    return session.user, session.api_token
def post(self, request, *args, **kwargs):
    """
    Issue a JWT for the user resolved by the serializer.

    The request data is validated (validation errors are raised by the
    serializer), the payload is built with ``jwt_payload_handler`` and, when
    ``JWT_ALLOW_REFRESH`` is set, stamped with the current UTC time as
    ``orig_iat``. Responds with HTTP 200 and the body produced by
    ``jwt_response_payload_handler``.

    Raises ``exceptions.AuthenticationFailed`` when the user is not active.
    """
    serializer = self.get_serializer(data=request.data)
    serializer.is_valid(raise_exception=True)

    user = serializer.validated_data['user']
    if not user.is_active:
        raise exceptions.AuthenticationFailed(_('User inactive or deleted.'))

    payload = jwt_payload_handler(user)
    if api_settings.JWT_ALLOW_REFRESH:
        issued_at = datetime.utcnow().utctimetuple()
        payload['orig_iat'] = timegm(issued_at)

    token = jwt_encode_handler(payload)
    body = jwt_response_payload_handler(token, user, request)
    return Response(body, status=status.HTTP_200_OK)
serializers.py 文件源码
项目:django-rest-framework-jwt-refresh-token
作者: lock8
项目源码
文件源码
阅读 24
收藏 0
点赞 0
评论 0
def validate(self, attrs):
    """
    Resolve the submitted ``refresh_token`` to its user.

    Looks up the ``RefreshToken`` whose key equals ``attrs['refresh_token']``
    (loading the related user in the same query), stores that user under
    ``attrs['user']`` and returns ``attrs``.

    Raises ``exceptions.AuthenticationFailed`` when no such token exists.
    """
    token_key = attrs['refresh_token']
    tokens_with_user = RefreshToken.objects.select_related('user')
    try:
        token = tokens_with_user.get(key=token_key)
    except RefreshToken.DoesNotExist:
        raise exceptions.AuthenticationFailed(_('Invalid token.'))

    attrs['user'] = token.user
    return attrs
def login(self, username, password):
    """Log in via the ``login`` endpoint and store the returned JWT.

    Posts the credentials as an ``obtain-json-web-tokens`` resource; on an
    HTTP 200 response the token from ``response.data['token']`` is set as the
    ``HTTP_AUTHORIZATION`` credential, prefixed with
    ``api_settings.JWT_AUTH_HEADER_PREFIX``.

    :param str username: Username of the user
    :param str password: Password of the user
    :raises: exceptions.AuthenticationFailed
    """
    attributes = {
        'username': username,
        'password': password,
    }
    body = {
        'data': {
            'attributes': attributes,
            'type': 'obtain-json-web-tokens',
        }
    }

    response = self.post(reverse('login'), body)
    if response.status_code != status.HTTP_200_OK:
        raise exceptions.AuthenticationFailed()

    auth_header = '{0} {1}'.format(
        api_settings.JWT_AUTH_HEADER_PREFIX,
        response.data['token'],
    )
    self.credentials(HTTP_AUTHORIZATION=auth_header)
def test_client_login_fails(db):
    """``JSONAPIClient.login`` raises AuthenticationFailed for 'someuser'/'invalidpw'."""
    api_client = JSONAPIClient()
    expected_error = exceptions.AuthenticationFailed
    with pytest.raises(expected_error):
        api_client.login('someuser', 'invalidpw')
def authenticate(self, request):
    """
    Authenticate a request that carries a ``Basic`` Authorization header.

    Returns ``None`` when the header is absent or uses another scheme;
    otherwise passes the raw (still encoded) credentials part to
    ``self.authenticate_credentials`` and returns its result.

    Raises ``exceptions.AuthenticationFailed`` when the header has no
    credentials or has more than two parts.
    """
    header_parts = get_authorization_header(request).split()
    if not header_parts:
        return None
    if header_parts[0].lower() != b'basic':
        return None

    part_count = len(header_parts)
    if part_count == 1:
        raise exceptions.AuthenticationFailed(
            'Invalid basic auth token header. No credentials provided.')
    if part_count > 2:
        raise exceptions.AuthenticationFailed(
            'Invalid basic auth token header. Basic authentication string should not contain spaces.')

    return self.authenticate_credentials(header_parts[1])
def authenticate(self, request):
    """
    Authenticate using ``username`` and ``password`` URL query parameters.

    Returns whatever ``self.authenticate_credentials(username, password)``
    returns when both parameters are present.

    Raises ``exceptions.AuthenticationFailed`` when either parameter is
    missing from ``request.query_params`` (this method never returns ``None``
    on its own).
    """
    # NOTE(review): credentials in the query string typically end up in
    # server/proxy logs and browser history — confirm this is intended.
    params = request.query_params

    if 'username' not in params:
        msg = 'No username URL parameter provided.'
        raise exceptions.AuthenticationFailed(msg)
    if 'password' not in params:
        msg = 'No password URL parameter provided.'
        raise exceptions.AuthenticationFailed(msg)

    return self.authenticate_credentials(params['username'], params['password'])
def authenticate_credentials(self, userid, key):
    """
    Authenticate by token key; ``userid`` is accepted but not used.

    Returns ``(token.user, token)`` for an existing token whose user is
    active.

    Raises ``exceptions.AuthenticationFailed`` when no token has this key or
    the token's user is inactive.
    """
    token_model = self.model
    try:
        token = token_model.objects.get(key=key)
    except token_model.DoesNotExist:
        raise exceptions.AuthenticationFailed('Invalid token')

    if token.user.is_active:
        return token.user, token

    raise exceptions.AuthenticationFailed('User inactive or deleted')
def authenticate_credentials(self, key):
    """
    Authenticate by token key using the model from ``self.get_model()``.

    The token is fetched together with its related user. Returns
    ``(token.user, token)`` when the user is active.

    Raises ``exceptions.AuthenticationFailed`` when no token has this key or
    the token's user is inactive.
    """
    model = self.get_model()
    tokens_with_user = model.objects.select_related('user')
    try:
        token = tokens_with_user.get(key=key)
    except model.DoesNotExist:
        raise exceptions.AuthenticationFailed(_('Invalid token.'))

    if token.user.is_active:
        return (token.user, token)

    raise exceptions.AuthenticationFailed(_('User inactive or deleted.'))