python类user()的实例源码

utils.py 文件源码 项目:grest 作者: mostafa 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def authorize(func):
    """
    Decorator that runs the app-level authorization hook before ``func``.

    When ``app`` has no ``authorization_function`` attribute, every request
    is passed straight through to ``func``. Otherwise the hook is called
    with ``global_config.X_AUTH_TOKEN`` and its result decides the outcome:

    * ``False``: a JSON "Access denied!" error is returned with status 401.
    * any other value while ``g.is_authorized is True``: ``func`` is called.
    * any other value otherwise: the hook's result is returned as-is, e.g.
      ``jsonify(error="Access denied!"), 401``.
    """
    @wraps(func)
    def authorize_requests(*args, **kwargs):
        if global_config.DEBUG:
            # Log the endpoint as a lower-cased, slash-separated path.
            endpoint_path = request.endpoint.replace(":", "/").replace(".", "/")
            app.ext_logger.info(endpoint_path.lower())

        # No hook registered: the request is taken as authorized.
        if not hasattr(app, "authorization_function"):
            return func(*args, **kwargs)

        verdict = app.authorization_function(global_config.X_AUTH_TOKEN)

        if verdict is False:
            return jsonify(errors=["Access denied!"]), 401
        if g.is_authorized is True:
            return func(*args, **kwargs)
        # Anything else is handed back to the client unchanged.
        return verdict

    return authorize_requests
views.py 文件源码 项目:Cloudroid 作者: cyberdb 项目源码 文件源码 阅读 14 收藏 0 点赞 0 评论 0
def before_request():
    """Expose ``current_user`` on the ``g`` object as ``g.user``."""
    g.user = current_user
__init__.py 文件源码 项目:Cuneiform 作者: nervouna 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def before_request():
    """Store the result of ``Author.get_current()`` on ``g`` as ``g.user``."""
    g.user = Author.get_current()
app.py 文件源码 项目:Andy-Chat 作者: andriy-sa 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def _before_reques():
    """Store the result of ``get_jwt_user()`` on ``g`` as ``g.user``.

    NOTE(review): the name looks like a typo for ``_before_request``;
    confirm against wherever this hook is registered before renaming.
    """
    g.user = get_jwt_user()
authentification.py 文件源码 项目:Andy-Chat 作者: andriy-sa 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def get_jwt_user():
    """Resolve the user for the current request through the ``jwt`` callbacks.

    Returns:
        Whatever ``jwt.identity_callback`` returns for the decoded token, or
        ``None`` when ``InvalidTokenError`` or ``JWTError`` is raised while
        fetching, decoding or resolving the token.
    """
    try:
        raw_token = jwt.request_callback()
        claims = jwt.jwt_decode_callback(raw_token)
        return jwt.identity_callback(claims)
    except (InvalidTokenError, JWTError):
        # Token problems mean "no user" rather than an error.
        return None
authentification.py 文件源码 项目:Andy-Chat 作者: andriy-sa 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def login_required():
    """Build a decorator that only lets active, known users reach a view.

    The wrapped view runs when ``g.user`` is set and its ``'is_active'``
    entry is truthy; otherwise a JSON "Forbidden" message is returned with
    status 403.
    """
    def decorator(func):
        @wraps(func)
        def decorated_view(*args, **kwargs):
            current = g.user
            if current is not None and current['is_active']:
                return func(*args, **kwargs)

            return jsonify({'message': "Forbidden"}), 403

        return decorated_view

    return decorator
users.py 文件源码 项目:Andy-Chat 作者: andriy-sa 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def user(id):
    """Return the serialized user found by ``User.find(id)``.

    Responds with the serialized record and status 200, or an empty JSON
    object and status 404 when the lookup result is falsy.
    """
    record = User.find(id)
    if record:
        return jsonify(record.serialize()), 200

    return jsonify({}), 404
