def authorize(func):
    """Decorate a view so that it only runs for authorized requests.

    When ``app`` has no ``authorization_function`` attribute, every request is
    treated as authorized and the view is called directly.  Otherwise
    ``app.authorization_function`` is called with
    ``global_config.X_AUTH_TOKEN`` and its result decides the outcome:

    1. ``False``: the user is not authorized and a 401 JSON error is returned.
    2. Any other value while ``g.is_authorized`` is ``True``: access is
       granted and the wrapped view is called.
    3. Any other value otherwise (e.g. ``jsonify(error="Access denied!"), 401``):
       the value is returned to the user as-is.

    Args:
        func: The view function to protect.

    Returns:
        The wrapping function, carrying ``func``'s metadata via ``wraps``.
    """

    @wraps(func)
    def authorize_requests(*args, **kwargs):
        if global_config.DEBUG:
            app.ext_logger.info(
                request.endpoint.replace(":", "/").replace(".", "/").lower())

        # No authorization hook installed: every request is allowed through.
        if not hasattr(app, "authorization_function"):
            return func(*args, **kwargs)

        authorized = app.authorization_function(global_config.X_AUTH_TOKEN)
        if authorized is False:
            return jsonify(errors=["Access denied!"]), 401

        # The hook may hand back an error response without ever setting
        # g.is_authorized.  Reading the attribute with a default keeps that
        # case from failing with AttributeError, so the hook's response is
        # returned as documented.
        if getattr(g, "is_authorized", None) is True:
            return func(*args, **kwargs)
        return authorized

    return authorize_requests
# Python user() example source code
def before_request():
    """Assign ``current_user`` to ``g.user``."""
    g.user = current_user
def before_request():
    """Assign the result of ``Author.get_current()`` to ``g.user``."""
    g.user = Author.get_current()
def _before_reques():
    """Assign the result of ``get_jwt_user()`` to ``g.user``."""
    g.user = get_jwt_user()
def get_jwt_user():
    """Resolve a user through the ``jwt`` callbacks.

    Chains ``jwt.request_callback``, ``jwt.jwt_decode_callback`` and
    ``jwt.identity_callback``.

    Returns:
        The value produced by ``jwt.identity_callback``, or ``None`` when an
        ``InvalidTokenError`` or ``JWTError`` is raised along the way.
    """
    try:
        raw_token = jwt.request_callback()
        claims = jwt.jwt_decode_callback(raw_token)
        return jwt.identity_callback(claims)
    except (InvalidTokenError, JWTError):
        # Both error types are treated the same way: no user.
        return None
def login_required():
    """Build a decorator that rejects requests lacking an active user.

    The decorated view runs only when ``g.user`` is not ``None`` and its
    ``'is_active'`` entry is truthy; otherwise a 403 JSON response is
    returned instead.
    """

    def decorator(func):
        @wraps(func)
        def decorated_view(*args, **kwargs):
            if g.user is not None and g.user['is_active']:
                return func(*args, **kwargs)
            return jsonify({'message': "Forbidden"}), 403

        return decorated_view

    return decorator
def user(id):
    """Return the serialized user looked up by ``id`` as a JSON response.

    Responds with an empty JSON object and status 404 when ``User.find``
    yields a falsy result; otherwise with the serialized user and status 200.
    """
    found = User.find(id)
    if found:
        return jsonify(found.serialize()), 200
    return jsonify({}), 404
