def verify_auth_token(token):
    """Return the user named by a signed auth token, or None if it is invalid.

    The token is loaded with a serializer keyed on the application's
    ``SECRET_KEY``. An expired or badly signed token yields ``None``;
    otherwise the user whose ``id`` matches the token payload is looked up
    (which is itself ``None`` when no such row exists).
    """
    secret = current_app.config['SECRET_KEY']
    token_serializer = Serializer(secret)
    try:
        # loads() is expected to raise when the token (e.g. from the
        # /api/posts `POST` request) has expired or carries a bad signature.
        payload = token_serializer.loads(token)
    except (SignatureExpired, BadSignature):
        return None
    return User.query.filter_by(id=payload['id']).first()
python类BadSignature()的实例源码
def activate_user(payload):
    """Activate the user whose id is carried in the signed ``payload``.

    Aborts with HTTP 400 when the payload fails signature verification or
    when no user with the decoded id exists. Returns the string
    ``"user activated"`` on success.
    """
    serializer = get_serializer()
    try:
        user_id = serializer.loads(payload)
    except BadSignature:
        # abort() is expected to raise, so execution does not continue here.
        abort(400)
    account = User.query.filter(User.id == user_id).first()
    if account is None:
        abort(400)
    account.activate()
    return "user activated"
# from . import users, listings, books
def get(self, request, *args, **kwargs):
    """Log a user in from a signed ``key`` query parameter, if one is given.

    Without a ``key`` the request is handed to the parent class's ``get``.
    With a ``key``, the value is loaded as a signed user id that must be at
    most two minutes old; the matching user is logged in and redirected to
    ``home``. A bad or expired signature redirects to ``login``.
    """
    signed_key = request.GET.get('key')
    if not signed_key:
        return super().get(request, *args, **kwargs)

    key_serializer = URLSafeTimedSerializer(settings.SECRET_KEY)
    two_minutes = 60 * 2  # Signature expires after 2 minutes
    try:
        user_id = key_serializer.loads(signed_key, max_age=two_minutes)
        user = get_object_or_404(User, id=user_id)
        # The backend attribute is set by hand before calling login().
        user.backend = 'django.contrib.auth.backends.ModelBackend'
        login(request, user)
        return redirect('home')
    except (BadSignature, BadTimeSignature):
        return redirect('login')
def change_email(self, token):
    """Apply the e-mail change described by a signed ``token``.

    Returns ``True`` once the new address has been stored and committed.
    Returns ``False`` when the token is invalid or expired, was issued for
    a different user id, carries no ``new_email``, or names an address
    that is already present in the user table.
    """
    serializer = Serializer(current_app.config['SECRET_KEY'])
    try:
        payload = serializer.loads(token)
    except (BadSignature, SignatureExpired):
        return False
    # The token must have been issued for this very user.
    if payload.get('change_email') != self.id:
        return False
    address = payload.get('new_email')
    if address is None:
        return False
    # Refuse an address that some account already uses.
    existing = self.query.filter_by(email=address).first()
    if existing is not None:
        return False
    self.email = address
    db.session.add(self)
    db.session.commit()
    return True
def ensure_user(view_func):
    """Decorator that aborts with 403 unless a valid session key is sent.

    The wrapped view reads the ``X-Session-Key`` header, unsigns it, and
    resolves the resulting token to a user, which is passed to
    ``view_func`` as its first positional argument.
    This is analogous to frontend.ensure_user
    """
    header_name = 'X-Session-Key'
    err_msg = 'A valid {0} header is required.'.format(header_name)

    @wraps(view_func)
    def inner(*args, **kwargs):
        signed_key = request.headers.get(header_name, '')
        try:
            token = get_signer().unsign(signed_key).decode('utf-8')
        except (BadSignature, ValueError):
            # ValueError also covers a payload that is not valid UTF-8.
            abort(403, err_msg)
        user = core.user_for_token(token)
        if user is None:
            abort(403, err_msg)
        return view_func(user, *args, **kwargs)

    return inner
