def get_auth_token(self, last_totp=None):
"""Think of :class:`URLSafeTimedSerializer` `salt` parameter as
namespace instead of salt. `The salt explained:
<https://pythonhosted.org/itsdangerous/#the-salt>`_.
"""
data = [self.email, self._password, str(self.id)]
if last_totp:
data.append(last_totp)
s = URLSafeTimedSerializer(
current_app.config['SECRET_KEY'],
salt='user-auth',
signer_kwargs=dict(
key_derivation='hmac',
digest_method=hashlib.sha256)
)
return s.dumps(data)
python类URLSafeTimedSerializer()的实例源码
def get(self, request, *args, **kwargs):
if request.GET.get('key'):
serializer = URLSafeTimedSerializer(settings.SECRET_KEY)
try:
user_id = serializer.loads(
request.GET.get('key'),
max_age=60 * 2, # Signature expires after 2 minutes
)
user = get_object_or_404(User, id=user_id)
user.backend = 'django.contrib.auth.backends.ModelBackend'
login(request, user)
return redirect('home')
except (BadSignature, BadTimeSignature):
return redirect('login')
return super().get(request, *args, **kwargs)
def form_valid(self, form):
email = form.cleaned_data['email']
user = User.objects.get(username=email)
safe = URLSafeTimedSerializer(settings.SECRET_KEY)
url = '{site}{path}?key={key}'.format(
site=settings.SITE_URL,
path=reverse('login'),
key=safe.dumps(user.id),
)
send_mail(
_('Link to login into the Knowledge Base'),
url,
settings.DEFAULT_FROM_EMAIL,
[email],
fail_silently=False,
html_message=render_to_string(
'login_email.html', {'url': url}
),
)
return redirect('home')
def post(self):
token = self.get_argument("token")
try:
from tornado_chat import SECRET_KEY
serializer = URLSafeTimedSerializer(SECRET_KEY)
parasite = serializer.loads(token, max_age=86400) # do i really have to do 24hrs in secs?
parasiteId = self.db.get("SELECT id, reset_token FROM parasite WHERE id = %s", parasite)
if parasiteId is not None and self.get_argument("password") == self.get_argument(
"password2") and parasiteId.reset_token == token:
hashed_password = yield executor.submit(
bcrypt.hashpw, tornado.escape.utf8(self.get_argument("password")),
bcrypt.gensalt())
self.db.execute("UPDATE parasite SET password = %s, reset_token='' WHERE id = %s", hashed_password,
parasite)
self.redirect("login?error=Password reset. Please login.")
else:
self.redirect("login?error=Password reset failed.")
except:
self.redirect("login?error=Password reset failed.")
def is_valid_token(token, token_expiry):
"""
Validates if the supplied token is valid, and hasn't expired.
:param token: Token to check
:param token_expiry: When the token expires in seconds
:return: True if token is valid, and user_id contained in token
"""
entropy = current_app.secret_key if current_app.secret_key else 'un1testingmode'
serializer = URLSafeTimedSerializer(entropy)
try:
tokenised_user_id = serializer.loads(token, max_age=token_expiry)
except SignatureExpired:
current_app.logger.debug('Token has expired')
return False, None
except BadSignature:
current_app.logger.debug('Bad Token Signature')
return False, None
return True, tokenised_user_id
def confirm_token(self, token, expiration=3600):
from server import app
serializer = URLSafeTimedSerializer(app.config['SECRET_KEY'])
session = SessionManager.Session()
try:
email = serializer.loads(
token,
salt=app.config['SECRET_PASSWORD_SALT'],
max_age=expiration
)
if (email == self.email) and (not self.email_confirmed):
self.email_confirmed = True
user = session.query(User).filter(User.id == self.id).one()
user.email_confirmed = True
session.commit()
rpc_request.send('email_changed', {'email': self.email, 'user_id': user.id})
return json_resp({'message': 'ok'})
else:
raise ClientError('Invalid Token')
except:
raise ClientError('Invalid Token')
finally:
SessionManager.Session.remove()
def get_signing_serializer(self, app):
if not app.secret_key:
return None
signer_kwargs = dict(
key_derivation=self.key_derivation,
digest_method=self.digest_method
)
return URLSafeTimedSerializer(app.secret_key, salt=self.salt,
serializer=self.serializer,
signer_kwargs=signer_kwargs)
def init_app(self, app):
key = app.config['ITSDANGEROUSKEY']
self.signer = URLSafeTimedSerializer(key)
def get_signing_serializer(self, app):
if not app.secret_key:
return None
signer_kwargs = dict(
key_derivation=self.key_derivation,
digest_method=self.digest_method
)
return URLSafeTimedSerializer(app.secret_key, salt=self.salt,
serializer=self.serializer,
signer_kwargs=signer_kwargs)
def get_signing_serializer(self, app):
if not app.secret_key:
return None
signer_kwargs = dict(
key_derivation=self.key_derivation,
digest_method=self.digest_method
)
return URLSafeTimedSerializer(app.secret_key, salt=self.salt,
serializer=self.serializer,
signer_kwargs=signer_kwargs)
def generate_token(email):
serializer = URLSafeTimedSerializer(app.config['SECRET_KEY'])
return serializer.dumps(email, salt=app.config['SECURITY_PASSWORD_SALT'])
def confirm_token(token, expiration=3600):
serializer = URLSafeTimedSerializer(app.config['SECRET_KEY'])
try:
email = serializer.loads(
token,
salt=app.config['SECURITY_PASSWORD_SALT'],
max_age=expiration
)
except:
return False
return email
def generate_csrf(secret_key=None, token_key=None):
"""Generate a CSRF token. The token is cached for a request, so multiple
calls to this function will generate the same token.
During testing, it might be useful to access the signed token in
``g.csrf_token`` and the raw token in ``session['csrf_token']``.
:param secret_key: Used to securely sign the token. Default is
``WTF_CSRF_SECRET_KEY`` or ``SECRET_KEY``.
:param token_key: Key where token is stored in session for comparision.
Default is ``WTF_CSRF_FIELD_NAME`` or ``'csrf_token'``.
"""
secret_key = _get_config(
secret_key, 'WTF_CSRF_SECRET_KEY', current_app.secret_key,
message='A secret key is required to use CSRF.'
)
field_name = _get_config(
token_key, 'WTF_CSRF_FIELD_NAME', 'csrf_token',
message='A field name is required to use CSRF.'
)
if field_name not in g:
if field_name not in session:
session[field_name] = hashlib.sha1(os.urandom(64)).hexdigest()
s = URLSafeTimedSerializer(secret_key, salt='wtf-csrf-token')
setattr(g, field_name, s.dumps(session[field_name]))
return g.get(field_name)
def get_signing_serializer(self, app):
if not app.secret_key:
return None
signer_kwargs = dict(
key_derivation=self.key_derivation,
digest_method=self.digest_method
)
return URLSafeTimedSerializer(app.secret_key, salt=self.salt,
serializer=self.serializer,
signer_kwargs=signer_kwargs)
def generate_csrf(secret_key=None, token_key=None):
"""Generate a CSRF token. The token is cached for a request, so multiple
calls to this function will generate the same token.
During testing, it might be useful to access the signed token in
``g.csrf_token`` and the raw token in ``session['csrf_token']``.
:param secret_key: Used to securely sign the token. Default is
``WTF_CSRF_SECRET_KEY`` or ``SECRET_KEY``.
:param token_key: Key where token is stored in session for comparision.
Default is ``WTF_CSRF_FIELD_NAME`` or ``'csrf_token'``.
"""
secret_key = _get_config(
secret_key, 'WTF_CSRF_SECRET_KEY', current_app.secret_key,
message='A secret key is required to use CSRF.'
)
field_name = _get_config(
token_key, 'WTF_CSRF_FIELD_NAME', 'csrf_token',
message='A field name is required to use CSRF.'
)
if field_name not in g:
if field_name not in session:
session[field_name] = hashlib.sha1(os.urandom(64)).hexdigest()
s = URLSafeTimedSerializer(secret_key, salt='wtf-csrf-token')
setattr(g, field_name, s.dumps(session[field_name]))
return g.get(field_name)
def get_signing_serializer(self, app):
if not app.secret_key:
return None
signer_kwargs = dict(
key_derivation=self.key_derivation,
digest_method=self.digest_method
)
return URLSafeTimedSerializer(app.secret_key, salt=self.salt,
serializer=self.serializer,
signer_kwargs=signer_kwargs)
def generate_validate_token(self, username):
serializer = utsr(self.security_key)
return serializer.dumps(username, self.salt)
def confirm_validate_token(self, token, expiration=600):
serializer = utsr(self.security_key)
return serializer.loads(token, salt=self.salt, max_age=expiration)
def remove_validate_token(self, token):
serializer = utsr(self.security_key)
return serializer.loads(token, salt=self.salt)
def get_signing_serializer(self, app):
if not app.secret_key:
return None
signer_kwargs = dict(
key_derivation=self.key_derivation,
digest_method=self.digest_method
)
return URLSafeTimedSerializer(app.secret_key, salt=self.salt,
serializer=self.serializer,
signer_kwargs=signer_kwargs)
def get_signing_serializer(self, app):
if not app.secret_key:
return None
signer_kwargs = dict(
key_derivation=self.key_derivation,
digest_method=self.digest_method
)
return URLSafeTimedSerializer(app.secret_key, salt=self.salt,
serializer=self.serializer,
signer_kwargs=signer_kwargs)
def get_signing_serializer(self, app):
if not app.secret_key:
return None
signer_kwargs = dict(
key_derivation=self.key_derivation,
digest_method=self.digest_method
)
return URLSafeTimedSerializer(app.secret_key, salt=self.salt,
serializer=self.serializer,
signer_kwargs=signer_kwargs)
def get_signing_serializer(self, app):
if not app.secret_key:
return None
signer_kwargs = dict(
key_derivation=self.key_derivation,
digest_method=self.digest_method
)
return URLSafeTimedSerializer(app.secret_key, salt=self.salt,
serializer=self.serializer,
signer_kwargs=signer_kwargs)
def get_signing_serializer(self, app):
if not app.secret_key:
return None
signer_kwargs = dict(
key_derivation=self.key_derivation,
digest_method=self.digest_method
)
return URLSafeTimedSerializer(app.secret_key, salt=self.salt,
serializer=self.serializer,
signer_kwargs=signer_kwargs)
def get_signing_serializer(self, app):
if not app.secret_key:
return None
signer_kwargs = dict(
key_derivation=self.key_derivation,
digest_method=self.digest_method
)
return URLSafeTimedSerializer(app.secret_key, salt=self.salt,
serializer=self.serializer,
signer_kwargs=signer_kwargs)
def get_signing_serializer(self, app):
if not app.secret_key:
return None
signer_kwargs = dict(
key_derivation=self.key_derivation,
digest_method=self.digest_method
)
return URLSafeTimedSerializer(app.secret_key, salt=self.salt,
serializer=self.serializer,
signer_kwargs=signer_kwargs)
def get_serializer(self, app):
if not app.secret_key:
return None
return URLSafeTimedSerializer(app.secret_key, salt=self.salt)
def get_signing_serializer(self, app):
if not app.secret_key:
return None
signer_kwargs = dict(
key_derivation=self.key_derivation,
digest_method=self.digest_method
)
return URLSafeTimedSerializer(app.secret_key, salt=self.salt,
serializer=self.serializer,
signer_kwargs=signer_kwargs)
def get_signing_serializer(self, app):
if not app.secret_key:
return None
signer_kwargs = dict(
key_derivation=self.key_derivation,
digest_method=self.digest_method
)
return URLSafeTimedSerializer(app.secret_key, salt=self.salt,
serializer=self.serializer,
signer_kwargs=signer_kwargs)
def get_reset_token(self) -> str:
"""Get a token which a user can use to reset his password.
.. note:: Don't forget to commit the database.
:returns: A token that can be used in :py:meth:`User.reset_password` to
reset the password of a user.
"""
ts = URLSafeTimedSerializer(psef.app.config['SECRET_KEY'])
self.reset_token = str(uuid.uuid4())
return str(ts.dumps(self.username, salt=self.reset_token))