def verify_auth_token(token):
    """Resolve a signed auth token to a user.

    The token is loaded with a serializer keyed by the application's
    ``SECRET_KEY``. An expired or badly signed token yields ``None``;
    otherwise the result of looking up the user by the ``id`` stored in
    the token payload is returned.
    """
    signer = Serializer(current_app.config['SECRET_KEY'])
    try:
        # loads() verifies the token that came with the request and hands
        # back the payload that was signed into it.
        payload = signer.loads(token)
    except (SignatureExpired, BadSignature):
        return None
    return User.query.filter_by(id=payload['id']).first()
# Example source code for the Python class SignatureExpired()
def password_reset(token):
    """Handle the password-reset page for a signed reset token.

    The token is validated first; an expired or invalid token flashes a
    message and redirects to ``/login``. On a POST with a valid new
    password (and matching confirmation) the user's password is changed
    and the client is redirected to ``/login``. In every other case the
    reset form is rendered.

    :param token: Signed password-reset token taken from the URL.
    :return: A redirect response or the rendered ``password_reset.html``.
    """
    # NOTE: SignatureExpired is a subclass of BadTimeSignature in
    # itsdangerous, so it has to be handled first. With the handlers the
    # other way round the "Expired token" branch could never run.
    try:
        user_id = validate_password_reset_token(token)
    except SignatureExpired:
        flash('Expired token', 'danger')
        return redirect('/login')
    except BadTimeSignature:
        flash('Invalid token', 'danger')
        return redirect('/login')

    if request.method == 'POST':
        password = request.form.get('password', '')
        confirm = request.form.get('password_confirmation', '')
        if valid_new_password(password, confirm):
            user = User(get_or_404(User.get_collection(), _id=user_id))
            change_password(user, password)
            flash('Password was successfully changed.', 'success')
            return redirect('/login')

    # GET request, or a POST whose password did not validate.
    return render_template('password_reset.html')
def change_email(self, token):
    """Apply the email change carried by a signed token.

    Returns ``False`` when the token cannot be loaded, was not issued for
    this user, carries no new address, or the address is already used by
    another record; otherwise stores the new address and returns ``True``.
    """
    serializer = Serializer(current_app.config['SECRET_KEY'])
    try:
        payload = serializer.loads(token)
    except (BadSignature, SignatureExpired):
        return False

    # The token must have been issued for this very user.
    if payload.get('change_email') != self.id:
        return False

    new_email = payload.get('new_email')
    # Reject a missing address, or one that some record already uses.
    if new_email is None or self.query.filter_by(email=new_email).first() is not None:
        return False

    self.email = new_email
    db.session.add(self)
    db.session.commit()
    return True
def verify_token(username, token):
    """Check that a signed token is valid and belongs to ``username``.

    The token payload is expected to carry ``username`` and ``password``
    entries. The newest stored session document for that username is
    fetched and its ``password_hash`` is verified against the payload
    password. Returns a falsy value on any failure.
    """
    serializer = TimedJWSSerializer(app.config['SECRET_KEY'])
    try:
        ut.pretty_print("Trying to load the token")
        payload = serializer.loads(token)
    except SignatureExpired:
        ut.pretty_print("ERROR: Expired Token")
        return False
    except BadSignature:
        ut.pretty_print("ERROR: Invalid Token")
        return False
    ut.pretty_print("Token successfully loaded")

    # Most recent session document for the user named in the token.
    session_doc = db.sessions.find_one(
        filter={'username': payload['username']},
        sort=[('_id', -1)],
    )
    if not session_doc:
        return False

    record = json_util.loads(json_util.dumps(session_doc))
    password_ok = pwd_context.verify(payload['password'], record['password_hash'])
    return password_ok and payload['username'] == username
def is_valid_token(token, token_expiry):
    """
    Check a timed token and extract the value signed into it.

    :param token: Token to check
    :param token_expiry: Maximum accepted token age in seconds
    :return: ``(True, value)`` when the token loads, ``(False, None)``
        when it has expired or carries a bad signature
    """
    # Falls back to a fixed key when the app has no secret key configured.
    secret = current_app.secret_key or 'un1testingmode'
    timed_serializer = URLSafeTimedSerializer(secret)
    try:
        user_id = timed_serializer.loads(token, max_age=token_expiry)
    except SignatureExpired:
        current_app.logger.debug('Token has expired')
        return False, None
    except BadSignature:
        current_app.logger.debug('Bad Token Signature')
        return False, None
    else:
        return True, user_id