def open_session(self, app, request):
    """Open the session identified by the request's session-id cookie.

    A missing cookie, or one whose signature does not verify, produces a
    fresh session with a newly generated id. When signing is enabled but
    no signer is available, ``None`` is returned. Otherwise the data held
    in ``self.cache`` under ``key_prefix + sid`` is wrapped in a session;
    if nothing is cached, an empty session with the same id is returned.
    """
    def blank_session(session_id):
        return self.session_class(sid=session_id, permanent=self.permanent)

    sid = request.cookies.get(app.session_cookie_name)
    if not sid:
        return blank_session(self._generate_sid())

    if self.use_signer:
        signer = self._get_signer(app)
        if signer is None:
            return None
        try:
            sid = signer.unsign(sid).decode()
        except BadSignature:
            return blank_session(self._generate_sid())

    cached = self.cache.get(self.key_prefix + sid)
    if cached is None:
        return blank_session(sid)
    return self.session_class(cached, sid=sid)
def verify_token(username, token):
    """
    Verify validity of token

    Returns False when the token is expired or badly signed, or when no
    session document exists for the username inside the token. Otherwise
    the token's password is checked against the stored ``password_hash``
    and the token's username is compared with ``username``.
    """
    serializer = TimedJWSSerializer(app.config['SECRET_KEY'])
    try:
        ut.pretty_print("Trying to load the token")
        payload = serializer.loads(token)
    except SignatureExpired:
        ut.pretty_print("ERROR: Expired Token")
        return False
    except BadSignature:
        ut.pretty_print("ERROR: Invalid Token")
        return False

    ut.pretty_print("Token successfully loaded")
    # Sorting on _id descending picks the document with the highest _id.
    latest = db.sessions.find_one(
        filter={'username': payload['username']},
        sort=[('_id', -1)],
    )
    if not latest:
        return False
    # Round-trip through json_util, as the original code does.
    record = json_util.loads(json_util.dumps(latest))
    return pwd_context.verify(payload['password'], record['password_hash']) and payload['username'] == username
def is_valid_token(token, token_expiry):
    """
    Check that the supplied token is correctly signed and has not expired.
    :param token: Token to check
    :param token_expiry: Maximum accepted token age in seconds
    :return: ``(True, user_id)`` for a valid token, where ``user_id`` is the
        value stored in the token; ``(False, None)`` otherwise
    """
    # Falls back to a fixed key when the application has no secret key set.
    entropy = current_app.secret_key or 'un1testingmode'
    token_serializer = URLSafeTimedSerializer(entropy)
    rejected = (False, None)
    try:
        tokenised_user_id = token_serializer.loads(token, max_age=token_expiry)
    except SignatureExpired:
        current_app.logger.debug('Token has expired')
        return rejected
    except BadSignature:
        current_app.logger.debug('Bad Token Signature')
        return rejected
    return True, tokenised_user_id
def change_email(self, token):
    """Confirm and store a new e-mail address taken from a signed ``token``.

    The change is refused (``False``) if the token cannot be loaded, if its
    ``change_email`` field is not this user's id, if it has no
    ``new_email`` field, or if another row already uses that address.
    On success the address is saved, the session committed, and ``True``
    returned.
    """
    signer = Serializer(current_app.config['SECRET_KEY'])
    try:
        claims = signer.loads(token)
    except (BadSignature, SignatureExpired):
        return False
    if claims.get('change_email') != self.id:
        return False
    requested_email = claims.get('new_email')
    if requested_email is None:
        return False
    # The address must not already be registered.
    owner = self.query.filter_by(email=requested_email).first()
    if owner is not None:
        return False
    self.email = requested_email
    db.session.add(self)
    db.session.commit()
    return True