def rooms():
    """Return the rooms the current user (``g.user``) is a member of as JSON.

    Each room row is extended with sub-selected columns: the date of its
    latest message, the other member's name, avatar and id for non-group
    rooms, and the count of messages newer than the member's last read
    message.  Rows are ordered by the latest message date, descending.
    Non-group rooms additionally get an ``'online'`` flag that is true when
    the friend is found in ``connected_users``.
    """
    user_id = g.user['id']

    # NOTE(review): user_id is interpolated directly into the raw SQL text
    # below rather than bound as a query parameter - confirm it can never
    # carry attacker-controlled content.
    extra_columns = [
        '(select m.created_at from messages as m where rooms.id = m.room_id order by m.created_at desc limit 1) as last_message_date',
        'IF(rooms.is_group,false,(select CONCAT(u.first_name," ",u.last_name) from room_members as mem join users as u on u.id = mem.user_id where mem.room_id = rooms.id and u.id != %s limit 1)) as username' % user_id,
        'IF(rooms.is_group,false,(select u.avatar from room_members as mem join users as u on u.id = mem.user_id where mem.room_id = rooms.id and u.id != %s limit 1)) as avatar' % user_id,
        'IF(rooms.is_group,false,(select u.id from room_members as mem join users as u on u.id = mem.user_id where mem.room_id = rooms.id and u.id != %s limit 1)) as friend_id' % user_id,
        '(select count(m.id) from room_members as mem join messages as m on m.room_id = mem.room_id where mem.user_id = %s and mem.room_id = rooms.id and m.id > IF(mem.last_read_message,mem.last_read_message,0)) as unread_messages' % user_id,
    ]

    query = Room.select('rooms.*')
    for column_sql in extra_columns:
        query = query.add_select(db.raw(column_sql))

    query = query.join('room_members as rm', 'rm.room_id', '=', 'rooms.id')
    query = query.where('rm.user_id', user_id)
    query = query.group_by('rooms.id')
    query = query.order_by('last_message_date', 'desc')
    room_list = query.get().serialize()

    for room in room_list:
        if room['is_group']:
            continue
        # Only one-to-one rooms carry an online flag for the other member.
        friend = _.findWhere(connected_users, {'id': room['friend_id']})
        room['online'] = bool(friend)

    return jsonify(room_list), 200
def user_leave_room(room_id):
    """Remove the current user (``g.user``) from a room.

    Responds with 400 and ``"Unknown Room"`` when the user has no membership
    row for ``room_id``.  When the leaving member is the room's owner the
    room itself is deleted and closed; otherwise only the membership row is
    deleted, the member's connected clients leave the socket room and the
    remaining members are notified.

    Args:
        room_id: Identifier of the room to leave.
    """
    membership = (
        RoomMember.select('room_members.id', 'room_members.user_id', 'r.user_id as owner_id')
        .join('rooms as r', 'r.id', '=', 'room_members.room_id')
        .where('room_members.room_id', room_id)
        .where('room_members.user_id', g.user['id'])
        .first()
    )
    if not membership:
        return jsonify({'message': "Unknown Room"}), 400

    channel = 'room-%s' % room_id
    if membership.user_id != membership.owner_id:
        membership.delete()
        sessions = _.where(connected_users, {'id': membership.user_id})
        if sessions and _.isList(sessions):
            for session in sessions:
                leave_room(channel, sid=session['sid'], namespace='/')
        # NOTE(review): assumed to run for every non-owner leave, not only
        # when connected clients were found - confirm the intended nesting.
        socketio.emit('update_members', {'room_id': room_id, 'detach': []}, room=channel)
    else:
        # The owner leaving tears the whole room down.
        Room.where('id', room_id).delete()
        socketio.emit('close_room', {'room_id': room_id}, room=channel)
        close_room(room=channel, namespace='/')

    return jsonify({'message': 'Success'}), 200
def update_counter(room_id):
    """Record the last message the current user (``g.user``) read in a room.

    Reads ``message_id`` from the JSON request body and stores it as
    ``last_read_message`` on the matching ``RoomMember`` rows.  A body
    without ``message_id`` is accepted and changes nothing.

    Args:
        room_id: Identifier of the room whose read marker is updated.

    Returns:
        An empty JSON object with status 200, or a ``'Bad Request'`` message
        with status 400 when the update raises.
    """
    payload = request.get_json()
    if not payload or 'message_id' not in payload:
        return jsonify({}), 200

    try:
        membership = RoomMember.where('user_id', g.user['id']).where('room_id', room_id)
        membership.update(last_read_message=payload['message_id'])
    except Exception:
        return jsonify({'message': 'Bad Request'}), 400
    return jsonify({}), 200
# Socket events
def me():
    """Return ``g.user`` as JSON, or a 401 message when it is ``None``."""
    if g.user is not None:
        return jsonify(g.user), 200
    return jsonify({'message': 'User does not exist'}), 401
def login_required(func):
    """Decorate a view so requests without a user are sent to ``"login"``.

    The wrapped view is called only when ``g.user`` is not ``None``;
    otherwise a redirect to ``url_for("login")`` is returned.
    """

    @wraps(func)
    def wrapped(*args, **kwargs):
        if g.user is not None:
            return func(*args, **kwargs)
        return redirect(url_for("login"))

    return wrapped
def anon_only(func):
    """Decorate a view so it only runs when no user is set on ``g``.

    When ``g.user`` is not ``None`` a redirect to ``url_for("index")`` is
    returned instead of calling the wrapped view.
    """

    @wraps(func)
    def wrapped(*args, **kwargs):
        if g.user is None:
            return func(*args, **kwargs)
        return redirect(url_for("index"))

    return wrapped