def change_email(self, token):
    """Apply the email change carried by a signed token.

    Returns ``False`` when the token cannot be loaded, was not issued for
    this user, carries no new address, or the address is already used by
    another record; otherwise stores the new address and returns ``True``.
    """
    serializer = Serializer(current_app.config['SECRET_KEY'])
    try:
        payload = serializer.loads(token)
    except (BadSignature, SignatureExpired):
        return False

    # The token must have been issued for this very user.
    if payload.get('change_email') != self.id:
        return False

    new_email = payload.get('new_email')
    # Reject a missing address, or one that some record already uses.
    if new_email is None or self.query.filter_by(email=new_email).first() is not None:
        return False

    self.email = new_email
    db.session.add(self)
    db.session.commit()
    return True
def try_refresh_token(self, session_id):
    """Try to re-authenticate the request from the refresh-token cookie.

    Marks the request as bad when the cookie is absent or blank, or when
    the refresh token has expired. A malformed refresh token additionally
    raises ``HttpBadRequest``. On success a new principal is created for
    ``session_id`` and accepted with ``setup_header=True``.
    """
    cookie = context.cookies.get(self.refresh_token_key)
    has_token = cookie and cookie.value is not None and cookie.value.strip()
    if not has_token:
        self.bad()
        return

    encoded_refresh_token = cookie.value
    try:
        # Decode the refresh token and build a fresh principal from it.
        refresh_principal = JwtRefreshToken.load(encoded_refresh_token)
        new_principal = self.create_principal(
            member_id=refresh_principal.id,
            session_id=session_id,
        )
        self.ok(new_principal, setup_header=True)
    except itsdangerous.SignatureExpired:
        self.bad()
    except itsdangerous.BadData:
        self.bad()
        raise HttpBadRequest()
def authenticate_request(self):
    """Authenticate the current request from the token in the environ.

    A missing or blank token marks the request as bad. An expired token
    triggers a refresh attempt when its payload carries a ``sessionId``;
    without one ``HttpUnauthorized`` is raised. A malformed token raises
    ``HttpBadRequest``.
    """
    encoded_token = context.environ.get(self.token_key) \
        if self.token_key in context.environ else None
    if encoded_token is None or not encoded_token.strip():
        self.bad()
        return

    try:
        self.ok(self.verify_token(encoded_token))
    except itsdangerous.SignatureExpired as expired:
        # Expired token: fall back to the refresh token if the payload
        # tells us which session it belonged to.
        session_id = expired.payload.get('sessionId')
        if not session_id:
            self.bad()
            raise HttpUnauthorized()
        self.try_refresh_token(session_id)
    except itsdangerous.BadData:
        # Malformed token.
        self.bad()
        raise HttpBadRequest()
def unsign_url_safe(token, secret_key, salt=None, **kw):
    """
    Load the value out of a URL-safe signed token.

    A token made of three dot-separated parts is treated as timed: it is
    loaded together with its timestamp, and the value is returned only
    while that timestamp is later than the current UTC time. Any other
    token is loaded with the plain (untimed) URL-safe serializer.

    :param token: the signed token
    :param secret_key: key the token was signed with
    :param salt: (string) a namespace key
    :param kw: extra keyword arguments forwarded to the serializer
    :return: the value stored in the token
    :raises itsdangerous.SignatureExpired: when a timed token's timestamp
        is not in the future
    """
    if len(token.split(".")) != 3:
        plain = itsdangerous.URLSafeSerializer(secret_key=secret_key, salt=salt, **kw)
        return plain.loads(token)

    timed = URLSafeTimedSerializer2(secret_key=secret_key, salt=salt, **kw)
    value, timestamp = timed.loads(token, max_age=None, return_timestamp=True)
    now = datetime.datetime.utcnow()
    # NOTE(review): the timestamp is compared as an expiry moment here -
    # presumably URLSafeTimedSerializer2 signs the expiry time; confirm.
    if timestamp > now:
        return value
    raise itsdangerous.SignatureExpired(
        'Signature age %s < %s ' % (timestamp, now),
        payload=value,
        date_signed=timestamp)