def token_find(self, token: str) -> "int | None":
    """Return the user ID carried by ``token``, or ``None`` if it is invalid.

    The text before the first ``.`` is base64-decoded and parsed as the
    integer user ID. The whole token is then unsigned with a
    ``TimestampSigner`` that uses the user's hashed password as its
    secret key.

    Returns ``None`` when the ID part cannot be decoded, when
    ``get_raw_user`` finds no such user, or when the signature is bad.
    """
    userid_encoded = token.split('.')[0]
    try:
        userid = int(base64.urlsafe_b64decode(userid_encoded))
    except (binascii.Error, ValueError):
        return None
    raw_user = self.get_raw_user(userid)
    if raw_user is None:
        return None
    signer = TimestampSigner(raw_user['password']['hash'])
    try:
        # Only the signature check matters; the unsigned value is discarded.
        signer.unsign(token)
    except itsdangerous.BadSignature:
        return None
    return userid
def verify_auth_token(token):
    """Return the user identified by a base64-wrapped signed token.

    Returns ``None`` when the token is not valid base64, when its
    signature has expired, or when its signature is invalid. Otherwise
    returns the result of ``User.query.get`` for the ``id`` stored in the
    token payload.
    """
    try:
        raw_token = b64decode(token)
    except (TypeError, ValueError):
        # binascii.Error (bad padding) is a ValueError subclass; a
        # non-string token raises TypeError. Treat both as invalid.
        return None
    s = Serializer(app.config['SECRET_KEY'])
    try:
        data = s.loads(raw_token)
    except SignatureExpired:
        return None  # valid token, but expired
    except BadSignature:
        return None  # invalid token
    return User.query.get(data['id'])
def open_session(self, app, request):
    """Build the session object for ``request`` from its signed cookie.

    Returns ``None`` when ``get_signing_serializer`` yields no serializer.
    Otherwise returns a ``session_class`` instance: filled from the cookie
    payload when it loads within the permanent session lifetime, and
    empty when the cookie is absent or fails with ``BadSignature``.
    """
    serializer = self.get_signing_serializer(app)
    if serializer is None:
        return None
    cookie_value = request.cookies.get(app.session_cookie_name)
    if not cookie_value:
        # No (or empty) session cookie: start with a blank session.
        return self.session_class()
    lifetime = total_seconds(app.permanent_session_lifetime)
    try:
        payload = serializer.loads(cookie_value, max_age=lifetime)
        return self.session_class(payload)
    except BadSignature:
        # Unverifiable cookie: fall back to a blank session.
        return self.session_class()
def open_session(self, app, request):
    """Build the session object for ``request`` from its signed cookie.

    Returns ``None`` when ``get_signing_serializer`` yields no serializer.
    Otherwise returns a ``session_class`` instance: filled from the cookie
    payload when it loads within the permanent session lifetime, and
    empty when the cookie is absent or fails with ``BadSignature``.
    """
    serializer = self.get_signing_serializer(app)
    if serializer is None:
        return None
    cookie_value = request.cookies.get(app.session_cookie_name)
    if not cookie_value:
        # No (or empty) session cookie: start with a blank session.
        return self.session_class()
    lifetime = total_seconds(app.permanent_session_lifetime)
    try:
        payload = serializer.loads(cookie_value, max_age=lifetime)
        return self.session_class(payload)
    except BadSignature:
        # Unverifiable cookie: fall back to a blank session.
        return self.session_class()
def open_session(self, app, request):
    """Build the session object for ``request`` from its signed cookie.

    Returns ``None`` when ``get_signing_serializer`` yields no serializer.
    Otherwise returns a ``session_class`` instance: filled from the cookie
    payload when it loads within the permanent session lifetime, and
    empty when the cookie is absent or fails with ``BadSignature``.
    """
    serializer = self.get_signing_serializer(app)
    if serializer is None:
        return None
    cookie_value = request.cookies.get(app.session_cookie_name)
    if not cookie_value:
        # No (or empty) session cookie: start with a blank session.
        return self.session_class()
    lifetime = total_seconds(app.permanent_session_lifetime)
    try:
        payload = serializer.loads(cookie_value, max_age=lifetime)
        return self.session_class(payload)
    except BadSignature:
        # Unverifiable cookie: fall back to a blank session.
        return self.session_class()