def verify_auth_token(token):
    """Look up the user referenced by a signed token.

    Args:
        token: Token to load with a ``Serializer`` keyed by
            ``app.config['SECRET_KEY']``.

    Returns:
        The result of ``User.query.get`` for the ``'id'`` stored in the
        token, or ``None`` when loading raises ``SignatureExpired`` or
        ``BadSignature``.
    """
    serializer = Serializer(app.config['SECRET_KEY'])
    try:
        payload = serializer.loads(token)
    except (SignatureExpired, BadSignature):
        # Expired tokens and invalid tokens are rejected alike.
        return None
    return User.query.get(payload['id'])
def verify_password(username_or_token, password):
    """Authenticate by token first, then by username and password.

    On success the authenticated user is stored on ``g.user``.

    Args:
        username_or_token: Either an auth token or a username.
        password: Password checked only when token authentication fails.

    Returns:
        ``True`` when authentication succeeds, ``False`` otherwise.
    """
    account = User.verify_auth_token(username_or_token)
    if not account:
        # Not a usable token: treat the first argument as a username.
        account = User.query.filter_by(username=username_or_token).first()
        if not (account and account.verify_password(password)):
            return False
    g.user = account
    return True
def new_user():
    """Create a user from the ``secret``, ``username`` and ``password`` request values.

    Aborts with:
        401 when the supplied ``secret`` is missing or does not verify
            against the module-level ``secret``;
        400 when ``username`` or ``password`` is missing;
        409 when a user with that username already exists.

    Returns:
        A ``(body, 201, headers)`` tuple: the new username as JSON and a
        ``Location`` header pointing at the ``get_user`` endpoint.
    """
    supplied_secret = request.values.get('secret')
    # A request without a secret is unauthorized.  NOTE(review): pwd_context
    # looks like a passlib context, whose verify() rejects a None secret with
    # an exception - handling None here yields 401 instead of a server error.
    if supplied_secret is None or not pwd_context.verify(supplied_secret, secret):
        abort(401)  # unauthorized

    username = request.values.get('username')
    password = request.values.get('password')
    if username is None or password is None:
        abort(400)  # missing arguments
    if User.query.filter_by(username=username).first() is not None:
        abort(409)  # existing user

    user = User(username=username)
    user.hash_password(password)
    db.session.add(user)
    db.session.commit()
    return (jsonify({'username': user.username}), 201,
            {'Location': url_for('get_user', id=user.id, _external=True)})
def get_user(id):
    """Return the username of the user with the given ``id`` as JSON.

    Aborts with status 400 when ``User.query.get`` yields a falsy result.
    """
    found = User.query.get(id)
    if found:
        return jsonify({'username': found.username})
    abort(400)
def get_auth_token():
    """Return a new auth token for ``g.user`` together with its duration.

    The token is generated with ``tokenLife`` and ASCII-decoded before it is
    placed in the JSON body.
    """
    raw_token = g.user.generate_auth_token(tokenLife)
    body = {'token': raw_token.decode('ascii'), 'duration': tokenLife}
    return jsonify(body)
def verify_auth_token(token):
    """Resolve a signed token to a user.

    The token is loaded with a ``Serializer`` built from
    ``app.config['SECRET_KEY']``.

    Returns:
        ``User.query.get`` of the token's ``'id'`` entry, or ``None`` when
        the token is expired (``SignatureExpired``) or invalid
        (``BadSignature``).
    """
    signer = Serializer(app.config['SECRET_KEY'])
    try:
        claims = signer.loads(token)
    except SignatureExpired:
        # Valid token, but expired.
        found = None
    except BadSignature:
        # Invalid token.
        found = None
    else:
        found = User.query.get(claims['id'])
    return found
def verify_password(username_or_token, password):
    """Check credentials given as a token or as username plus password.

    Token authentication is attempted first; only when it yields no user is
    ``username_or_token`` treated as a username and ``password`` verified.
    The authenticated user is stored on ``g.user``.

    Returns:
        ``True`` on successful authentication, ``False`` otherwise.
    """
    candidate = User.verify_auth_token(username_or_token)
    if candidate:
        g.user = candidate
        return True

    # Fall back to username/password authentication.
    candidate = User.query.filter_by(username=username_or_token).first()
    if not candidate or not candidate.verify_password(password):
        return False
    g.user = candidate
    return True