chat.py 文件源码 项目:Andy-Chat 作者: andriy-sa 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def rooms():
    """List the rooms the current user (``g.user``) is a member of.

    Each room row is extended with computed columns: ``last_message_date``,
    ``username`` / ``avatar`` / ``friend_id`` of another member (``false``
    for group rooms) and ``unread_messages``. Rows are ordered by
    ``last_message_date`` descending. Non-group rooms additionally get an
    ``online`` flag that is true when ``friend_id`` matches an entry in
    ``connected_users``.

    Returns:
        The serialized rooms as JSON with status 200.
    """
    me = g.user['id']

    # NOTE(review): the user id is interpolated into raw SQL with ``%``;
    # confirm it can only ever be a trusted integer, or switch to bindings.
    last_message_sql = '(select m.created_at from messages as m where rooms.id = m.room_id order by m.created_at desc limit 1) as last_message_date'
    username_sql = 'IF(rooms.is_group,false,(select CONCAT(u.first_name," ",u.last_name) from room_members as mem join users as u on u.id = mem.user_id where mem.room_id = rooms.id and u.id != %s limit 1)) as username' % me
    avatar_sql = 'IF(rooms.is_group,false,(select u.avatar from room_members as mem join users as u on u.id = mem.user_id where mem.room_id = rooms.id and u.id != %s limit 1)) as avatar' % me
    friend_id_sql = 'IF(rooms.is_group,false,(select u.id from room_members as mem join users as u on u.id = mem.user_id where mem.room_id = rooms.id and u.id != %s limit 1)) as friend_id' % me
    unread_sql = '(select count(m.id) from room_members as mem join messages as m on m.room_id = mem.room_id where mem.user_id = %s and mem.room_id = rooms.id and m.id > IF(mem.last_read_message,mem.last_read_message,0)) as unread_messages' % me

    query = Room.select('rooms.*')
    # Computed columns are added in the same order as the select list above.
    for fragment in (last_message_sql, username_sql, avatar_sql,
                     friend_id_sql, unread_sql):
        query = query.add_select(db.raw(fragment))

    query = query.join('room_members as rm', 'rm.room_id', '=', 'rooms.id')
    query = query.where('rm.user_id', me)
    query = query.group_by('rooms.id')
    query = query.order_by('last_message_date', 'desc')
    room_list = query.get().serialize()

    for entry in room_list:
        if entry['is_group']:
            continue
        peer = _.findWhere(connected_users, {'id': entry['friend_id']})
        entry['online'] = bool(peer)

    return jsonify(room_list), 200
chat.py 文件源码 项目:Andy-Chat 作者: andriy-sa 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def user_leave_room(room_id):
    """Remove the current user (``g.user``) from the room ``room_id``.

    * No membership row found: JSON "Unknown Room" with status 400.
    * The member is the room owner: the room is deleted, ``close_room`` is
      emitted to the room's socket channel and the channel is closed.
    * Otherwise: the membership row is deleted, every matching entry in
      ``connected_users`` leaves the socket channel and ``update_members``
      is emitted to the channel.

    Returns:
        JSON "Success" with status 200 on either successful path.
    """
    membership_query = RoomMember.select(
        'room_members.id', 'room_members.user_id', 'r.user_id as owner_id')
    membership_query = membership_query.join(
        'rooms as r', 'r.id', '=', 'room_members.room_id')
    membership_query = membership_query.where('room_members.room_id', room_id)
    membership_query = membership_query.where(
        'room_members.user_id', g.user['id'])
    member = membership_query.first()

    if not member:
        return jsonify({'message': "Unknown Room"}), 400

    channel = 'room-%s' % room_id
    is_owner = member.user_id == member.owner_id

    if is_owner:
        # The owner leaving tears the whole room down.
        Room.where('id', room_id).delete()
        socketio.emit('close_room', {'room_id': room_id}, room=channel)
        close_room(room=channel, namespace='/')
    else:
        member.delete()

        sessions = _.where(connected_users, {'id': member.user_id})
        if sessions and _.isList(sessions):
            for session in sessions:
                leave_room(channel, sid=session['sid'], namespace='/')

        socketio.emit(
            'update_members', {'room_id': room_id, 'detach': []}, room=channel)

    return jsonify({'message': 'Success'}), 200
chat.py 文件源码 项目:Andy-Chat 作者: andriy-sa 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def update_counter(room_id):
    """Record the last message the current user has read in ``room_id``.

    Reads ``message_id`` from the JSON request body and stores it as
    ``last_read_message`` on the current user's membership of the room.
    A body without ``message_id`` is a no-op.

    Returns:
        An empty JSON object with status 200, or JSON "Bad Request" with
        status 400 when the update raises.
    """
    payload = request.get_json()
    if not payload or 'message_id' not in payload:
        return jsonify({}), 200

    try:
        membership = RoomMember.where('user_id', g.user['id'])
        membership = membership.where('room_id', room_id)
        membership.update(last_read_message=payload['message_id'])
    except Exception:
        return jsonify({'message':'Bad Request'}), 400

    return jsonify({}), 200




# Socket events
auth.py 文件源码 项目:Andy-Chat 作者: andriy-sa 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def me():
    """Return the current user (``g.user``) as JSON.

    Responds with status 200 when a user is set, otherwise with a
    "User does not exist" message and status 401.
    """
    current = g.user
    if current is not None:
        return jsonify(current), 200

    return jsonify({'message': 'User does not exist'}), 401
decorators.py 文件源码 项目:Ranobe-Honyaku 作者: WeebWare 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def login_required(func):
    """Decorator: redirect to the ``login`` endpoint when ``g.user`` is None.

    With a user set, the wrapped view is called unchanged.
    """
    @wraps(func)
    def guarded(*args, **kwargs):
        if g.user is not None:
            return func(*args, **kwargs)
        return redirect(url_for("login"))

    return guarded
decorators.py 文件源码 项目:Ranobe-Honyaku 作者: WeebWare 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def anon_only(func):
    """Decorator: redirect to the ``index`` endpoint when ``g.user`` is set.

    With no user (``g.user is None``), the wrapped view is called unchanged.
    """
    @wraps(func)
    def guarded(*args, **kwargs):
        if g.user is None:
            return func(*args, **kwargs)
        return redirect(url_for("index"))

    return guarded
