def verify_access_token(access_token):
    """
    Verify an access token and resolve it to a user.

    :param access_token: serialized token string to verify
    :return: the ``User`` built from the token's ``token_uid`` when the
        token loads, is marked for ``access`` usage and the user is active;
        otherwise ``False``
    """
    s = Serializer(current_app.config['SECRET_KEY'])
    try:
        data = s.loads(access_token)
    except Exception:
        # Any failure to load the token means "not verified". Unlike a bare
        # ``except:``, this lets SystemExit/KeyboardInterrupt propagate.
        return False
    # Tokens issued for any other usage (e.g. refresh) are rejected here.
    if data.get('token_usage') != 'access':
        return False
    user = User(user_id=data.get('token_uid'))
    if not user or not user.user_id or not user.is_active:
        return False
    return user
# Example source code for the Python class TimedJSONWebSignatureSerializer()
def reset_password(self, token, new_pass):
    """Reset the password using a token.

    The token is generated by :meth:`~User.generate_reset_token`.

    :param token: serialized reset token
    :param new_pass: new password to assign
    :return: ``True`` if the password was changed and committed; ``False``
        if the token could not be loaded or its ``user_id`` does not match
        this user
    """
    s = TimedJSONWebSignatureSerializer(current_app.config['SECRET_KEY'])
    try:
        data = s.loads(token)
    except Exception:
        # Narrowed from a bare ``except:`` so that SystemExit and
        # KeyboardInterrupt are no longer swallowed.
        return False
    # The token must have been issued for this very user.
    if data.get('user_id') != self.id:
        return False
    self.password = new_pass
    db.session.add(self)
    db.session.commit()
    return True
def change_email(self, token):
    """Change email address using token.

    :param token: serialized token carrying ``change_email`` (user id) and
        ``new_email``
    :return: ``True`` if the email was updated and the user added to the
        session (no commit is issued here); ``False`` if the token cannot
        be loaded, belongs to another user, has no ``new_email`` or the
        address is already used by an existing record
    """
    s = Serializer(current_app.config['SECRET_KEY'])
    try:
        data = s.loads(token)
    except Exception:
        # Narrowed from a bare ``except:`` so that SystemExit and
        # KeyboardInterrupt are no longer swallowed.
        return False
    if data.get('change_email') != self.id:
        return False
    new_email = data.get('new_email')
    if new_email is None:
        return False
    # Refuse addresses that already belong to some record.
    if self.query.filter_by(email=new_email).first() is not None:
        return False
    self.email = new_email
    db.session.add(self)
    return True
def verify_auth_token(token):
    """Validate the token and return the user it identifies.

    :param token: serialized auth token
    :return: the matching ``User``, or ``None`` if the token is expired,
        has a bad signature, carries no ``id`` or matches no user
    """
    serializer = Serializer(current_app.config['SECRET_KEY'])
    try:
        data = serializer.loads(token)
    except SignatureExpired:
        return None
    except BadSignature:
        return None
    user_id = data.get('id')
    if user_id is None:
        # A token that loads fine but carries no ``id`` used to raise
        # KeyError here; treat it as an invalid auth token instead.
        return None
    return User.query.filter_by(id=user_id).first()
def confirm(self, token):
    """
    Confirm this user's account with a confirmation token.

    :param token: serialized confirmation token
    :return: ``True`` when the token's ``confirm_uid`` matches this user
        and the ``confirmed`` flag was stored; otherwise ``False``
    """
    s = Serializer(current_app.config['SECRET_KEY'])
    try:
        data = s.loads(token)
    except Exception:
        # Narrowed from a bare ``except:`` so that SystemExit and
        # KeyboardInterrupt are no longer swallowed.
        return False
    if data.get('confirm_uid') != self.user_id:
        return False
    self.confirmed = 1
    # Persist the flag on the stored user document as well.
    mongo.db.users.update_one(
        {'user_id': self.user_id},
        {'$set': {'confirmed': 1}},
    )
    return True