def verify_auth_token(token):
    """Resolve a base64-encoded signed auth token to a user.

    Returns ``None`` when the decoded token has expired or carries a bad
    signature; otherwise returns the result of fetching the user by the
    ``id`` stored in the token payload.
    """
    serializer = Serializer(app.config['SECRET_KEY'])
    try:
        payload = serializer.loads(b64decode(token))
    except (SignatureExpired, BadSignature):
        # Expired-but-valid and invalid tokens are treated alike.
        return None
    return User.query.get(payload['id'])
def verify_auth_token(cls, token):
    """Resolve a signed auth token to a user or raise a token error.

    Raises ``TokenExpired`` for an expired token and ``BadToken`` for a
    badly signed token or one whose ``user_id`` matches no user. Each
    error wraps an HTTP 400 response object.
    """
    secret = current_app.config.get("SECRET_KEY", "No secret key")
    serializer = TimedJSONWebSignatureSerializer(secret)
    # NOTE(review): the "?" runs in the messages below look like text that
    # lost its original encoding; they are kept exactly as found.
    try:
        payload = serializer.loads(token)
    except SignatureExpired:
        raise TokenExpired(http_responses.HTTP_400_BAD_REQUEST(msg={"error": u"Token?????????"}))
    except BadSignature:
        raise BadToken(http_responses.HTTP_400_BAD_REQUEST(msg={"error": u"Token????????"}))

    try:
        return User.get_object(id=payload["user_id"])
    except ObjectNotExists:
        raise BadToken(http_responses.HTTP_400_BAD_REQUEST(msg={"error": u"Token????????"}))
def _verify_token(self, token):
    """Load a signed token and report how the attempt went.

    Returns an ``(expired, invalid, data)`` tuple: ``data`` is the loaded
    payload on success and ``None`` otherwise; ``expired`` is set when the
    signature has expired, ``invalid`` for any other failure.
    """
    serializer = Serializer(current_app.config['SECRET_KEY'])
    try:
        return False, False, serializer.loads(token)
    except SignatureExpired:
        return True, False, None
    except Exception:
        # Every other failure is reported as an invalid token.
        return False, True, None
def lab(token):
    """Render the lab page for a signed, short-lived lab token.

    The token (accepted for at most 60 seconds) carries the lab id,
    instance id, exam id and response id. With an instance id the existing
    instance is looked up and must be in an acceptable state; without one
    a new instance is created and its id is stored in redis under the
    response id for 3600 seconds. Aborts with 403 on a bad or expired
    token and with 400 when the lab API call fails.
    """
    try:
        lab_id, instance_id, exam_id, response_id = external_serializer.loads(token, max_age=60)
    except (BadSignature, SignatureExpired):
        abort(403)

    url_base = '{0}/labapiConnection/ShowLab?labInstanceGuid={1}&fullScreen=False'
    okay_states = {'STARTING', 'ACTIVE'}

    if not instance_id:
        # No instance yet: ask the lab API to create one.
        lab_url = '{0}/labapi/v1/Create'.format(current_app.config['XTREME_URL'])
        credentials = HTTPBasicAuth(username=current_app.config['XTREME_ID'],
                                    password=current_app.config['XTREME_SECRET'])
        resp = requests.put(lab_url, json={'labID': lab_id}, auth=credentials)
        if resp.status_code != 200:
            abort(400)
        instance_id = resp.json()['id']
        redis_store.setex(response_id, 3600, instance_id)
        url = url_base.format(current_app.config['XTREME_URL'], instance_id)
    else:
        # Existing instance: make sure it is still usable.
        lab_instance_url = '{0}/labapi/v1/instance?id={1}'.format(
            current_app.config['XTREME_URL'], instance_id)
        credentials = HTTPBasicAuth(username=current_app.config['XTREME_ID'],
                                    password=current_app.config['XTREME_SECRET'])
        resp = requests.get(lab_instance_url, auth=credentials)
        if resp.status_code != 200 or resp.json()['state'] not in okay_states:
            abort(400)
        # Prefer the URL reported by the API, else build the default one.
        url = resp.json()['connectionUrl'] or url_base.format(current_app.config['XTREME_URL'], instance_id)

    return render_template('xtreme.html', url=url, response_id=response_id, instance_id=instance_id, exam_id=exam_id)