def open_session(self, app, request):
    """Build the session object for ``request`` from its signed cookie.

    Returns ``None`` when ``get_signing_serializer`` yields no serializer.
    Otherwise returns a ``session_class`` instance: filled from the cookie
    payload when it loads within the permanent session lifetime, and
    empty when the cookie is absent or fails with ``BadSignature``.
    """
    serializer = self.get_signing_serializer(app)
    if serializer is None:
        return None
    cookie_value = request.cookies.get(app.session_cookie_name)
    if not cookie_value:
        # No (or empty) session cookie: start with a blank session.
        return self.session_class()
    lifetime = total_seconds(app.permanent_session_lifetime)
    try:
        payload = serializer.loads(cookie_value, max_age=lifetime)
        return self.session_class(payload)
    except BadSignature:
        # Unverifiable cookie: fall back to a blank session.
        return self.session_class()
def open_session(self, app, request):
    """Build the session object for ``request`` from its signed cookie.

    Returns ``None`` when ``get_signing_serializer`` yields no serializer.
    Otherwise returns a ``session_class`` instance: filled from the cookie
    payload when it loads within the permanent session lifetime, and
    empty when the cookie is absent or fails with ``BadSignature``.
    """
    serializer = self.get_signing_serializer(app)
    if serializer is None:
        return None
    cookie_value = request.cookies.get(app.session_cookie_name)
    if not cookie_value:
        # No (or empty) session cookie: start with a blank session.
        return self.session_class()
    lifetime = total_seconds(app.permanent_session_lifetime)
    try:
        payload = serializer.loads(cookie_value, max_age=lifetime)
        return self.session_class(payload)
    except BadSignature:
        # Unverifiable cookie: fall back to a blank session.
        return self.session_class()
def verify_auth_token(cls, token):
    """Resolve a signed auth token to a ``User``, raising on any failure.

    Raises ``TokenExpired`` when the token's signature has expired, and
    ``BadToken`` when the signature is invalid or when no user exists for
    the ``user_id`` in the payload. Each exception wraps an HTTP 400
    response object.
    """
    # "No secret key" is used as the key when SECRET_KEY is not configured.
    secret = current_app.config.get("SECRET_KEY", "No secret key")
    serializer = TimedJSONWebSignatureSerializer(secret)
    try:
        payload = serializer.loads(token)
    except SignatureExpired:
        expired = http_responses.HTTP_400_BAD_REQUEST(msg={"error": u"Token?????????"})
        raise TokenExpired(expired)
    except BadSignature:
        invalid = http_responses.HTTP_400_BAD_REQUEST(msg={"error": u"Token????????"})
        raise BadToken(invalid)

    try:
        return User.get_object(id=payload["user_id"])
    except ObjectNotExists:
        missing = http_responses.HTTP_400_BAD_REQUEST(msg={"error": u"Token????????"})
        raise BadToken(missing)
def set_password(token):
    """Set initial customer password. The template for this route contains
    bootstrap.css, bootstrap-theme.css and main.css.

    This is similar to the password reset option with two exceptions:
    it has a longer expiration time and does not require old password.

    A token that fails to load redirects to ``main.index`` with a flash
    message. A valid form submission stores the password and also
    redirects there. Otherwise the first error of each form field is
    flashed and the form is rendered.

    :param token: Token generated by
        :meth:`app.models.User.generate_reset_token`
    :return: a redirect to ``main.index`` or the rendered
        ``auth/set_password.html`` template
    """
    serializer = TimedJSONWebSignatureSerializer(current_app.config['SECRET_KEY'])
    try:
        # Only validity matters here; the decoded payload is not used.
        serializer.loads(token)
    except BadSignature:
        flash('Signature expired.')
        return redirect(url_for('main.index'))

    form = SetPasswordForm()
    if form.validate_on_submit():
        User.set_password(token, form.data['password'])
        flash('Your new password has been set.')
        return redirect(url_for('main.index'))

    for messages in form.errors.values():
        flash(messages[0], 'danger')
    return render_template('auth/set_password.html', form=form, token=token)