def refresh_access_token(refresh_token, expiration=3600):
    """
    Issue a new access token from a refresh token.

    :param refresh_token: serialized refresh token
    :param expiration: lifetime of the new access token, in seconds
    :return: a dict with ``access_token``, ``refresh_token``,
        ``expires_in``, ``expires_at`` and ``token_type`` on success;
        ``False`` if the token cannot be loaded, is not a refresh token or
        carries no ``token_uid``
    """
    s = Serializer(current_app.config['SECRET_KEY'])
    try:
        data = s.loads(refresh_token)
    except Exception:
        # Narrowed from a bare ``except:`` so that SystemExit and
        # KeyboardInterrupt are no longer swallowed.
        return False
    if data.get('token_usage') != 'refresh':
        return False
    token_uid = data.get('token_uid')
    if not token_uid:
        return False
    sa = Serializer(current_app.config['SECRET_KEY'], expires_in=expiration)
    access_token = sa.dumps(
        {'token_uid': token_uid, 'token_usage': 'access'}).decode('ascii')
    # The refresh token itself is handed back unchanged.
    return dict(access_token=access_token, refresh_token=refresh_token,
                expires_in=expiration,
                expires_at=int(time.time()) + expiration,
                token_type='Bearer')
##
# ????
##
def change_email(self, token):
    """Change this user's email address using a token.

    :param token: serialized token carrying ``change_email`` (user id) and
        ``new_email``
    :return: ``True`` if the email and ``avatar_hash`` were updated and the
        user added to the session (no commit is issued here); ``False`` if
        the token cannot be loaded, belongs to another user, has no
        ``new_email`` or the address is already used by an existing record
    """
    s = Serializer(current_app.config['SECRET_KEY'])
    try:
        data = s.loads(token)
    except Exception:
        # Narrowed from a bare ``except:`` so that SystemExit and
        # KeyboardInterrupt are no longer swallowed.
        return False
    if data.get('change_email') != self.id:
        return False
    new_email = data.get('new_email')
    if new_email is None:
        return False
    if self.query.filter_by(email=new_email).first() is not None:
        return False
    self.email = new_email
    # avatar_hash is the MD5 hex digest of the email, so it must follow it.
    self.avatar_hash = hashlib.md5(
        self.email.encode('utf-8')).hexdigest()
    db.session.add(self)
    return True
def change_email(self, token):
    """Change this user's email address using a token.

    :param token: serialized token carrying ``change_email`` (user id) and
        ``new_email``
    :return: ``True`` if the email and ``avatar_hash`` were updated and the
        user added to the session (no commit is issued here); ``False`` if
        the token cannot be loaded, belongs to another user, has no
        ``new_email`` or the address is already used by an existing record
    """
    # The original passed an undefined ``expiration`` name here, which
    # raised NameError on every call; it is not a parameter of this method.
    s = Serializer(current_app.config['SECRET_KEY'])
    try:
        data = s.loads(token)
    except Exception:
        # Narrowed from a bare ``except:`` so that SystemExit and
        # KeyboardInterrupt are no longer swallowed.
        return False
    if data.get('change_email') != self.id:
        return False
    new_email = data.get('new_email')
    if new_email is None:
        return False
    if self.query.filter_by(email=new_email).first() is not None:
        return False
    self.email = new_email
    # The encoding must be the string 'utf-8'; the unquoted ``utf-8`` was
    # evaluated as the expression ``utf - 8`` and raised NameError.
    self.avatar_hash = hashlib.md5(self.email.encode('utf-8')).hexdigest()
    db.session.add(self)
    return True
def confirm_user_account(token):
    """Mark the user referenced by a confirmation token as confirmed.

    :param token: serialized token whose ``confirm`` field holds a user id
    :return: ``True`` if the user was found, flagged as confirmed and
        saved through ``user_repository``; ``False`` if the token cannot be
        loaded or no such user exists
    """
    serializer = Serializer(current_app.config['SECRET_KEY'])
    try:
        data = serializer.loads(token)
    except Exception:
        # Narrowed from a bare ``except:`` so that SystemExit and
        # KeyboardInterrupt are no longer swallowed.
        return False
    user = user_repository.get_by_id(data.get('confirm'))
    if user is None:
        return False
    user.confirmed = True
    user_repository.save(user)
    return True
def confirm(token):
    """Confirm an account from a confirmation link and log the user in.

    :param token: serialized confirmation token
    :return: a redirect to ``auth.unconfirmed`` (with a "danger" flash)
        when the token cannot be loaded, references no user or is rejected
        by ``User.confirm``; otherwise a redirect to ``main.index`` after
        logging the user in
    """
    s = Serializer(current_app.config["SECRET_KEY"])
    try:
        data = s.loads(token)
    except Exception:
        # Narrowed from a bare ``except:`` so that SystemExit and
        # KeyboardInterrupt are no longer swallowed.
        u = None
    else:
        u = User.query.get(data.get("confirm"))
    # All three failure modes share one response, so it is written once.
    if u is None or not u.confirm(token):
        flash("The confirmation link is invalid or has expired.", "danger")
        return redirect(url_for("auth.unconfirmed"))
    # Confirmation complete: log the user in and tell them it worked.
    login_user(u)
    flash("You have confirmed your account!", "success")
    return redirect(url_for("main.index"))
