def _decode(self, data):
"""
Safely decodes an encoded text stream back into a list of messages.
If the encoded text stream contained an invalid hash or was in an
invalid format, ``None`` is returned.
"""
if not data:
return None
bits = data.split('$', 1)
if len(bits) == 2:
hash, value = bits
if constant_time_compare(hash, self._hash(value)):
try:
# If we get here (and the JSON decode works), everything is
# good. In any other case, drop back and return None.
return json.loads(value, cls=MessageDecoder)
except ValueError:
pass
# Mark the data as used (so it gets removed) since something was wrong
# with the data.
self.used = True
return None
python类constant_time_compare()的实例源码
def check_token(self, user, token):
"""
Check that a password reset token is correct for a given user.
"""
# Parse the token
try:
ts_b36, hash = token.split("-")
except ValueError:
return False
try:
ts = base36_to_int(ts_b36)
except ValueError:
return False
# Check that the timestamp/uid has not been tampered with
if not constant_time_compare(self._make_token_with_timestamp(user, ts), token):
return False
# Check the timestamp is within limit
if (self._num_days(self._today()) - ts) > settings.PASSWORD_RESET_TIMEOUT_DAYS:
return False
return True
def _decode(self, data):
"""
Safely decodes an encoded text stream back into a list of messages.
If the encoded text stream contained an invalid hash or was in an
invalid format, ``None`` is returned.
"""
if not data:
return None
bits = data.split('$', 1)
if len(bits) == 2:
hash, value = bits
if constant_time_compare(hash, self._hash(value)):
try:
# If we get here (and the JSON decode works), everything is
# good. In any other case, drop back and return None.
return json.loads(value, cls=MessageDecoder)
except ValueError:
pass
# Mark the data as used (so it gets removed) since something was wrong
# with the data.
self.used = True
return None
def check_token(self, user, token):
"""
Check that a password reset token is correct for a given user.
"""
# Parse the token
try:
ts_b36, hash = token.split("-")
except ValueError:
return False
try:
ts = base36_to_int(ts_b36)
except ValueError:
return False
# Check that the timestamp/uid has not been tampered with
if not constant_time_compare(self._make_token_with_timestamp(user, ts), token):
return False
# Check the timestamp is within limit
if (self._num_days(self._today()) - ts) > settings.PASSWORD_RESET_TIMEOUT_DAYS:
return False
return True
def _decode(self, data):
"""
Safely decodes an encoded text stream back into a list of messages.
If the encoded text stream contained an invalid hash or was in an
invalid format, ``None`` is returned.
"""
if not data:
return None
bits = data.split('$', 1)
if len(bits) == 2:
hash, value = bits
if constant_time_compare(hash, self._hash(value)):
try:
# If we get here (and the JSON decode works), everything is
# good. In any other case, drop back and return None.
return json.loads(value, cls=MessageDecoder)
except ValueError:
pass
# Mark the data as used (so it gets removed) since something was wrong
# with the data.
self.used = True
return None
def check_token(self, user, token):
"""
Check that a password reset token is correct for a given user.
"""
# Parse the token
try:
ts_b36, hash = token.split("-")
except ValueError:
return False
try:
ts = base36_to_int(ts_b36)
except ValueError:
return False
# Check that the timestamp/uid has not been tampered with
if not constant_time_compare(self._make_token_with_timestamp(user, ts), token):
return False
# Check the timestamp is within limit
if (self._num_days(self._today()) - ts) > settings.PASSWORD_RESET_TIMEOUT_DAYS:
return False
return True
def decode(self, session_data):
encoded_data = base64.b64decode(force_bytes(session_data))
try:
# could produce ValueError if there is no ':'
hash, serialized = encoded_data.split(b':', 1)
expected_hash = self._hash(serialized)
if not constant_time_compare(hash.decode(), expected_hash):
raise SuspiciousSession("Session data corrupted")
else:
return self.serializer().loads(serialized)
except Exception as e:
# ValueError, SuspiciousOperation, unpickling exceptions. If any of
# these happen, just return an empty dictionary (an empty session).
if isinstance(e, SuspiciousOperation):
logger = logging.getLogger('django.security.%s' %
e.__class__.__name__)
logger.warning(force_text(e))
return {}
def check_token(self, user, token):
"""
Check that a password reset token is correct for a given user.
"""
# Parse the token
try:
ts_b36, hash = token.split("-")
except ValueError:
return False
try:
ts = base36_to_int(ts_b36)
except ValueError:
return False
# Check that the timestamp/uid has not been tampered with
if not constant_time_compare(self._make_token_with_timestamp(user, ts), token):
return False
# Check the timestamp is within limit
if (self._num_days(self._today()) - ts) > settings.PASSWORD_RESET_TIMEOUT_DAYS:
return False
return True
def _decode(self, data):
"""
Safely decodes an encoded text stream back into a list of messages.
If the encoded text stream contained an invalid hash or was in an
invalid format, ``None`` is returned.
"""
if not data:
return None
bits = data.split('$', 1)
if len(bits) == 2:
hash, value = bits
if constant_time_compare(hash, self._hash(value)):
try:
# If we get here (and the JSON decode works), everything is
# good. In any other case, drop back and return None.
return json.loads(value, cls=MessageDecoder)
except ValueError:
pass
# Mark the data as used (so it gets removed) since something was wrong
# with the data.
self.used = True
return None
def _decode(self, data):
"""
Safely decodes an encoded text stream back into a list of messages.
If the encoded text stream contained an invalid hash or was in an
invalid format, ``None`` is returned.
"""
if not data:
return None
bits = data.split('$', 1)
if len(bits) == 2:
hash, value = bits
if constant_time_compare(hash, self._hash(value)):
try:
# If we get here (and the JSON decode works), everything is
# good. In any other case, drop back and return None.
return json.loads(value, cls=MessageDecoder)
except ValueError:
pass
# Mark the data as used (so it gets removed) since something was wrong
# with the data.
self.used = True
return None
def check_token(self, user, token):
"""
Check that a password reset token is correct for a given user.
"""
# Parse the token
try:
ts_b36, hash = token.split("-")
except ValueError:
return False
try:
ts = base36_to_int(ts_b36)
except ValueError:
return False
# Check that the timestamp/uid has not been tampered with
if not constant_time_compare(self._make_token_with_timestamp(user, ts), token):
return False
# Check the timestamp is within limit
if (self._num_days(self._today()) - ts) > settings.PASSWORD_RESET_TIMEOUT_DAYS:
return False
return True
def check_token(self, user, token):
"""
Check that a password reset token is correct for a given user.
"""
# Parse the token
try:
ts_b36, hash = token.split("-")
except ValueError:
return False
try:
ts = base36_to_int(ts_b36)
except ValueError:
return False
# Check that the timestamp/uid has not been tampered with
if not constant_time_compare(self._make_token_with_timestamp(user, ts), token):
return False
# Check the timestamp is within limit
if (self._num_days(self._today()) - ts) > settings.PASSWORD_RESET_TIMEOUT_DAYS:
return False
return True
def _decode(self, data):
"""
Safely decodes an encoded text stream back into a list of messages.
If the encoded text stream contained an invalid hash or was in an
invalid format, ``None`` is returned.
"""
if not data:
return None
bits = data.split('$', 1)
if len(bits) == 2:
hash, value = bits
if constant_time_compare(hash, self._hash(value)):
try:
# If we get here (and the JSON decode works), everything is
# good. In any other case, drop back and return None.
return json.loads(value, cls=MessageDecoder)
except ValueError:
pass
# Mark the data as used (so it gets removed) since something was wrong
# with the data.
self.used = True
return None
def project_from_auth(self, auth):
if not auth.public_key:
raise APIUnauthorized('Invalid api key')
try:
pk = ProjectKey.objects.get_from_cache(public_key=auth.public_key)
except ProjectKey.DoesNotExist:
raise APIUnauthorized('Invalid api key')
# a secret key may not be present which will be validated elsewhere
if not constant_time_compare(pk.secret_key, auth.secret_key or pk.secret_key):
raise APIUnauthorized('Invalid api key')
if not pk.is_active:
raise APIUnauthorized('API key is disabled')
if not pk.roles.store:
raise APIUnauthorized('Key does not allow event storage access')
return Project.objects.get_from_cache(id=pk.project_id)
def authenticate_credentials(self, userid, password):
try:
pk = ProjectKey.objects.get_from_cache(public_key=userid)
except ProjectKey.DoesNotExist:
return None
if not constant_time_compare(pk.secret_key, password):
return None
if not pk.is_active:
raise AuthenticationFailed('Key is disabled')
if not pk.roles.api:
raise AuthenticationFailed('Key does not allow API access')
return (AnonymousUser(), pk)
def check_token(self, user, token):
"""
Check that a password reset token is correct for a given user.
"""
# Parse the token
try:
ts_b36, hash = token.split("-")
except ValueError:
return False
try:
ts = base36_to_int(ts_b36)
except ValueError:
return False
# Check that the timestamp/uid has not been tampered with
if not constant_time_compare(self._make_token_with_timestamp(user, ts), token):
return False
# Check the timestamp is within limit
if (self._num_days(self._today()) - ts) > settings.PASSWORD_RESET_TIMEOUT_DAYS:
return False
return True
def _decode(self, data):
"""
Safely decodes an encoded text stream back into a list of messages.
If the encoded text stream contained an invalid hash or was in an
invalid format, ``None`` is returned.
"""
if not data:
return None
bits = data.split('$', 1)
if len(bits) == 2:
hash, value = bits
if constant_time_compare(hash, self._hash(value)):
try:
# If we get here (and the JSON decode works), everything is
# good. In any other case, drop back and return None.
return json.loads(value, cls=MessageDecoder)
except ValueError:
pass
# Mark the data as used (so it gets removed) since something was wrong
# with the data.
self.used = True
return None
def _decode(self, data):
"""
Safely decodes an encoded text stream back into a list of messages.
If the encoded text stream contained an invalid hash or was in an
invalid format, ``None`` is returned.
"""
if not data:
return None
bits = data.split('$', 1)
if len(bits) == 2:
hash, value = bits
if constant_time_compare(hash, self._hash(value)):
try:
# If we get here (and the JSON decode works), everything is
# good. In any other case, drop back and return None.
return json.loads(value, cls=MessageDecoder)
except ValueError:
pass
# Mark the data as used (so it gets removed) since something was wrong
# with the data.
self.used = True
return None
def check_token(self, user, token):
"""
Check that a password reset token is correct for a given user.
"""
# Parse the token
try:
ts_b36, hash = token.split("-")
except ValueError:
return False
try:
ts = base36_to_int(ts_b36)
except ValueError:
return False
# Check that the timestamp/uid has not been tampered with
if not constant_time_compare(self._make_token_with_timestamp(user, ts), token):
return False
# Check the timestamp is within limit
if (self._num_days(self._today()) - ts) > settings.PASSWORD_RESET_TIMEOUT_DAYS:
return False
return True
def _decode(self, data):
"""
Safely decodes an encoded text stream back into a list of messages.
If the encoded text stream contained an invalid hash or was in an
invalid format, ``None`` is returned.
"""
if not data:
return None
bits = data.split('$', 1)
if len(bits) == 2:
hash, value = bits
if constant_time_compare(hash, self._hash(value)):
try:
# If we get here (and the JSON decode works), everything is
# good. In any other case, drop back and return None.
return json.loads(value, cls=MessageDecoder)
except ValueError:
pass
# Mark the data as used (so it gets removed) since something was wrong
# with the data.
self.used = True
return None