def open_session(self, app, request):
    """Build the session object for ``request`` from its signed cookie.

    Returns ``None`` when ``get_signing_serializer`` yields no serializer.
    Otherwise returns a ``session_class`` instance: filled from the cookie
    payload when it loads within the permanent session lifetime, and
    empty when the cookie is absent or fails with ``BadSignature``.
    """
    serializer = self.get_signing_serializer(app)
    if serializer is None:
        return None
    cookie_value = request.cookies.get(app.session_cookie_name)
    if not cookie_value:
        # No (or empty) session cookie: start with a blank session.
        return self.session_class()
    lifetime = total_seconds(app.permanent_session_lifetime)
    try:
        payload = serializer.loads(cookie_value, max_age=lifetime)
        return self.session_class(payload)
    except BadSignature:
        # Unverifiable cookie: fall back to a blank session.
        return self.session_class()
def open_session(self, app, request):
    """Build the session object for ``request`` from its signed cookie.

    Returns ``None`` when ``get_signing_serializer`` yields no serializer.
    Otherwise returns a ``session_class`` instance: filled from the cookie
    payload when it loads within the permanent session lifetime, and
    empty when the cookie is absent or fails with ``BadSignature``.
    """
    serializer = self.get_signing_serializer(app)
    if serializer is None:
        return None
    cookie_value = request.cookies.get(app.session_cookie_name)
    if not cookie_value:
        # No (or empty) session cookie: start with a blank session.
        return self.session_class()
    lifetime = total_seconds(app.permanent_session_lifetime)
    try:
        payload = serializer.loads(cookie_value, max_age=lifetime)
        return self.session_class(payload)
    except BadSignature:
        # Unverifiable cookie: fall back to a blank session.
        return self.session_class()
def open_session(self, app, request):
    """Build the session object for ``request`` from its signed cookie.

    Returns ``None`` when ``get_signing_serializer`` yields no serializer.
    Otherwise returns a ``session_class`` instance: filled from the cookie
    payload when it loads within the permanent session lifetime, and
    empty when the cookie is absent or fails with ``BadSignature``.
    """
    serializer = self.get_signing_serializer(app)
    if serializer is None:
        return None
    cookie_value = request.cookies.get(app.session_cookie_name)
    if not cookie_value:
        # No (or empty) session cookie: start with a blank session.
        return self.session_class()
    lifetime = total_seconds(app.permanent_session_lifetime)
    try:
        payload = serializer.loads(cookie_value, max_age=lifetime)
        return self.session_class(payload)
    except BadSignature:
        # Unverifiable cookie: fall back to a blank session.
        return self.session_class()
def open_session(self, app, request):
    """Build the session object for ``request`` from its signed cookie.

    Returns ``None`` when ``get_signing_serializer`` yields no serializer.
    Otherwise returns a ``session_class`` instance: filled from the cookie
    payload when it loads within the permanent session lifetime, and
    empty when the cookie is absent or fails with ``BadSignature``.
    """
    serializer = self.get_signing_serializer(app)
    if serializer is None:
        return None
    cookie_value = request.cookies.get(app.session_cookie_name)
    if not cookie_value:
        # No (or empty) session cookie: start with a blank session.
        return self.session_class()
    lifetime = total_seconds(app.permanent_session_lifetime)
    try:
        payload = serializer.loads(cookie_value, max_age=lifetime)
        return self.session_class(payload)
    except BadSignature:
        # Unverifiable cookie: fall back to a blank session.
        return self.session_class()
