def verify_password(email_or_token, password):
    """Authenticate a request from an email/password pair or from a token.

    Three cases are checked in order:

    * ``email_or_token`` is the empty string: an ``AnonymousUser`` is
      stored in ``g.current_user`` and the request is accepted.
    * ``password`` is the empty string: ``email_or_token`` is passed to
      ``User.verify_auth_token``; the result is stored in
      ``g.current_user``, ``g.token_used`` is set to ``True`` and the
      request is accepted only if the result is not ``None``.
    * otherwise: the user is looked up by email. If none is found the
      request is rejected without touching ``g``; if one is found it is
      stored in ``g.current_user``, ``g.token_used`` is set to ``False``
      and the outcome of ``user.verify_password(password)`` is returned.
    """
    is_anonymous_request = email_or_token == ''
    if is_anonymous_request:
        g.current_user = AnonymousUser()
        return True

    is_token_request = password == ''
    if is_token_request:
        g.current_user = User.verify_auth_token(email_or_token)
        g.token_used = True
        return g.current_user is not None

    matches = User.query.filter_by(email=email_or_token)
    account = matches.first()
    if account:
        g.current_user = account
        g.token_used = False
        return account.verify_password(password)
    return False
# Example source code using current_user() in Python (original heading: "python类current_user()的实例源码")
def before_request():
    """Refuse users who are signed in but have not confirmed their account.

    Returns ``forbidden('Unconfirmed account')`` when ``g.current_user`` is
    neither anonymous nor confirmed; returns ``None`` in every other case.
    """
    user = g.current_user
    if user.is_anonymous or user.confirmed:
        return None
    return forbidden('Unconfirmed account')
def before_pre_request():
    """Authenticate the request against GitLab using the ``TOKEN`` header.

    Requests whose path is ``/auth/login`` or ``/oauth2/welcome`` are
    skipped (returns ``None``). For any other path, a missing or empty
    ``TOKEN`` header produces the ``'Authorization error'`` JSON body with
    status 403. Otherwise a ``gitlab.Gitlab`` client is built with the
    token, ``auth()`` is called on it, and the client and its ``user`` are
    stored in ``g.gl`` and ``g.current_user``.
    """
    exempt_paths = ('/auth/login', '/oauth2/welcome')
    if request.path in exempt_paths:
        return None

    oauth_token = request.headers.get('TOKEN')
    if oauth_token:
        client = gitlab.Gitlab(
            'http://gitlab.onenet.com',
            oauth_token=oauth_token,
            api_version='4',
        )
        client.auth()
        g.current_user = client.user
        g.gl = client
        return None

    return jsonify('Authorization error'), 403
def get(self):
    """Return the ``attributes`` of the user stored in ``g.current_user``."""
    current = g.current_user
    return current.attributes
