def put(self):
"""Change the password"""
us = User.query \
.filter(User.disabled == 0) \
.filter(User.id_user == g.current_user) \
.first()
abort_if_none(us, 404, 'User not found')
if not check_password_hash(us.password, request.json['old_password']):
return msg('Old password incorrect'), 403
us.password = request.json['password']
db.session.commit()
cache.blacklisted_tokens.append(request.headers['Authorization'])
return msg('success!')
python类current_user()的实例源码
def check_auth():
session = None
user = None
token = request.headers.get('X-Auth-Token')
if token:
session = Session.query.filter_by(token=token).first()
if not session:
return make_error_response('Invalid session token', 401)
user = session.user
else:
auth = request.authorization
if auth:
user = User.find_by_email_or_username(auth.username)
if not (user and user.password == auth.password):
return make_error_response('Invalid username/password combination', 401)
g.current_session = session
g.current_user = user
def verify_password(email_or_token, password):
"""Verify user using email and address or token,
otherwise, set as anonymous user.
"""
if email_or_token == '':
g.current_user = AnonymousUser()
return True
if password == '':
g.current_user = User.verify_auth_token(email_or_token)
g.token_used = True
return g.current_user is not None
user = User.query.filter_by(email=email_or_token).first()
if not user:
return False
g.current_user = user
g.token_used = False
return user.verify_password(password)
def verify_password(username, password):
authorization = request.headers.get('Authorization', '').split(' ')
bearer = authorization[1] if len(authorization) > 1 else ''
if bearer:
g.current_user = User.verify_access_token(bearer)
g.token_used = True
return g.current_user is not None
if username == '':
g.current_user = AnonymousUser()
return True
user = User(username=username)
if not user or not user.user_id:
return False
g.current_user = user
g.token_used = False
return user.verify_password(password)
def verify_token(token, add_to_session=False):
"""Token verification callback."""
if add_to_session:
# clear the session in case auth fails
if 'nickname' in session:
del session['nickname']
user = User.query.filter_by(token=token).first()
if user is None:
return False
if user.ping():
from .events import push_model
push_model(user)
db.session.add(user)
db.session.commit()
g.current_user = user
if add_to_session:
session['nickname'] = user.nickname
return True
def delete(self, org_id, location_id, role_id, user_id, timeclock_id):
"""
deletes a timeclock record
"""
timeclock = Timeclock.query.get_or_404(timeclock_id)
user = User.query.get_or_404(user_id)
original_start = timeclock.start
original_stop = timeclock.stop
try:
db.session.delete(timeclock)
db.session.commit()
except Exception as exception:
db.session.rollback()
current_app.logger.error(str(exception))
abort(400)
if timeclock.user_id != g.current_user.id:
alert_timeclock_change(None, org_id, location_id, role_id,
original_start, original_stop, user,
g.current_user)
g.current_user.track_event("timeclock_deleted")
return {}, 204
def login():
username = request.json.get('username')
password = request.json.get('password')
if username and password:
user = User.find_by_identity(username)
if user and user.authenticated(password):
g.current_user = user
session_token = user.generate_auth_token(3600)
user.session_token = session_token
db.session.commit()
response = user.to_json()
return jsonify({'response': response}), 200
else:
return jsonify({'response':
{'message': 'Username or password is wrong'}}), 404
return jsonify({'response':
{'message': 'Password and username not provided'}}), 500
def get_blender_id_oauth_token() -> str:
"""Returns the Blender ID auth token, or an empty string if there is none."""
from flask import request
token = session.get('blender_id_oauth_token')
if token:
if isinstance(token, (tuple, list)):
# In a past version of Pillar we accidentally stored tuples in the session.
# Such sessions should be actively fixed.
# TODO(anyone, after 2017-12-01): refactor this if-block so that it just converts
# the token value to a string and use that instead.
token = token[0]
session['blender_id_oauth_token'] = token
return token
if request.authorization and request.authorization.username:
return request.authorization.username
if current_user.is_authenticated and current_user.id:
return current_user.id
return ''
def verify_password(email_or_token, password):
if email_or_token == '':
g.current_user = AnonymousUser()
return True
if password == '':
g.current_user = User.verify_auth_token(email_or_token)
g.token_used = True
return g.current_user is not None
user = User.query.filter_by(email=email_or_token).first()
if not user:
return False
g.current_user = user
g.token_used = False
return user.verify_password(password)
# 401
def verify_password(email_or_token,password):
if email_or_token == '':
g.current_user = AnonymousUser()
return True
if password == '':
g.current_user = User.verify_auth_token(email_or_token)
g.token_used = True
return g.current_user is not None
user = User.query.filter_by(email=email_or_token).first()
if not user:
return False
g.current_user = user
g.token_used = False
return user.verify_password(password)
def verify_token():
"""
Verify if the token is valid, not expired and not blacklisted
"""
if 'Authorization' in request.headers:
if request.headers['Authorization'] in cache.blacklisted_tokens:
abort(403, 'Error: invalid token')
try:
payload = jwt.decode(request.headers['Authorization'], config.SECRET_KEY)
g.current_user = payload['id_user']
except jwt.ExpiredSignatureError:
abort(403, 'Error: token expired')
except jwt.DecodeError:
abort(403, 'Error: invalid token')
def get_user_sessions():
return g.current_user.sessions
def delete_user_sessions():
g.current_user.sessions.delete()
db.session.commit()
return ('', 204)
def delete_session(id):
session = Session.query.get(id)
if not (session and session.user == g.current_user):
return make_error_response('Session not found', 404)
db.session.delete(session)
db.session.commit()
return ('', 204)
def update_user(data):
user = g.current_user
if data['password']:
user.change_password(data['password'])
db.session.commit()
return user
def inject_context(context):
ctx = {}
if context:
ctx.update(context)
ctx['current_user'] = g.current_user
return ctx
def new_post():
post = Post.from_json(request.json)
post.author = g.current_user
db.session.add(post)
db.session.commit()
return jsonify(post.to_json()), 201, \
{'Location': url_for('api.get_post', id=post.id, _external=True)}
def edit_post(id):
post = Post.query.get_or_404(id)
if g.current_user != post.author and \
not g.current_user.can(Permission.ADMINISTER):
return forbidden('Insufficient permissions')
post.body = request.json.get('body', post.body)
db.session.add(post)
return jsonify(post.to_json())
def new_post_comment(id):
post = Post.query.get_or_404(id)
comment = Comment.from_json(request.json)
comment.author = g.current_user
comment.post = post
db.session.add(comment)
db.session.commit()
return jsonify(comment.to_json()), 201, \
{'Location': url_for('api.get_comment', id=comment.id,
_external=True)}
authentication.py 文件源码
项目:circleci-demo-python-flask
作者: CircleCI-Public
项目源码
文件源码
阅读 28
收藏 0
点赞 0
评论 0
def verify_password(email_or_token, password):
if email_or_token == '':
g.current_user = AnonymousUser()
return True
if password == '':
g.current_user = User.verify_auth_token(email_or_token)
g.token_used = True
return g.current_user is not None
user = User.query.filter_by(email=email_or_token).first()
if not user:
return False
g.current_user = user
g.token_used = False
return user.verify_password(password)
authentication.py 文件源码
项目:circleci-demo-python-flask
作者: CircleCI-Public
项目源码
文件源码
阅读 35
收藏 0
点赞 0
评论 0
def before_request():
if not g.current_user.is_anonymous and \
not g.current_user.confirmed:
return forbidden('Unconfirmed account')
def permission_required(permission):
"""Decorator for specified permission verification.
"""
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if not g.current_user.can(permission):
abort(403)
return f(*args, **kwargs)
return decorated_function
return decorator
def new_post():
post = Post.from_json(request.json)
post.author = g.current_user
db.session.add(post)
db.session.commit()
return jsonify(post.to_json()), 201, {'Location': url_for('api.get_post',id=post.id, _external=True)}
# put??
def edit_post(id):
post = Post.query.get_or_404(id)
if g.current_user != post.author and \
not g.current_user.operation(Permission.ADMINISTER):
return forbidden('Insufficient permissions')
post.title = request.json.get('title', post.title)
post.body = request.json.get('body', post.body)
db.session.add(post)
return jsonify(post.to_json())
def new_post_comment(id):
post = Post.query.get_or_404(id)
comment = Comment.from_json(request.json)
comment.author = g.current_user
comment.post = post
db.session.add(comment)
db.session.commit()
return jsonify(comment.to_json()), 201, \
{'Location': url_for('api.get_comment', id=comment.id,_external=True)}
def verify_password(email_or_token, password):
if email_or_token == '':
g.current_user = AnonymousUser()
return True
if password == '':
g.current_user = User.verify_auth_token(email_or_token)
g.token_used = True
return g.current_user is not None
user = User.query.filter_by(email=email_or_token).first()
if not user:
return False
g.current_user = user
g.token_used = False
return user.verify_password(password)
def before_request():
if not g.current_user.is_authenticated:
return forbidden('Unconfirmed account')
def new_post():
post = Post.from_json(request.json)
post.author = g.current_user
db.session.add(post)
db.session.commit()
return jsonify(post.to_json()), 201, \
{'Location': url_for('api.get_post', id=post.id, _external=True)}
def edit_post(id):
post = Post.query.get_or_404(id)
if g.current_user != post.author and \
not g.current_user.can(Permission.ADMINISTER):
return forbidden('Insufficient permissions')
post.body = request.json.get('body', post.body)
db.session.add(post)
return jsonify(post.to_json())
def new_post_comment(id):
post = Post.query.get_or_404(id)
comment = Comment.from_json(request.json)
comment.author = g.current_user
comment.post = post
db.session.add(comment)
db.session.commit()
return jsonify(comment.to_json()), 201, \
{'Location': url_for('api.get_comment', id=comment.id,
_external=True)}