def open_session(self, app, request):
    """Build the session object for ``request`` from its signed cookie.

    Returns ``None`` when ``get_signing_serializer`` yields no serializer.
    Otherwise returns a ``session_class`` instance: filled from the cookie
    payload when it loads within the permanent session lifetime, and
    empty when the cookie is absent or fails with ``BadSignature``.
    """
    serializer = self.get_signing_serializer(app)
    if serializer is None:
        return None
    cookie_value = request.cookies.get(app.session_cookie_name)
    if not cookie_value:
        # No (or empty) session cookie: start with a blank session.
        return self.session_class()
    lifetime = total_seconds(app.permanent_session_lifetime)
    try:
        payload = serializer.loads(cookie_value, max_age=lifetime)
        return self.session_class(payload)
    except BadSignature:
        # Unverifiable cookie: fall back to a blank session.
        return self.session_class()
def open_session(self, app, request):
    """Build the session object for ``request`` from its signed cookie.

    Returns ``None`` when ``get_signing_serializer`` yields no serializer.
    Otherwise returns a ``session_class`` instance: filled from the cookie
    payload when it loads within the permanent session lifetime, and
    empty when the cookie is absent or fails with ``BadSignature``.
    """
    serializer = self.get_signing_serializer(app)
    if serializer is None:
        return None
    cookie_value = request.cookies.get(app.session_cookie_name)
    if not cookie_value:
        # No (or empty) session cookie: start with a blank session.
        return self.session_class()
    lifetime = total_seconds(app.permanent_session_lifetime)
    try:
        payload = serializer.loads(cookie_value, max_age=lifetime)
        return self.session_class(payload)
    except BadSignature:
        # Unverifiable cookie: fall back to a blank session.
        return self.session_class()
def open_session(self, app, request):
    """Build the session object for ``request`` from its signed cookie.

    Returns ``None`` when ``get_signing_serializer`` yields no serializer.
    Otherwise returns a ``session_class`` instance: filled from the cookie
    payload when it loads within the permanent session lifetime, and
    empty when the cookie is absent or fails with ``BadSignature``.
    """
    serializer = self.get_signing_serializer(app)
    if serializer is None:
        return None
    cookie_value = request.cookies.get(app.session_cookie_name)
    if not cookie_value:
        # No (or empty) session cookie: start with a blank session.
        return self.session_class()
    lifetime = total_seconds(app.permanent_session_lifetime)
    try:
        payload = serializer.loads(cookie_value, max_age=lifetime)
        return self.session_class(payload)
    except BadSignature:
        # Unverifiable cookie: fall back to a blank session.
        return self.session_class()
def open_session(self, app, request):
    """Build the session object for ``request`` from its signed cookie.

    Returns ``None`` when ``get_signing_serializer`` yields no serializer.
    Otherwise returns a ``session_class`` instance: filled from the cookie
    payload when it loads within the permanent session lifetime, and
    empty when the cookie is absent or fails with ``BadSignature``.
    """
    serializer = self.get_signing_serializer(app)
    if serializer is None:
        return None
    cookie_value = request.cookies.get(app.session_cookie_name)
    if not cookie_value:
        # No (or empty) session cookie: start with a blank session.
        return self.session_class()
    lifetime = total_seconds(app.permanent_session_lifetime)
    try:
        payload = serializer.loads(cookie_value, max_age=lifetime)
        return self.session_class(payload)
    except BadSignature:
        # Unverifiable cookie: fall back to a blank session.
        return self.session_class()