def check_token(self, token_sign):
    """
    Unsign a timestamp-signed token, accepting it for at most 60 seconds.

    Returns ``{'success': <unsigned value>}`` when the signature checks
    out, or ``{'error': <message>}`` when it has expired or is invalid.
    """
    from itsdangerous import TimestampSigner, SignatureExpired, BadTimeSignature
    signer = TimestampSigner(self._sign_key)
    try:
        token = signer.unsign(token_sign, max_age=60)  # max_age is in seconds
    except (SignatureExpired, BadTimeSignature) as exc:
        # Expired and malformed signatures are reported the same way.
        return {'error': exc.message}
    return {'success': token}
def verfy_auth_token(token):
    """Resolve a signed auth token to a user.

    Returns ``None`` when the token has expired or carries a bad
    signature; otherwise returns the result of looking up the user by the
    ``id`` stored in the token payload.
    """
    signer = Serializer(current_app.config['SECRET_KEY'])
    try:
        payload = signer.loads(token)
    except (SignatureExpired, BadSignature):
        return None
    return User.query.filter_by(id=payload['id']).first()
def _get_auth_status(self, authuser, authpassword):
    """Work out the authentication status for a user/password pair.

    ``authpassword`` is first tried as a signed login token. An expired
    token gives ``{"status": "expired"}``. If it is not a loadable token
    at all, it is validated as a plain password via ``self._validate``.
    A loaded token must be a two-item list whose first item equals
    ``authuser``; its second item is returned as ``groups``.
    """
    try:
        loaded = self.serializer.loads(authpassword, max_age=self.LOGIN_EXPIRATION)
    except itsdangerous.SignatureExpired:
        return {"status": "expired"}
    except itsdangerous.BadData:
        # Not a token: treat it as direct user/password authentication.
        return self._validate(authuser, authpassword)

    well_formed = isinstance(loaded, list) and len(loaded) == 2 and loaded[0] == authuser
    if well_formed:
        return {"status": "ok", "groups": loaded[1]}
    threadlog.debug("mismatch credential for user %r", authuser)
    return {"status": "nouser"}
def get_by_token(cls, token):
    """Return the user a signed token was issued to.

    Loading the token may itself raise ``BadSignature`` or
    ``SignatureExpired``. Beyond that, ``HTTPError(401)`` is raised when
    no user has the id in the payload, ``BadSignature`` when the token is
    not the one stored on the user, and ``SignatureExpired`` when the
    user's stored expiry time has passed.
    """
    serializer = TimedJSONWebSignatureSerializer(current_app.config['SECRET_KEY'])
    payload = serializer.loads(token)
    user_id = payload['id']

    user = current_app.user_datastore.find_user(id=user_id)
    if not user:
        raise HTTPError(401, 'Unknow user with id {}'.format(user_id))
    if user.token != token:
        raise BadSignature('bad token')
    if datetime.datetime.utcnow() >= user.token_expires:
        raise SignatureExpired('bad token')
    return user
def confirm_account(self, token):
    """Mark this account as confirmed if the token was issued for it.

    Returns ``False`` when the token cannot be loaded or its ``confirm``
    entry does not equal this user's id; otherwise sets ``confirmed``,
    commits the change and returns ``True``.
    """
    serializer = Serializer(current_app.config['SECRET_KEY'])
    try:
        payload = serializer.loads(token)
    except (BadSignature, SignatureExpired):
        return False

    token_is_mine = payload.get('confirm') == self.id
    if not token_is_mine:
        return False

    self.confirmed = True
    db.session.add(self)
    db.session.commit()
    return True
def reset_password(self, token, new_password):
    """Set a new password if the reset token was issued for this user.

    Returns ``False`` when the token cannot be loaded or its ``reset``
    entry does not equal this user's id; otherwise assigns
    ``new_password``, commits the change and returns ``True``.
    """
    serializer = Serializer(current_app.config['SECRET_KEY'])
    try:
        payload = serializer.loads(token)
    except (BadSignature, SignatureExpired):
        return False

    token_is_mine = payload.get('reset') == self.id
    if not token_is_mine:
        return False

    self.password = new_password
    db.session.add(self)
    db.session.commit()
    return True
def verify_auth_token(token):
    """Resolve a signed auth token to a user.

    Returns ``None`` when the token has expired or carries a bad
    signature; otherwise returns the result of fetching the user by the
    ``id`` stored in the token payload.
    """
    serializer = Serializer(current_app.config['SECRET_KEY'])
    try:
        payload = serializer.loads(token)
    except (SignatureExpired, BadSignature):
        # Expired-but-valid and invalid tokens are treated alike.
        return None
    return User.query.get(payload['id'])