auth.py 文件源码 项目:linkero 作者: ingran 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def verify_auth_token(token):
    """Return the user identified by a signed ``token``, or ``None``.

    The token is loaded with a ``Serializer`` keyed by the app's
    ``SECRET_KEY``; the user is then fetched by the ``'id'`` entry of the
    decoded payload. ``None`` is returned when loading raises
    ``SignatureExpired`` (valid token, but expired) or ``BadSignature``
    (invalid token).
    """
    serializer = Serializer(app.config['SECRET_KEY'])
    try:
        payload = serializer.loads(token)
    except (SignatureExpired, BadSignature):
        return None
    return User.query.get(payload['id'])
auth.py 文件源码 项目:linkero 作者: ingran 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def verify_password(username_or_token, password):
    """Authenticate by token first, then by username and password.

    On success the authenticated user is stored on ``g.user``.

    Returns:
        ``True`` when either method succeeds, ``False`` otherwise.
    """
    # A valid token authenticates on its own; the password is not checked.
    token_user = User.verify_auth_token(username_or_token)
    if token_user:
        g.user = token_user
        return True

    # Fall back to treating the first argument as a username.
    candidate = User.query.filter_by(username=username_or_token).first()
    if candidate and candidate.verify_password(password):
        g.user = candidate
        return True

    return False
auth.py 文件源码 项目:linkero 作者: ingran 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def new_user():
    """Create a user from the ``username`` / ``password`` request values.

    The request must also carry a ``secret`` value that verifies against
    the configured ``secret`` hash.

    Aborts with:
        401: the ``secret`` value is missing or does not verify.
        400: ``username`` or ``password`` is missing.
        409: a user with that username already exists.

    Returns:
        A ``(body, 201, headers)`` tuple: the new username as JSON and a
        ``Location`` header with the external URL of ``get_user``.
    """
    supplied_secret = request.values.get('secret')
    # Reject a missing secret here instead of passing None to the verifier
    # (presumably it does not accept None; confirm against passlib), and use
    # `not` so any falsy verification result fails closed. The previous
    # `== False` comparison let a non-bool falsy result through.
    if supplied_secret is None or not pwd_context.verify(supplied_secret, secret):
        abort(401)    # unauthorized
    username = request.values.get('username')
    password = request.values.get('password')
    if username is None or password is None:
        abort(400)    # missing arguments
    if User.query.filter_by(username=username).first() is not None:
        abort(409)    # existing user
    user = User(username=username)
    user.hash_password(password)
    db.session.add(user)
    db.session.commit()
    return (jsonify({'username': user.username}), 201,
            {'Location': url_for('get_user', id=user.id, _external=True)})
auth.py 文件源码 项目:linkero 作者: ingran 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def get_user(id):
    """Return the username of the user with primary key ``id`` as JSON.

    Aborts with status 400 when no such user is found.
    """
    record = User.query.get(id)
    if not record:
        abort(400)
    payload = {'username': record.username}
    return jsonify(payload)
auth.py 文件源码 项目:linkero 作者: ingran 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def get_auth_token():
    """Issue an auth token for ``g.user`` and return it as JSON.

    The response carries the ASCII-decoded token and its lifetime
    (``tokenLife``) under ``'duration'``.
    """
    raw_token = g.user.generate_auth_token(tokenLife)
    payload = {'token': raw_token.decode('ascii'), 'duration': tokenLife}
    return jsonify(payload)
linkero.py 文件源码 项目:linkero 作者: ingran 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def verify_auth_token(token):
    """Return the user identified by a signed ``token``, or ``None``.

    The token is loaded with a ``Serializer`` keyed by the app's
    ``SECRET_KEY``; the user is then fetched by the ``'id'`` entry of the
    decoded payload. ``None`` is returned when loading raises
    ``SignatureExpired`` (valid token, but expired) or ``BadSignature``
    (invalid token).
    """
    serializer = Serializer(app.config['SECRET_KEY'])
    try:
        payload = serializer.loads(token)
    except (SignatureExpired, BadSignature):
        return None
    return User.query.get(payload['id'])
linkero.py 文件源码 项目:linkero 作者: ingran 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def verify_password(username_or_token, password):
    """Authenticate by token first, then by username and password.

    On success the authenticated user is stored on ``g.user``.

    Returns:
        ``True`` when either method succeeds, ``False`` otherwise.
    """
    # A valid token authenticates on its own; the password is not checked.
    token_user = User.verify_auth_token(username_or_token)
    if token_user:
        g.user = token_user
        return True

    # Fall back to treating the first argument as a username.
    candidate = User.query.filter_by(username=username_or_token).first()
    if candidate and candidate.verify_password(password):
        g.user = candidate
        return True

    return False


问题


面经


文章

微信
公众号

扫码关注公众号