def activate_account(self, token, name, password, username):
    """Activate this account using an activation token.

    :param token: serialized token whose ``activation`` field holds the
        user id
    :param name: name to store on the user
    :param password: password to assign
    :param username: username to store on the user
    :return: ``True`` if the account was activated and added to the session
        (no commit is issued here); ``False`` if the token cannot be loaded
        or was issued for another user
    """
    s = Serializer(current_app.config["SECRET_KEY"])
    try:
        data = s.loads(token)
    except Exception:
        # Narrowed from a bare ``except:`` so that SystemExit and
        # KeyboardInterrupt are no longer swallowed.
        return False
    if data.get("activation") != self.id:
        return False
    self.password = password
    self.name = name
    self.username = username
    self.confirmed = True
    self.active = True
    db.session.add(self)
    # Arguments are passed to the logger so formatting is done lazily.
    current_app.logger.info("User account activated: user id %s (%s)",
                            self.id, self.email)
    self.track_event("activated_account")
    return True
def confirm(self, token):
    """Confirm this account using a confirmation token.

    :param token: serialized token whose ``confirm`` field holds the user
        id; a ``trial`` field equal to ``True`` triggers an extra event
    :return: ``True`` if the account was confirmed, activated and
        committed; ``False`` if the token cannot be loaded or was issued
        for another user
    """
    s = Serializer(current_app.config["SECRET_KEY"])
    try:
        data = s.loads(token)
    except Exception:
        # Narrowed from a bare ``except:`` so that SystemExit and
        # KeyboardInterrupt are no longer swallowed.
        return False
    if data.get("confirm") != self.id:
        return False
    self.confirmed = True
    self.active = True
    db.session.add(self)
    db.session.commit()
    # Arguments are passed to the logger so formatting is done lazily.
    current_app.logger.info("User account confirmed: user id %s (%s)",
                            self.id, self.email)
    self.track_event("confirmed_account")
    # Only a literal ``True`` counts; other truthy values are ignored.
    if data.get("trial") is True:
        self.track_event("started_free_trial")
    return True
def change_email(self, token):
    """Change this user's email address using a token and commit it.

    :param token: serialized token carrying ``change_email`` (user id) and
        ``new_email``
    :return: ``True`` if the email was changed and committed; ``False`` if
        the token cannot be loaded, belongs to another user, has no
        ``new_email`` or the address is already used by an existing record
    :raises Exception: ``"Dirty session"`` when adding or committing fails;
        the session is rolled back first
    """
    s = Serializer(current_app.config["SECRET_KEY"])
    try:
        data = s.loads(token)
    except Exception:
        # Narrowed from a bare ``except:`` so that SystemExit and
        # KeyboardInterrupt are no longer swallowed.
        return False
    if data.get("change_email") != self.id:
        return False
    new_email = data.get("new_email")
    if new_email is None:
        return False
    if self.query.filter_by(email=new_email).first() is not None:
        return False
    self.email = new_email
    try:
        db.session.add(self)
        db.session.commit()
    except Exception:
        # Narrowed as well: an interrupt or interpreter exit should not be
        # re-labelled as a generic "Dirty session" error.
        db.session.rollback()
        raise Exception("Dirty session")
    self.track_event("changed_email")
    return True
def load_session_token(token):
    """Load the user for a cookie session token.

    :param token: serialized token carrying ``user_id`` and ``session_id``
    :return: the ``User`` with its session id set, or ``None`` if the token
        cannot be loaded, ``SessionCache`` rejects the session or the user
        no longer exists
    """
    s = Serializer(current_app.config["SECRET_KEY"],
                   current_app.config.get("SESSION_EXPIRATION"))
    try:
        data = s.loads(token)
    except Exception:
        # Narrowed from a bare ``except:`` so that SystemExit and
        # KeyboardInterrupt are no longer swallowed.
        return None
    # Missing fields fall back to -1 / "-1" for the validation call.
    if not SessionCache.validate_session(
            data.get("user_id", -1), data.get("session_id", "-1")):
        return None
    user = User.query.get(data["user_id"])
    if user is None:
        # The lookup found no user for a session the cache accepted;
        # previously this crashed with AttributeError on the next line.
        return None
    user.set_session_id(data["session_id"])
    current_app.logger.debug("Loading user %s from cookie session %s",
                             user.id, user.session_id)
    return user