def open_session(self, app, request):
    """Build the session object for ``request`` from its signed cookie.

    Returns ``None`` when ``get_serializer`` yields no serializer.
    Otherwise returns a ``session_class`` instance: filled from the cookie
    payload when it loads within ``app.permanent_session_lifetime``, and
    empty when the cookie is absent or fails with ``BadSignature``.
    """
    serializer = self.get_serializer(app)
    if serializer is None:
        return None
    cookie_value = request.cookies.get(app.session_cookie_name)
    if not cookie_value:
        # No (or empty) session cookie: start with a blank session.
        return self.session_class()
    lifetime = app.permanent_session_lifetime.total_seconds()
    try:
        payload = serializer.loads(cookie_value, max_age=lifetime)
        return self.session_class(payload)
    except BadSignature:
        # Unverifiable cookie: fall back to a blank session.
        return self.session_class()
def check_signed_params(self, jwt_data):
    """
    Decodes the params, makes sure they pass signature (i.e., are valid),
    and then checks that we haven't seen the msgid before. Raises a
    ValueError on a bad signature, a missing msgid, or a repeated msgid;
    otherwise returns the decoded data.
    TODO(matt): this particular method seems to be unused (not so the one
    in federer_handlers.config.config).
    """
    serializer = itsdangerous.JSONWebSignatureSerializer(self.conf['bts_secret'])
    try:
        data = serializer.loads(jwt_data)
    except itsdangerous.BadSignature:
        logger.error("Bad jwt signature for request, ignoring.")
        raise ValueError("Bad signature")

    # every message must carry an id
    if "msgid" not in data:
        logger.error("Endaga: No message ID.")
        raise ValueError("No message ID.")

    # make sure the msg hasn't been seen before, if so, discard it
    msgid = data['msgid']
    if self.msgid_db.seen(str(msgid)):
        logger.error("Endaga: Repeat msgid: %s" % (msgid,))
        raise ValueError("Repeat msgid: %s" % (msgid,))
    return data
def check_signed_params(self, jwt_data):
    """Checks a JWT signature and message ID.

    Decodes the params, makes sure they pass signature (i.e., are valid),
    and then checks that we haven't seen the msgid before.

    TODO(matt): refactor as this was copied from federer_handlers.common.
                Inheriting from common as before does not work because CI
                cannot import ESL, an import that comes from
                freeswitch_interconnect.

    Raises:
      ValueError if the signature is bad, the msgid is missing, or the
      msgid has been seen before

    Returns:
      the decoded data if everything checks out
    """
    serializer = itsdangerous.JSONWebSignatureSerializer(self.conf['bts_secret'])
    try:
        data = serializer.loads(jwt_data)
    except itsdangerous.BadSignature:
        logger.emergency("Bad signature for request, ignoring.")
        raise ValueError("Bad signature")

    # Every message must carry an id.
    if "msgid" not in data:
        logger.error("Endaga: No message ID.")
        raise ValueError("No message ID.")

    # Make sure the msg hasn't been seen before, if so, discard it.
    msgid = data['msgid']
    if self.msgid_db.seen(str(msgid)):
        logger.error("Endaga: Repeat msgid: %s" % (msgid,))
        raise ValueError("Repeat msgid: %s" % (msgid,))
    return data
def open_session(self, app, request):
    """Build the session object for ``request`` from its signed cookie.

    Returns ``None`` when ``get_signing_serializer`` yields no serializer.
    Otherwise returns a ``session_class`` instance: filled from the cookie
    payload when it loads within the permanent session lifetime, and
    empty when the cookie is absent or fails with ``BadSignature``.
    """
    serializer = self.get_signing_serializer(app)
    if serializer is None:
        return None
    cookie_value = request.cookies.get(app.session_cookie_name)
    if not cookie_value:
        # No (or empty) session cookie: start with a blank session.
        return self.session_class()
    lifetime = total_seconds(app.permanent_session_lifetime)
    try:
        payload = serializer.loads(cookie_value, max_age=lifetime)
        return self.session_class(payload)
    except BadSignature:
        # Unverifiable cookie: fall back to a blank session.
        return self.session_class()