def confirm_account(self, token):
    """Mark this account as confirmed if the token was issued for it.

    Returns ``False`` when the token cannot be loaded or its ``confirm``
    entry does not equal this user's id; otherwise sets ``confirmed``,
    commits the change and returns ``True``.
    """
    serializer = Serializer(current_app.config['SECRET_KEY'])
    try:
        payload = serializer.loads(token)
    except (BadSignature, SignatureExpired):
        return False

    token_is_mine = payload.get('confirm') == self.id
    if not token_is_mine:
        return False

    self.confirmed = True
    db.session.add(self)
    db.session.commit()
    return True
def reset_password(self, token, new_password):
    """Set a new password if the reset token was issued for this user.

    Returns ``False`` when the token cannot be loaded or its ``reset``
    entry does not equal this user's id; otherwise assigns
    ``new_password``, commits the change and returns ``True``.
    """
    serializer = Serializer(current_app.config['SECRET_KEY'])
    try:
        payload = serializer.loads(token)
    except (BadSignature, SignatureExpired):
        return False

    token_is_mine = payload.get('reset') == self.id
    if not token_is_mine:
        return False

    self.password = new_password
    db.session.add(self)
    db.session.commit()
    return True
def get_token_status(token, serializer, max_age=None, return_data=False):
    """Get the status of a token.

    :param token: The token to check
    :param serializer: The name of the serializer. Can be one of the
                       following: ``confirm``, ``login``, ``reset``
    :param max_age: The name of the max age config option. Can be one of
                    the following: ``CONFIRM_EMAIL``, ``LOGIN``, ``RESET_PASSWORD``
    :param return_data: When true, the loaded token data is appended to
                        the returned tuple
    :return: ``(expired, invalid, user)`` or, with ``return_data``,
             ``(expired, invalid, user, data)``
    """
    serializer = getattr(_security, serializer + '_serializer')
    max_age = get_max_age(max_age)
    user = data = None
    expired = invalid = False

    try:
        data = serializer.loads(token, max_age=max_age)
    except SignatureExpired:
        # Still read the payload so the expired token's user can be found.
        _, data = serializer.loads_unsafe(token)
        expired = True
    except (BadSignature, TypeError, ValueError):
        invalid = True

    if data:
        user = _datastore.find_user(id=data[0])

    # Only report "expired" when the token maps to a known user.
    expired = expired and (user is not None)

    status = (expired, invalid, user)
    return status + (data,) if return_data else status
def verify_token(self, encoded_token):
    """Verify the token via the parent class, then check its session.

    Raises ``itsdangerous.SignatureExpired`` (carrying the principal's
    payload) when ``validate_session`` rejects the principal's session id.
    """
    verified = super().verify_token(encoded_token)
    if self.validate_session(verified.session_id):
        return verified
    raise itsdangerous.SignatureExpired('The token has already invalidated', verified.payload)
def verify_token(self, encoded_token):
    """Verify the token via the parent class, optionally faking expiry.

    When ``token_expired`` (a name taken from the surrounding scope, not
    shown here) is truthy, ``itsdangerous.SignatureExpired`` is raised
    with the principal's payload instead of returning the principal.
    """
    verified = super().verify_token(encoded_token)
    if not token_expired:
        return verified
    raise itsdangerous.SignatureExpired('Simulating', payload=verified.payload)
def get_payload_from_token(token):
    """Verify a signed token and return its decrypted JSON payload.

    The token is loaded with a serializer keyed by ``SECRET_KEY``; the
    loaded value is decrypted with an AES cipher keyed by the first 16
    characters of the same secret and parsed as JSON.

    :param token: The signed token to decode.
    :return: The decoded payload.
    :raises TokenExpired: When the signature has expired and the signing
        time is not within the one-second grace window.

    A bad signature is logged (with the payload when it can still be
    decoded) and the request is aborted with a 401.
    """
    s = Serializer(current_app.config['SECRET_KEY'])
    cipher = AESCipher(current_app.config['SECRET_KEY'][:16])
    try:
        return json.loads(cipher.decrypt(s.loads(token)))
    # "except X as name" is used instead of the Python-2-only
    # "except X, name" form, which is a syntax error on Python 3.
    except SignatureExpired as se:
        time_offset = (datetime.now() - se.date_signed).total_seconds()
        current_app.logger.error('token expired: %s. signature date %s. offset with current date = %s'%(se.message,str(se.date_signed),str(time_offset)))
        current_app.logger.error('current date %s, token date %s'%(str(datetime.now()), str(se.date_signed)))
        if -1 <= time_offset < 0:
            # Allow for machines whose clocks are up to 1 second out of sync.
            current_app.logger.info('token time offset within grace period. allowing auth')
            return json.loads(cipher.decrypt(se.payload))
        LogApiTokenExpired()
        raise TokenExpired()
    except BadSignature as e:
        current_app.logger.error('bad signature in token')
        encoded_payload = e.payload
        if encoded_payload is not None:
            # Best effort: record what the rejected token claimed to contain.
            try:
                decoded_payload = s.load_payload(encoded_payload)
                payload = json.loads(cipher.decrypt(decoded_payload))
                LogApiTokenInvalid(payload)
            except BadData:
                LogApiTokenInvalid(dict(error='bad data in token',
                                        token=token))
        abort(401, message = 'bad signature in token')