def get_auth_token(self):
    """Build the signed token holding this user's cookie info.

    A session is created through ``SessionCache`` and its id is stored in
    the token next to the user id. The token must be kept secure.

    :return: serialized token with ``user_id`` and ``session_id``
    :raises Exception: if the user is not authenticated
    """
    config = current_app.config
    signer = Serializer(config["SECRET_KEY"], config["COOKIE_EXPIRATION"])
    current_app.logger.debug("Generating auth token for user %s" % self.id)
    if not self.is_authenticated:
        raise Exception("User not authenticated")
    # The cached session uses the same expiration as the cookie token.
    session_id = SessionCache.create_session(
        self.id, expiration=config["COOKIE_EXPIRATION"])
    payload = {"user_id": self.id, "session_id": session_id}
    return signer.dumps(payload)
def change_email(self, token):
    """Apply the email change described by a token and commit it.

    :param token: serialized token carrying ``change_email`` (user id) and
        ``new_email``
    :return: ``True`` once the new address is stored and committed;
        ``False`` for a bad or expired signature, a token issued for
        another user, a missing ``new_email`` or an address that is
        already used by an existing record
    """
    serializer = Serializer(current_app.config['SECRET_KEY'])
    try:
        payload = serializer.loads(token)
    except (BadSignature, SignatureExpired):
        return False
    if payload.get('change_email') != self.id:
        return False
    address = payload.get('new_email')
    # Short-circuit keeps the lookup from running without an address.
    if address is None or \
            self.query.filter_by(email=address).first() is not None:
        return False
    self.email = address
    session = db.session
    session.add(self)
    session.commit()
    return True
def change_email(self, token):
    """Change this user's email address using a token.

    :param token: serialized token carrying ``change_email`` (user id) and
        ``new_email``
    :return: ``True`` if the email and ``avatar_hash`` were updated and the
        user added to the session (no commit is issued here); ``False`` if
        the token cannot be loaded, belongs to another user, has no
        ``new_email`` or the address is already used by an existing record
    """
    s = Serializer(current_app.config['SECRET_KEY'])
    try:
        data = s.loads(token)
    except Exception:
        # Narrowed from a bare ``except:`` so that SystemExit and
        # KeyboardInterrupt are no longer swallowed.
        return False
    if data.get('change_email') != self.id:
        return False
    new_email = data.get('new_email')
    if new_email is None:
        return False
    if self.query.filter_by(email=new_email).first() is not None:
        return False
    self.email = new_email
    # avatar_hash is the MD5 hex digest of the email, so it must follow it.
    self.avatar_hash = hashlib.md5(self.email.encode('utf-8')).hexdigest()
    db.session.add(self)
    return True
def verify_auth_token(cls, token):
    """
    Ensures that the token received from the client loads correctly and is
    the one stored as the user's ``session_token``, and returns the User
    that the token belongs to.

    :param token: str
    :return: User object, or None if the token cannot be loaded, carries no
        ``id``, matches no user or differs from the stored session token
    """
    s = Serializer(current_app.config['SECRET_KEY'])
    try:
        data = s.loads(token)
    except Exception:
        # Narrowed from a bare ``except:`` so that SystemExit and
        # KeyboardInterrupt are no longer swallowed.
        return None
    user_id = data.get('id')
    if user_id is None:
        # A token that loads fine but carries no ``id`` used to raise
        # KeyError here; treat it as an invalid auth token instead.
        return None
    user = User.query.get(user_id)
    # A well-formed token is not enough: it must be the stored one.
    if user and user.session_token == token:
        return user
    return None
# DB Helpers
def change_email(self, token):
    """Change this user's email address using a token.

    :param token: serialized token carrying ``change_email`` (user id) and
        ``new_email``
    :return: ``True`` if the email and ``avatar_hash`` were updated and the
        user added to the session (no commit is issued here); ``False`` if
        the token cannot be loaded, belongs to another user, has no
        ``new_email`` or the address is already used by an existing record
    """
    s = Serializer(current_app.config['SECRET_KEY'])
    try:
        data = s.loads(token)
    except Exception:
        # Narrowed from a bare ``except:`` so that SystemExit and
        # KeyboardInterrupt are no longer swallowed.
        return False
    if data.get('change_email') != self.id:
        return False
    new_email = data.get('new_email')
    if new_email is None:
        return False
    if self.query.filter_by(email=new_email).first() is not None:
        return False
    self.email = new_email
    # avatar_hash is the MD5 hex digest of the email, so it must follow it.
    self.avatar_hash = hashlib.md5(self.email.encode('utf-8')).hexdigest()
    db.session.add(self)
    return True