def login_required(f):
    """Decorator that only runs ``f`` for requests carrying a valid token.

    The token is read from the ``token`` request header and resolved with
    ``User.verify_auth_token``. When it resolves to a user, that user is
    stored in ``g.current_user`` and ``f`` is called with the original
    arguments. When the header is missing, or the token does not resolve
    to a user, the ``"login first!"`` JSON body is returned with status 401
    and ``f`` is not called.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        token = request.headers.get('token')
        if token is not None:
            user = User.verify_auth_token(token)
            # A None result is treated as a rejected token, the same way
            # verify_password in this file interprets it; previously such
            # requests still reached the view with g.current_user = None.
            if user is not None:
                g.current_user = user
                return f(*args, **kwargs)
        return jsonify("login first!"), 401
    return decorated
def verify_password(email_or_token, password):
    """Decide whether the supplied credentials are acceptable.

    * Empty ``email_or_token``: ``g.current_user`` becomes an
      ``AnonymousUser`` and ``True`` is returned.
    * Empty ``password``: ``email_or_token`` is treated as a token and
      given to ``User.verify_auth_token``; ``g.current_user`` receives the
      result, ``g.token_used`` is set to ``True``, and the return value is
      whether the result is not ``None``.
    * Anything else: ``email_or_token`` is used as an email to look up a
      user. No match returns ``False`` and leaves ``g`` alone; a match is
      stored in ``g.current_user`` with ``g.token_used`` set to ``False``,
      and the return value comes from ``user.verify_password(password)``.
    """
    # No identifier at all: accept the request as an anonymous user.
    if email_or_token == '':
        g.current_user = AnonymousUser()
        return True

    # Identifier without a password: the identifier is an auth token.
    if password == '':
        resolved = User.verify_auth_token(email_or_token)
        g.current_user = resolved
        g.token_used = True
        return g.current_user is not None

    # Identifier plus password: regular email login.
    found = User.query.filter_by(email=email_or_token).first()
    if not found:
        return False
    g.current_user, g.token_used = found, False
    return found.verify_password(password)
def forbidden_error():
    """Return the ``forbidden`` response for an unconfirmed account."""
    message = 'unconfirmed account'
    return forbidden(message)
# uncomment to apply auth.login_required for each view in the blueprint #
#@webhook.before_request
#@auth.login_required
#def before_request():
# if not g.current_user.is_anonymous and \
# not g.current_user.confirmed:
# return forbidden('Unconfirmed account')
def get_token():
    """Return a newly generated auth token for the current user as JSON.

    The request is refused with ``unauthorized('Invalid credentials')``
    when ``g.current_user`` is anonymous or ``g.token_used`` is truthy.
    Otherwise the JSON body carries ``'token'`` (from
    ``generate_auth_token(expiration=3600)``) and ``'expiration'`` (3600).
    """
    user = g.current_user
    if not (user.is_anonymous or g.token_used):
        payload = {
            'token': user.generate_auth_token(expiration=3600),
            'expiration': 3600,
        }
        return jsonify(payload)
    return unauthorized('Invalid credentials')
def new_post():
    """Create a post from the request's JSON, authored by the current user.

    The post is built with ``Post.from_json``, its author is set to
    ``g.current_user``, and it is added to the session and committed.

    Returns a ``(body, 201, headers)`` tuple: the post's JSON
    representation and a ``Location`` header holding the external URL of
    ``api.get_post`` for the new post's id.
    """
    created = Post.from_json(request.json)
    created.author = g.current_user

    session = db.session
    session.add(created)
    session.commit()

    body = jsonify(created.to_json())
    headers = {
        'Location': url_for('api.get_post', id=created.id, _external=True),
    }
    return body, 201, headers
def edit_post(id):
    """Update the body of post ``id`` from the request's JSON payload.

    The post is fetched with ``Post.query.get_or_404(id)``. The edit is
    refused with ``forbidden('Insufficient permissions')`` unless
    ``g.current_user`` is the post's author or has
    ``Permission.ADMINISTER``. When the payload has no ``'body'`` key the
    existing body is kept.

    Returns the JSON representation of the post after the change has been
    committed.
    """
    post = Post.query.get_or_404(id)
    if g.current_user != post.author and \
            not g.current_user.can(Permission.ADMINISTER):
        return forbidden('Insufficient permissions')
    post.body = request.json.get('body', post.body)
    db.session.add(post)
    # Commit explicitly, as new_post and new_post_comment do; without this
    # the edit is only persisted if something else commits the session.
    db.session.commit()
    return jsonify(post.to_json())
def new_post_comment(id):
    """Attach a new comment, built from the request's JSON, to post ``id``.

    The post is fetched with ``Post.query.get_or_404(id)``; the comment is
    built with ``Comment.from_json``, assigned ``g.current_user`` as author
    and the post as its post, then added to the session and committed.

    Returns a ``(body, 201, headers)`` tuple: the comment's JSON
    representation and a ``Location`` header holding the external URL of
    ``api.get_comment`` for the new comment's id.
    """
    target_post = Post.query.get_or_404(id)

    new_comment = Comment.from_json(request.json)
    new_comment.author = g.current_user
    new_comment.post = target_post

    session = db.session
    session.add(new_comment)
    session.commit()

    body = jsonify(new_comment.to_json())
    location = url_for('api.get_comment', id=new_comment.id, _external=True)
    return body, 201, {'Location': location}
def verify_password(email_or_token, password):
    """Check request credentials and record the resolved user on ``g``.

    Returns ``True`` for an empty ``email_or_token`` (after storing an
    ``AnonymousUser`` in ``g.current_user``). With an empty ``password``
    the first argument is resolved through ``User.verify_auth_token``,
    ``g.token_used`` is set to ``True`` and the result says whether a user
    came back. Otherwise the first argument is an email: an unknown email
    returns ``False`` without modifying ``g``; a known one is stored in
    ``g.current_user`` with ``g.token_used`` set to ``False`` and the
    return value is ``user.verify_password(password)``.
    """
    if email_or_token == '':
        g.current_user = AnonymousUser()
        accepted = True
    elif password == '':
        g.current_user = User.verify_auth_token(email_or_token)
        g.token_used = True
        accepted = g.current_user is not None
    else:
        candidate = User.query.filter_by(email=email_or_token).first()
        if candidate:
            g.current_user = candidate
            g.token_used = False
            accepted = candidate.verify_password(password)
        else:
            accepted = False
    return accepted
def before_request():
    """Return a forbidden response for non-anonymous, unconfirmed users.

    When ``g.current_user`` is anonymous or confirmed nothing is returned
    (``None``); otherwise ``forbidden('Unconfirmed account')`` is returned.
    """
    account = g.current_user
    must_block = not (account.is_anonymous or account.confirmed)
    if must_block:
        return forbidden('Unconfirmed account')
def new_post():
    """Persist a post built from the request's JSON and report where it is.

    The post comes from ``Post.from_json(request.json)``, gets
    ``g.current_user`` as its author, and is added to the session and
    committed.

    Returns a ``(body, 201, headers)`` tuple: the post's JSON
    representation and a ``Location`` header with the ``api.get_post`` URL
    for the new post's id.
    """
    entry = Post.from_json(request.json)
    entry.author = g.current_user

    db.session.add(entry)
    db.session.commit()

    response_body = jsonify(entry.to_json())
    location = url_for('api.get_post', id=entry.id)
    return response_body, 201, {'Location': location}
def edit_post(id):
    """Replace the body of post ``id`` with the ``'body'`` from the request.

    The post is fetched with ``Post.query.get_or_404(id)``. A user who is
    neither the post's author nor holds ``Permission.ADMIN`` receives
    ``forbidden('Insufficient permissions')``. If the JSON payload has no
    ``'body'`` key the current body is kept. The post is then added to the
    session and committed.

    Returns the JSON representation of the post.
    """
    target = Post.query.get_or_404(id)
    editor = g.current_user

    if editor != target.author:
        # Non-authors may only edit with the ADMIN permission.
        if not editor.can(Permission.ADMIN):
            return forbidden('Insufficient permissions')

    payload = request.json
    target.body = payload.get('body', target.body)

    session = db.session
    session.add(target)
    session.commit()
    return jsonify(target.to_json())
def new_post_comment(id):
    """Create a comment on post ``id`` from the request's JSON payload.

    The post is fetched with ``Post.query.get_or_404(id)``. The comment is
    built by ``Comment.from_json``, given ``g.current_user`` as author and
    the fetched post as its post, then added to the session and committed.

    Returns a ``(body, 201, headers)`` tuple: the comment's JSON
    representation and a ``Location`` header with the ``api.get_comment``
    URL for the new comment's id.
    """
    parent = Post.query.get_or_404(id)

    reply = Comment.from_json(request.json)
    reply.author = g.current_user
    reply.post = parent

    db.session.add(reply)
    db.session.commit()

    response_body = jsonify(reply.to_json())
    headers = {'Location': url_for('api.get_comment', id=reply.id)}
    return response_body, 201, headers
def verify_password(email_or_token, password):
    """Validate credentials given as email + password or as a bare token.

    * Empty ``email_or_token``: rejected immediately (``False``); nothing
      is written to ``g``.
    * Empty ``password``: ``email_or_token`` is resolved through
      ``User.verify_auth_token``; the result goes to ``g.current_user``,
      ``g.token_used`` is set to ``True`` and the return value is whether
      the result is not ``None``.
    * Otherwise: the user is looked up by email. No match returns
      ``False``; a match is stored in ``g.current_user`` with
      ``g.token_used`` set to ``False`` and the return value is
      ``user.verify_password(password)``.
    """
    if email_or_token == '':
        return False

    token_login = password == ''
    if token_login:
        g.current_user = User.verify_auth_token(email_or_token)
        g.token_used = True
        return g.current_user is not None

    by_email = User.query.filter_by(email=email_or_token)
    member = by_email.first()
    if member:
        g.current_user = member
        g.token_used = False
        return member.verify_password(password)
    return False
def before_request():
    """Block users that are neither anonymous nor confirmed.

    Returns ``forbidden('Unconfirmed account')`` for such users and
    ``None`` for everyone else.
    """
    visitor = g.current_user
    if not visitor.is_anonymous:
        if not visitor.confirmed:
            return forbidden('Unconfirmed account')
    return None
def verify_token(token):
    """Accept ``token`` when it is a member of ``TOKENS``.

    On success the token itself is stored in ``g.current_user`` and
    ``True`` is returned; otherwise ``g`` is left untouched and ``False``
    is returned.
    """
    recognised = token in TOKENS
    if recognised:
        g.current_user = token
    return recognised
# ?????????