def validate_csrf(data, secret_key=None, time_limit=None, token_key=None):
    """Check if the given data is a valid CSRF token. This compares the given
    signed token to the one stored in the session.

    :param data: The signed CSRF token to be checked.
    :param secret_key: Used to securely sign the token. Default is
        ``WTF_CSRF_SECRET_KEY`` or ``SECRET_KEY``.
    :param time_limit: Number of seconds that the token is valid. Default is
        ``WTF_CSRF_TIME_LIMIT`` or 3600 seconds (60 minutes).
    :param token_key: Key where token is stored in session for comparison.
        Default is ``WTF_CSRF_FIELD_NAME`` or ``'csrf_token'``.

    :raises ValidationError: Contains the reason that validation failed.

    .. versionchanged:: 0.14
        Raises ``ValidationError`` with a specific error message rather than
        returning ``True`` or ``False``.
    """
    # Resolve each setting from the argument, the app config, or a default.
    secret_key = _get_config(
        secret_key,
        'WTF_CSRF_SECRET_KEY',
        current_app.secret_key,
        message='A secret key is required to use CSRF.',
    )
    field_name = _get_config(
        token_key,
        'WTF_CSRF_FIELD_NAME',
        'csrf_token',
        message='A field name is required to use CSRF.',
    )
    time_limit = _get_config(
        time_limit,
        'WTF_CSRF_TIME_LIMIT',
        3600,
        required=False,
    )

    if not data:
        raise ValidationError('The CSRF token is missing.')

    if field_name not in session:
        raise ValidationError('The CSRF session token is missing.')

    serializer = URLSafeTimedSerializer(secret_key, salt='wtf-csrf-token')

    try:
        submitted = serializer.loads(data, max_age=time_limit)
    except SignatureExpired:
        raise ValidationError('The CSRF token has expired.')
    except BadData:
        raise ValidationError('The CSRF token is invalid.')

    # The unsigned value must equal the token kept in the session.
    tokens_match = safe_str_cmp(session[field_name], submitted)
    if not tokens_match:
        raise ValidationError('The CSRF tokens do not match.')
def validate_csrf(data, secret_key=None, time_limit=None, token_key=None):
    """Check if the given data is a valid CSRF token. This compares the given
    signed token to the one stored in the session.

    :param data: The signed CSRF token to be checked.
    :param secret_key: Used to securely sign the token. Default is
        ``WTF_CSRF_SECRET_KEY`` or ``SECRET_KEY``.
    :param time_limit: Number of seconds that the token is valid. Default is
        ``WTF_CSRF_TIME_LIMIT`` or 3600 seconds (60 minutes).
    :param token_key: Key where token is stored in session for comparison.
        Default is ``WTF_CSRF_FIELD_NAME`` or ``'csrf_token'``.

    :raises ValidationError: Contains the reason that validation failed.

    .. versionchanged:: 0.14
        Raises ``ValidationError`` with a specific error message rather than
        returning ``True`` or ``False``.
    """
    # Resolve each setting from the argument, the app config, or a default.
    secret_key = _get_config(
        secret_key,
        'WTF_CSRF_SECRET_KEY',
        current_app.secret_key,
        message='A secret key is required to use CSRF.',
    )
    field_name = _get_config(
        token_key,
        'WTF_CSRF_FIELD_NAME',
        'csrf_token',
        message='A field name is required to use CSRF.',
    )
    time_limit = _get_config(
        time_limit,
        'WTF_CSRF_TIME_LIMIT',
        3600,
        required=False,
    )

    if not data:
        raise ValidationError('The CSRF token is missing.')

    if field_name not in session:
        raise ValidationError('The CSRF session token is missing.')

    serializer = URLSafeTimedSerializer(secret_key, salt='wtf-csrf-token')

    try:
        submitted = serializer.loads(data, max_age=time_limit)
    except SignatureExpired:
        raise ValidationError('The CSRF token has expired.')
    except BadData:
        raise ValidationError('The CSRF token is invalid.')

    # The unsigned value must equal the token kept in the session.
    tokens_match = safe_str_cmp(session[field_name], submitted)
    if not tokens_match:
        raise ValidationError('The CSRF tokens do not match.')