def generate_token(username, password, expiration=600):
    """
    Generate an authorized token.

    Upserts a hash of ``password`` for ``username`` into ``db.sessions``
    and returns a serialized token whose payload holds the username and
    the password.

    :param username: user the token is issued for
    :param password: password to hash into the session document and to
        place in the token payload
    :param expiration: token lifetime passed as ``expires_in``; replaced by
        one year for ``'serveruser'`` when ``cfg.ACME_PROD`` or
        ``cfg.ACME_DEV`` is set
    :return: the serialized token
    """
    doc = {'username': username,
           'password_hash': pwd_context.encrypt(password)}
    db.sessions.find_one_and_update(
        {'username': username},
        {"$set": doc},
        upsert=True
    )
    # ``print`` is written as a single-argument call so the block parses on
    # Python 3 and prints the same text on Python 2.
    if (cfg.ACME_PROD or cfg.ACME_DEV) and (username == 'serveruser'):
        expires_in = 365 * 24 * 60 * 60  # one year
        print('token that EXPIRES_IN_A_YEAR')
    else:
        expires_in = expiration
        print('token that expires %s' % (cfg.ACME_LCL,))
    s = TimedJWSSerializer(app.config['SECRET_KEY'], expires_in=expires_in)
    # NOTE(review): the plaintext password goes into the token payload.
    # Presumably TimedJWSSerializer only signs (does not encrypt) it, which
    # would expose the password to anyone holding the token - confirm, and
    # change together with verify_token if so.
    return s.dumps({'username': username, 'password': password})
def verify_token(username, token):
    """
    Check a token against the stored session of its user.

    :param username: name the token is expected to belong to
    :param token: serialized token carrying ``username`` and ``password``
    :return: ``False`` for an expired or badly signed token or when no
        session document exists for the token's username; otherwise
        whether the token's password verifies against the stored hash and
        the token's username equals ``username``
    """
    serializer = TimedJWSSerializer(app.config['SECRET_KEY'])
    try:
        ut.pretty_print("Trying to load the token")
        payload = serializer.loads(token)
    except SignatureExpired:
        ut.pretty_print("ERROR: Expired Token")
        return False
    except BadSignature:
        ut.pretty_print("ERROR: Invalid Token")
        return False
    # Both handlers return, so reaching this point means the load worked.
    ut.pretty_print("Token successfully loaded")
    # Sorting by ``_id`` descending picks the latest matching document.
    session_doc = db.sessions.find_one(
        filter={'username': payload['username']},
        sort=[('_id', -1)],
    )
    if not session_doc:
        return False
    record = json_util.loads(json_util.dumps(session_doc))
    password_ok = pwd_context.verify(
        payload['password'], record['password_hash'])
    return password_ok and payload['username'] == username
def change_email(self, token):
    """Change this user's email address using a token.

    :param token: serialized token carrying ``change_email`` (user id) and
        ``new_email``
    :return: ``False`` if the token cannot be loaded, belongs to another
        user, has no ``new_email`` or the address is already used by an
        existing record; otherwise whatever ``operate_model.db_add``
        returns for this user
    """
    s = Serializer(current_app.config['SECRET_KEY'])
    try:
        data = s.loads(token)
    except Exception:
        # Narrowed from a bare ``except:`` so that SystemExit and
        # KeyboardInterrupt are no longer swallowed.
        return False
    if data.get('change_email') != self.id:
        return False
    new_email = data.get('new_email')
    if new_email is None:
        return False
    if self.query.filter_by(email=new_email).first() is not None:
        return False
    self.email = new_email
    return operate_model.db_add(self)
def change_email(self, token):
    """Validate an email-change token and store the new address.

    :param token: serialized token carrying ``change_email`` (user id) and
        ``new_email``
    :return: ``True`` after the new email has been committed; ``False``
        when the signature is bad or expired, the token was issued for
        another user, ``new_email`` is missing or the address is already
        used by an existing record
    """
    secret_key = current_app.config['SECRET_KEY']
    try:
        claims = Serializer(secret_key).loads(token)
    except (SignatureExpired, BadSignature):
        return False
    wrong_owner = claims.get('change_email') != self.id
    if wrong_owner:
        return False
    candidate = claims.get('new_email')
    if candidate is None:
        return False
    existing = self.query.filter_by(email=candidate).first()
    if existing is not None:
        return False
    self.email = candidate
    db.session.add(self)
    db.session.commit()
    return True
def generate_auth_token(self, expiration=3600):
    """Create a base64-encoded auth token for this user.

    :param expiration: passed to the serializer as ``expires_in``
    :return: the serialized ``{'id': self.id}`` token, base64-encoded and
        decoded to text as UTF-8
    """
    s = Serializer(app.config['SECRET_KEY'], expires_in=expiration)
    # Named ``token`` rather than ``str`` to avoid shadowing the builtin.
    token = s.dumps({'id': self.id})
    return b64encode(token).decode('utf-8')
def verify_auth_token(token):
    """Resolve a base64-encoded auth token to its user.

    :param token: base64-encoded serialized token carrying ``id``
    :return: the ``User`` returned by the lookup, or ``None`` if the token
        is not valid base64, is expired, has a bad signature or carries no
        ``id``
    """
    s = Serializer(app.config['SECRET_KEY'])
    try:
        raw = b64decode(token)
    except (TypeError, ValueError):
        # Malformed base64 (binascii.Error is a ValueError subclass on
        # Python 3, TypeError on Python 2) used to escape uncaught.
        return None
    try:
        data = s.loads(raw)
    except SignatureExpired:
        return None  # valid token, but expired
    except BadSignature:
        return None  # invalid token
    user_id = data.get('id')
    if user_id is None:
        # A token that loads fine but carries no ``id`` used to raise
        # KeyError here; treat it as an invalid auth token instead.
        return None
    return User.query.get(user_id)
def generate_confirmation_token(self, expiration=3600):
    """Create a confirmation token for this user.

    :param expiration: handed to the serializer as its second positional
        argument
    :return: serialized token whose payload is ``{'confirm': self.id}``
    """
    secret_key = current_app.config['SECRET_KEY']
    signer = Serializer(secret_key, expiration)
    payload = {'confirm': self.id}
    return signer.dumps(payload)
def confirm(self, token):
    """Confirm this account using a confirmation token.

    :param token: serialized token whose ``confirm`` field holds the user
        id
    :return: ``True`` if the user was flagged as confirmed and added to the
        session (no commit is issued here); ``False`` if the token cannot
        be loaded or was issued for another user
    """
    s = Serializer(current_app.config['SECRET_KEY'])
    try:
        data = s.loads(token)
    except Exception:
        # Narrowed from a bare ``except:`` so that SystemExit and
        # KeyboardInterrupt are no longer swallowed.
        return False
    if data.get('confirm') != self.id:
        return False
    self.confirmed = True
    db.session.add(self)
    return True
def generate_reset_token(self, expiration=3600):
    """Create a password-reset token for this user.

    :param expiration: handed to the serializer as its second positional
        argument
    :return: serialized token whose payload is ``{'reset': self.id}``
    """
    secret_key = current_app.config['SECRET_KEY']
    signer = Serializer(secret_key, expiration)
    payload = {'reset': self.id}
    return signer.dumps(payload)
def reset_password(self, token, new_password):
    """Reset this user's password using a reset token.

    :param token: serialized token whose ``reset`` field holds the user id
    :param new_password: password to assign
    :return: ``True`` if the password was assigned and the user added to
        the session (no commit is issued here); ``False`` if the token
        cannot be loaded or was issued for another user
    """
    s = Serializer(current_app.config['SECRET_KEY'])
    try:
        data = s.loads(token)
    except Exception:
        # Narrowed from a bare ``except:`` so that SystemExit and
        # KeyboardInterrupt are no longer swallowed.
        return False
    if data.get('reset') != self.id:
        return False
    self.password = new_password
    db.session.add(self)
    return True
def generate_email_change_token(self, new_email, expiration=3600):
    """Create a token authorising an email change for this user.

    :param new_email: address to embed in the token
    :param expiration: handed to the serializer as its second positional
        argument
    :return: serialized token whose payload holds ``change_email`` (this
        user's id) and ``new_email``
    """
    secret_key = current_app.config['SECRET_KEY']
    signer = Serializer(secret_key, expiration)
    payload = {'change_email': self.id, 'new_email': new_email}
    return signer.dumps(payload)