def load_token(token):
    """
    Flask-Login token_loader callback.

    Takes the token that was stored on the user's computer, checks that
    it is still valid, and returns a User object if it is or None if it
    is not.

    :param token: Token generated by :meth:`app.models.User.get_auth_token`
    """
    # The token was generated by User.get_auth_token, so it is up to us to
    # know the layout of the data inside it. It was signed with
    # itsdangerous.URLSafeTimedSerializer, which lets us enforce a max_age
    # on the server. The cookie holding the token has its own expiry date,
    # but the user could change that, so we do not rely on it.
    max_age = current_app.config['REMEMBER_COOKIE_DURATION'].total_seconds()

    # Token data layout: [username, hashpass, id]
    serializer = URLSafeTimedSerializer(
        current_app.config['SECRET_KEY'],
        salt='user-auth',
        signer_kwargs={'key_derivation': 'hmac',
                       'digest_method': hashlib.sha256})
    try:
        fields = serializer.loads(token, max_age=max_age)
    except (BadTimeSignature, SignatureExpired):
        return None

    # Find the user by the id stored in the token.
    user = User.query.get(fields[2])

    # 2FA check: outside the TOTP verification endpoint, a user with OTP
    # enabled needs a token carrying at least four fields.
    on_totp_endpoint = request.endpoint == 'auth.verify_totp'
    if user and user.otp_enabled and not on_totp_endpoint and len(fields) < 4:
        return None

    # The password hash in the token must still match the stored one.
    if not (user and fields[1] == user._password):
        return None
    return user
def validate_csrf(data, secret_key=None, time_limit=None, token_key=None):
    """Check if the given data is a valid CSRF token. This compares the given
    signed token to the one stored in the session.

    :param data: The signed CSRF token to be checked.
    :param secret_key: Used to securely sign the token. Default is
        ``WTF_CSRF_SECRET_KEY`` or ``SECRET_KEY``.
    :param time_limit: Number of seconds that the token is valid. Default is
        ``WTF_CSRF_TIME_LIMIT`` or 3600 seconds (60 minutes).
    :param token_key: Key where token is stored in session for comparison.
        Default is ``WTF_CSRF_FIELD_NAME`` or ``'csrf_token'``.

    :raises ValidationError: Contains the reason that validation failed.

    .. versionchanged:: 0.14
        Raises ``ValidationError`` with a specific error message rather than
        returning ``True`` or ``False``.
    """
    # Resolve each setting from the argument, the app config, or a default.
    secret_key = _get_config(
        secret_key,
        'WTF_CSRF_SECRET_KEY',
        current_app.secret_key,
        message='A secret key is required to use CSRF.',
    )
    field_name = _get_config(
        token_key,
        'WTF_CSRF_FIELD_NAME',
        'csrf_token',
        message='A field name is required to use CSRF.',
    )
    time_limit = _get_config(
        time_limit,
        'WTF_CSRF_TIME_LIMIT',
        3600,
        required=False,
    )

    if not data:
        raise ValidationError('The CSRF token is missing.')

    if field_name not in session:
        raise ValidationError('The CSRF session token is missing.')

    serializer = URLSafeTimedSerializer(secret_key, salt='wtf-csrf-token')

    try:
        submitted = serializer.loads(data, max_age=time_limit)
    except SignatureExpired:
        raise ValidationError('The CSRF token has expired.')
    except BadData:
        raise ValidationError('The CSRF token is invalid.')

    # The unsigned value must equal the token kept in the session.
    tokens_match = safe_str_cmp(session[field_name], submitted)
    if not tokens_match:
        raise ValidationError('The CSRF tokens do not match.')