python类user()的实例源码

oauth2.py 文件源码 项目:Sanic-OAuth 作者: Sniedes722 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def usergetter(self, f):
        """Install ``f`` as the callback that resolves a user from credentials.

        The hook is only needed for the **password credential** grant. It is
        meant to be used as a decorator::

            @oauth.usergetter
            def get_user(username, password, client, request,
                         *args, **kwargs):
                # client: the client making the current request
                if not client.has_password_credential_permission:
                    return None
                user = User.get_user_by_username(username)
                if not user.validate_password(password):
                    return None
                # `request` is an OAuthlib Request object, available
                # in case the lookup needs it
                return user

        :param f: callable to store on this instance as ``_usergetter``.
        :returns: ``f`` unchanged, so the decorated name stays usable.
        """
        self._usergetter = f
        return f
oauth2.py 文件源码 项目:Sanic-OAuth 作者: Sniedes722 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def tokengetter(self, f):
        """Install ``f`` as the callback that looks up a stored bearer token.

        ``f`` receives either an ``access_token`` or a ``refresh_token``
        keyword and returns the matching token object (or ``None``). The
        token object is expected to expose at least:

            - access_token: the access token string
            - refresh_token: the refresh token string
            - client_id: id of the client the token belongs to
            - scopes: list of scopes
            - expires: a `datetime.datetime` object
            - user: the user object

        Typical use as a decorator::

            @oauth.tokengetter
            def bearer_token(access_token=None, refresh_token=None):
                if access_token:
                    return get_token(access_token=access_token)
                if refresh_token:
                    return get_token(refresh_token=refresh_token)
                return None

        :param f: callable to store on this instance as ``_tokengetter``.
        :returns: ``f`` unchanged.
        """
        self._tokengetter = f
        return f
oauth2.py 文件源码 项目:Sanic-OAuth 作者: Sniedes722 项目源码 文件源码 阅读 48 收藏 0 点赞 0 评论 0
def tokensetter(self, f):
        """Install ``f`` as the callback that persists a bearer token.

        ``f`` must accept at least the token and the request::

            @oauth.tokensetter
            def set_token(token, request, *args, **kwargs):
                save_token(token, request.client, request.user)

        ``token`` is a dict shaped like::

            {
                u'access_token': u'6JwgO77PApxsFCU8Quz0pnL9s23016',
                u'token_type': u'Bearer',
                u'expires_in': 3600,
                u'scope': u'email address'
            }

        ``request`` is an object carrying a user object and a client object.

        :param f: callable to store on this instance as ``_tokensetter``.
        :returns: ``f`` unchanged.
        """
        self._tokensetter = f
        return f
oauth2.py 文件源码 项目:Sanic-OAuth 作者: Sniedes722 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def validate_code(self, client_id, code, client, request, *args, **kwargs):
        """Check that an authorization grant code exists and has not expired.

        When ``client`` is falsy it is resolved through ``_clientgetter``.
        On success the grant's user and scopes, together with the ``state``
        keyword argument (``None`` when absent), are copied onto ``request``.

        :returns: ``True`` for a usable grant, ``False`` when the grant is
            missing or expired.
        """
        if not client:
            client = self._clientgetter(client_id)
        log.debug(
            'Validate code for client %r and code %r', client.client_id, code
        )

        grant = self._grantgetter(client_id=client.client_id, code=code)
        if not grant:
            log.debug('Grant not found.')
            return False

        # A grant without an ``expires`` attribute is never considered expired.
        is_expired = (
            hasattr(grant, 'expires')
            and datetime.datetime.utcnow() > grant.expires
        )
        if is_expired:
            log.debug('Grant is expired.')
            return False

        request.state = kwargs.get('state')
        request.user = grant.user
        request.scopes = grant.scopes
        return True
oauth2.py 文件源码 项目:Sanic-OAuth 作者: Sniedes722 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def validate_refresh_token(self, refresh_token, client, request,
                               *args, **kwargs):
        """Check that a refresh token exists and was issued to ``client``.

        The token is fetched through ``_tokengetter(refresh_token=...)``.
        On success ``request.client_id`` and ``request.user`` are filled in
        from the stored token.

        :returns: ``True`` when the token is found and its ``client_id``
            equals ``client.client_id``; ``False`` otherwise.
        """
        stored = self._tokengetter(refresh_token=refresh_token)
        if not stored or stored.client_id != client.client_id:
            return False

        # Make sure the request object carries the user and client_id.
        request.client_id = stored.client_id
        request.user = stored.user
        return True
oauth2.py 文件源码 项目:Sanic-OAuth 作者: Sniedes722 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def validate_user(self, username, password, client, request,
                      *args, **kwargs):
        """Validate a username/password pair through the registered user getter.

        A truthy result from ``_usergetter`` is stored on ``request.user``
        for later use.

        :returns: ``True`` when the getter returned a user; ``False`` when it
            returned nothing or when no user getter is registered.
        """
        log.debug('Validating username %r and its password', username)
        if self._usergetter is None:
            log.debug('Password credential authorization is disabled.')
            return False

        user = self._usergetter(
            username, password, client, request, *args, **kwargs
        )
        if not user:
            return False
        request.user = user
        return True
decorators.py 文件源码 项目:dark-chess 作者: AHAPX 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def authenticated(f):
    """Decorator that resolves the ``auth`` token to a user before the view.

    The token is looked up in the JSON body, then the request values, then
    the cookies. ``request.user`` and ``request.auth`` are always reset to
    ``None`` first and are only populated when the cached user id maps to an
    existing ``User``. The wrapped view is called in every case.
    """
    @wraps(f)
    def decorator(*args, **kwargs):
        token = (
            (request.json or {}).get('auth')
            or request.values.get('auth')
            or request.cookies.get('auth')
        )
        request.user = request.auth = None

        user_id = get_cache(token) if token is not None else None
        if user_id:
            try:
                user = User.get(pk=user_id)
            except User.DoesNotExist:
                # No such user: the request stays unauthenticated.
                pass
            else:
                request.user = user
                request.auth = token
        return f(*args, **kwargs)
    return decorator
decorators.py 文件源码 项目:dark-chess 作者: AHAPX 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def with_game(f):
    """Decorator that replaces the view's ``token`` argument with a ``Game``.

    If the game has not started, the view is skipped and a description of
    the pending game (``type``, ``limit`` and optionally ``invite``) is sent.
    If the game is not found, or the player slot selected by
    ``game._loaded_by`` is bound to a user other than ``request.user``, an
    error response is sent instead.
    """
    @wraps(f)
    def decorator(token, *args, **kwargs):
        from game import Game

        try:
            game = Game.load_game(token)
        except errors.GameNotStartedError as pending:
            payload = {
                'type': consts.TYPES[pending.type]['name'],
                'limit': pending.limit,
            }
            if pending.token:
                payload['invite'] = pending.token
            return send_data(payload)
        except errors.GameNotFoundError as missing:
            return send_error(missing.message)

        # Pick the user bound to the side this token was loaded for.
        if game._loaded_by == consts.WHITE:
            owner = game.model.player_white
        else:
            owner = game.model.player_black
        if owner is not None and owner != request.user:
            return send_error('wrong user')
        return f(game, *args, **kwargs)
    return decorator
auth.py 文件源码 项目:dark-chess 作者: AHAPX 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def recover(token):
    """Handle a password-recovery link identified by ``token``.

    GET confirms that the token maps to a user; POST sets the new password
    (validated by ``RecoverValidator``), saves the user and removes the token
    from the cache. An unknown token, or any other HTTP method, yields the
    'token not found' error.
    """

    @validated(RecoverValidator)
    def _post(user, data):
        user.set_password(data['password'])
        user.save()
        delete_cache(token)
        return send_message('password changed')

    user = User.get_by_token(token)
    if not user:
        return send_error('token not found')
    if request.method == 'GET':
        return send_success()
    if request.method == 'POST':
        return _post(user)
    return send_error('token not found')
game.py 文件源码 项目:dark-chess 作者: AHAPX 项目源码 文件源码 阅读 50 收藏 0 点赞 0 评论 0
def invited(token):
    """Accept a game invitation and start the game as the black player.

    The invite is read from the cache under ``invite_<token>`` and must
    unpack into ``(enemy_token, game_type, game_limit)``. The inviting user
    is looked up through the ``user_<enemy_token>`` cache entry and becomes
    the white player; ``request.user`` becomes the black player.

    :param token: invitation token used to build the cache key.
    :returns: ``send_error('game not found')`` when the invite cannot be
        read, otherwise ``send_data`` with the new player's game token merged
        with ``game.get_info(consts.BLACK)``.
    """
    try:
        enemy_token, game_type, game_limit = get_cache('invite_{}'.format(token))
    except Exception:
        # Deliberately broad so that any failure to fetch or unpack the
        # invite is reported as "not found", while SystemExit and
        # KeyboardInterrupt still propagate (a bare ``except:`` would
        # swallow them).
        return send_error('game not found')
    enemy_user = None
    user_id = get_cache('user_{}'.format(enemy_token))
    if user_id:
        try:
            enemy_user = User.get(pk=user_id)
        except User.DoesNotExist:
            # TODO: if user not found game will be created with None as
            # white player
            pass
    user_token = generate_token(True)
    game = Game.new_game(
        enemy_token, user_token, game_type, game_limit,
        white_user=enemy_user, black_user=request.user
    )
    # The inviter is no longer waiting for an opponent.
    delete_cache('wait_{}'.format(enemy_token))
    result = {'game': user_token}
    result.update(game.get_info(consts.BLACK))
    return send_data(result)
game.py 文件源码 项目:dark-chess 作者: AHAPX 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def load_game(self, token):
        """Load the game for ``token`` into ``self.game`` after an owner check.

        :returns: a dict describing the pending game (``type``, ``limit``
            and, when present, ``invite``) if the game has not started yet;
            otherwise ``None``, with ``self.game`` set.
        :raises errors.APIException: when the game is not found, or when the
            player slot selected by ``game._loaded_by`` is bound to a user
            other than ``request.user``.
        """
        try:
            game = Game.load_game(token)
        except errors.GameNotStartedError as pending:
            info = {
                'type': consts.TYPES[pending.type]['name'],
                'limit': pending.limit,
            }
            if pending.token:
                info['invite'] = pending.token
            return info
        except errors.GameNotFoundError as missing:
            raise errors.APIException(missing.message)

        # Pick the user bound to the side this token was loaded for.
        if game._loaded_by == consts.WHITE:
            owner = game.model.player_white
        else:
            owner = game.model.player_black
        if owner is not None and owner != request.user:
            raise errors.APIException('wrong user')
        self.game = game
game.py 文件源码 项目:dark-chess 作者: AHAPX 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def get(self):
        """List up to ten open game pools, newest first.

        Pools that are started, lost, or have no ``player1`` are excluded by
        the query; pools created by ``request.user`` are skipped while
        iterating.

        NOTE(review): assumes ``GamePool`` is a peewee model, as the
        ``select().where(...)`` usage suggests -- confirm.

        :returns: ``{'games': [...]}`` where each entry has ``id``,
            ``date_created`` (ISO 8601), ``user`` (username or ``None``),
            ``type`` and ``limit``.
        """
        open_pools = GamePool.select().where(
            # ``== False`` is required here: it builds an SQL expression.
            GamePool.is_started == False,  # noqa: E712
            GamePool.is_lost == False,  # noqa: E712
            # ``GamePool.player1 is not None`` is evaluated by Python to a
            # plain ``True`` and never reaches SQL; ``is_null(False)`` emits
            # the intended ``IS NOT NULL`` test.
            GamePool.player1.is_null(False),
        ).order_by(GamePool.date_created.desc())

        result = []
        for pool in open_pools:
            if pool.user1 and pool.user1 == request.user:
                # Do not offer users their own pools.
                continue
            result.append({
                'id': pool.pk,
                'date_created': pool.date_created.isoformat(),
                'user': pool.user1.username if pool.user1 else None,
                'type': consts.TYPES[pool.type_game]['name'],
                'limit': pool.time_limit,
            })
            if len(result) > 9:
                break
        return {'games': result}
game.py 文件源码 项目:dark-chess 作者: AHAPX 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def post(self, game_id):
        """Join the pool ``game_id`` as the second player and start the game.

        The current user receives a freshly generated token and plays black;
        the pool creator plays white.

        :returns: dict with the joiner's token under ``game`` merged with
            ``game.get_info(consts.BLACK)``.
        :raises errors.APINotFound: when no pool has this id.
        :raises errors.APIException: when the lookup fails for any other
            reason ('wrong format'), or when the user tries to join their
            own pool.
        """
        try:
            pool = GamePool.get(GamePool.pk == game_id)
        except GamePool.DoesNotExist:
            raise errors.APINotFound('game')
        except Exception:
            raise errors.APIException('wrong format')

        if pool.user1 and pool.user1 == request.user:
            raise errors.APIException('you cannot start game with yourself')

        pool.player2 = generate_token(True)
        pool.user2 = request.user
        pool.is_started = True
        pool.save()

        game = Game.new_game(
            pool.player1, pool.player2, pool.type_game, pool.time_limit,
            white_user=pool.user1, black_user=pool.user2
        )
        # The pool creator is no longer waiting for an opponent.
        delete_cache('wait_{}'.format(pool.player1))

        response = {'game': pool.player2}
        response.update(game.get_info(consts.BLACK))
        return response
game.py 文件源码 项目:dark-chess 作者: AHAPX 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get(self, token):
        """Accept a game invitation and start the game as the black player.

        The invite is read from the cache under ``invite_<token>`` and must
        unpack into ``(enemy_token, game_type, game_limit)``. The inviting
        user is looked up through the ``user_<enemy_token>`` cache entry and
        becomes the white player; ``request.user`` becomes the black player.

        :param token: invitation token used to build the cache key.
        :returns: dict with the new player's token under ``game`` merged
            with ``game.get_info(consts.BLACK)``.
        :raises errors.APINotFound: when the invite cannot be read.
        """
        try:
            enemy_token, game_type, game_limit = get_cache('invite_{}'.format(token))
        except Exception:
            # Deliberately broad so that any failure to fetch or unpack the
            # invite is reported as "not found", while SystemExit and
            # KeyboardInterrupt still propagate (a bare ``except:`` would
            # swallow them).
            raise errors.APINotFound('game')
        enemy_user = None
        user_id = get_cache('user_{}'.format(enemy_token))
        if user_id:
            try:
                enemy_user = User.get(pk=user_id)
            except User.DoesNotExist:
                # TODO: if user not found game will be created with None as
                # white player
                pass
        user_token = generate_token(True)
        game = Game.new_game(
            enemy_token, user_token, game_type, game_limit,
            white_user=enemy_user, black_user=request.user
        )
        # The inviter is no longer waiting for an opponent.
        delete_cache('wait_{}'.format(enemy_token))
        result = {'game': user_token}
        result.update(game.get_info(consts.BLACK))
        return result
views.py 文件源码 项目:maple-file 作者: honmaple 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def get(self):
        '''
        Return one page of serialized albums together with paging info.

        Ordering and filtering are derived from ``request.data`` for the
        ``name`` and ``description`` keys via ``gen_order_by`` and
        ``gen_filter_dict``; the current user is passed to the latter
        (presumably to restrict results to the caller's albums -- verify
        against ``gen_filter_dict``).
        '''
        params = request.data
        owner = request.user
        page, number = self.page_info
        fields = ['name', 'description']
        ordering = gen_order_by(params, fields)
        filters = gen_filter_dict(params, fields, user=owner)
        query = Album.query.filter_by(**filters).order_by(*ordering)
        albums = query.paginate(page, number)
        serializer = AlbumSerializer(albums.items, True)
        pageinfo = PageInfo(albums)
        response = HTTPResponse(
            HTTPResponse.NORMAL_STATUS,
            data=serializer.data,
            pageinfo=pageinfo)
        return response.to_response()
views.py 文件源码 项目:maple-file 作者: honmaple 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def post(self):
        '''
        Create an album owned by the current user.

        ``name`` is required and ``description`` is optional; both are popped
        from ``request.data``. A missing name yields a parameter-error
        response, otherwise the saved album is returned serialized.
        '''
        payload = request.data
        owner = request.user
        name = payload.pop('name', None)
        description = payload.pop('description', None)
        if name is None:
            return HTTPResponse(
                HTTPResponse.HTTP_CODE_PARA_ERROR,
                message='????????').to_response()

        album = Album(name=name, user=owner)
        if description is not None:
            album.description = description
        album.save()
        return HTTPResponse(
            HTTPResponse.NORMAL_STATUS,
            data=AlbumSerializer(album).data).to_response()
views.py 文件源码 项目:maple-file 作者: honmaple 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def put(self, pk):
        '''
        Update the name and/or description of one of the current user's
        albums.

        ``name`` and ``description`` are popped from ``request.data``; a
        field that is absent (``None``) is left unchanged.

        :param pk: id of the album to update.
        :returns: a not-exist response when the user has no album with this
            id, otherwise the updated album serialized.
        '''
        post_data = request.data
        user = request.user
        name = post_data.pop('name', None)
        description = post_data.pop('description', None)
        album = Album.query.filter_by(id=pk, user=user).first()
        if not album:
            msg = '?????'
            return HTTPResponse(
                HTTPResponse.HTTP_CODE_NOT_EXIST, message=msg).to_response()
        if name is not None:
            album.name = name
        if description is not None:
            album.description = description
        album.save()
        # The album must survive its own update: no ``album.delete()`` here,
        # otherwise every PUT would destroy the record it just saved.
        serializer = AlbumSerializer(album)
        return HTTPResponse(
            HTTPResponse.NORMAL_STATUS, data=serializer.data).to_response()
views.py 文件源码 项目:maple-file 作者: honmaple 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def put(self, pk):
        '''
        Update the name and/or description of one of the current user's
        images.

        Renaming also rebuilds ``image.url`` from ``image.path`` and the new
        name. Fields absent from ``request.data`` are left unchanged.
        '''
        payload = request.data
        owner = request.user
        new_name = payload.pop('name', None)
        new_description = payload.pop('description', None)

        image = Image.query.filter_by(id=pk, user=owner).first()
        if not image:
            return HTTPResponse(
                HTTPResponse.HTTP_CODE_NOT_EXIST,
                message='?????').to_response()

        if new_name is not None:
            image.name = new_name
            image.url = os.path.join(image.path, new_name)
        if new_description is not None:
            image.description = new_description
        image.save()
        return HTTPResponse(
            HTTPResponse.NORMAL_STATUS,
            data=ImageSerializer(image).data).to_response()
views.py 文件源码 项目:maple-file 作者: honmaple 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def delete(self, pk):
        '''
        Delete one of the current user's images, including its files on disk.

        Both the image file and its thumbnail (the same relative path with
        ``photo`` replaced by ``thumb``) are removed from
        ``UPLOAD_FOLDER_ROOT`` when they exist, then the record is deleted.
        The response carries the image as serialized before deletion.
        '''
        owner = request.user
        image = Image.query.filter_by(id=pk, user=owner).first()
        if not image:
            return HTTPResponse(
                HTTPResponse.HTTP_CODE_NOT_EXIST,
                message='?????').to_response()

        serializer = ImageSerializer(image)
        # Original file first, then its thumbnail.
        for relative in (image.url, image.url.replace('photo', 'thumb')):
            target = os.path.join(
                current_app.config['UPLOAD_FOLDER_ROOT'], relative)
            if os.path.exists(target):
                os.remove(target)
        image.delete()
        return HTTPResponse(
            HTTPResponse.NORMAL_STATUS, data=serializer.data).to_response()
middleware.py 文件源码 项目:service-level-reporting 作者: zalando-zmon 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def process_request():
    """
    Attach per-request attributes to ``request``.

    - ``api_url``: the API prefix joined onto a base URL. The base is the
      scheme and host of the ``referer`` header when present, else
      ``APP_URL`` when set, else ``request.base_url``.
    - ``user``: the ``user`` entry of the Flask session, if any.
    - ``realm``: the ``realm`` entry of the Flask session, defaulting to
      ``'employees'``.

    """
    referrer = request.headers.get('referer')
    if referrer:
        # Keep only scheme and host of the referrer as the base URL.
        origin = urlparse(referrer)
        base_url = urlunparse((origin.scheme, origin.netloc, '', '', '', ''))
    else:
        base_url = APP_URL or request.base_url

    # Used in building full URIs
    request.api_url = urljoin(base_url, API_PREFIX + '/')
    request.user = flask_session.get('user')
    request.realm = flask_session.get('realm', 'employees')
app.py 文件源码 项目:docker-repo-auth-demo 作者: twosigma 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def notifications():
    """Receive registry event notifications and log each event.

    Only the ``NOTIFICATION`` user may call this endpoint; any other caller
    gets an empty 403. The request body is parsed as JSON, and every entry
    of its ``events`` list is logged with its action, repository, digest,
    optional tag, timestamp and actor name.

    :returns: ``('', 403)`` for an unauthorized caller, ``('', 204)``
        otherwise (also when the payload has no ``events`` key).
    """
    if request.user != 'NOTIFICATION':
        return ('', 403)
    data = json.loads(request.data)
    if 'events' not in data:
        return ('', 204)
    for event in data['events']:
        logging.debug(event)
        # At this time the possible values for action seem to be
        # push, pull, delete, and mount:
        # https://github.com/docker/distribution/blob/master/notifications/event.go#L10
        action = event['action']
        repo = event['target']['repository']
        digest = event['target']['digest']
        # tag may not be present
        tag = event['target'].get('tag')
        timestamp = dateutil.parser.parse(event['timestamp'])
        user = event['actor']['name']
        logging.info("action '{}' repo '{}' digest '{}' tag '{}' timestamp '{}' user '{}'".format(action, repo, digest, tag, timestamp, user))
        # Save to database, etc. here
    return ('', 204)
utils.py 文件源码 项目:flux 作者: NiklasRosenstein 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def requires_auth(func):
  ''' Decorator for view functions that require a logged-in user.

  The user is looked up from the ``user_name`` and ``user_passhash`` session
  entries. When no user matches, the request is redirected to the ``login``
  endpoint; otherwise the user is stored on ``request.user`` and the view
  is called. '''

  from .models import Session, User

  @functools.wraps(func)
  def wrapper(*args, **kwargs):
    credentials = (session.get('user_name'), session.get('user_passhash'))
    with Session() as db_session:
      user = User.get_by(db_session, *credentials)
      if not user:
        return redirect(url_for('login'))

    request.user = user
    return func(*args, **kwargs)

  return wrapper
oauth2.py 文件源码 项目:Sanic-OAuth 作者: Sniedes722 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def after_request(self, f):
        """Add ``f`` to the hooks invoked after the resource is accessed.

        Each hook takes ``valid`` and ``request`` and should return them as
        a tuple, possibly changed::

            @oauth.after_request
            def valid_after_request(valid, oauth):
                if oauth.user in black_list:
                    return False, oauth
                return valid, oauth

        :param f: callable appended to ``_after_request_funcs``.
        :returns: ``f`` unchanged.
        """
        hooks = self._after_request_funcs
        hooks.append(f)
        return f
oauth2.py 文件源码 项目:Sanic-OAuth 作者: Sniedes722 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def grantsetter(self, f):
        """Install ``f`` as the callback that persists a grant code.

        ``f`` receives `client_id`, `code`, `request` and possibly more::

            @oauth.grantsetter
            def set_grant(client_id, code, request, *args, **kwargs):
                save_grant(client_id, code, request.user, request.scopes)

        :param f: callable to store on this instance as ``_grantsetter``.
        :returns: ``f`` unchanged.
        """
        self._grantsetter = f
        return f
oauth2.py 文件源码 项目:Sanic-OAuth 作者: Sniedes722 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def validate_grant_type(self, client_id, grant_type, client, request,
                            *args, **kwargs):
        """Decide whether ``client`` may use the requested grant type.

        The ``password`` grant is refused outright when no user getter is
        registered. Otherwise the grant type must appear in the client's
        ``allowed_grant_types`` when the client defines that attribute, or
        else in the four defaults (`authorization_code`, `password`,
        `client_credentials`, `refresh_token`).

        It is suggested that `allowed_grant_types` contain at least
        `authorization_code` and `refresh_token`.

        For ``client_credentials`` the client must have a ``user``
        attribute, which is copied onto ``request.user``.

        :returns: ``True`` when the grant type is permitted, else ``False``.
        """
        if self._usergetter is None and grant_type == 'password':
            log.debug('Password credential authorization is disabled.')
            return False

        default_grant_types = (
            'authorization_code', 'password',
            'client_credentials', 'refresh_token',
        )
        # The client's own list wins; the defaults apply only when the
        # client does not define ``allowed_grant_types`` at all.
        permitted = getattr(client, 'allowed_grant_types', default_grant_types)
        if grant_type not in permitted:
            return False

        if grant_type == 'client_credentials':
            if not hasattr(client, 'user'):
                log.debug('Client should have a user property')
                return False
            request.user = client.user

        return True
decorators.py 文件源码 项目:dark-chess 作者: AHAPX 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def login_required(f):
    """Decorator that rejects requests without a truthy ``request.user``.

    Such requests get the 'not authorized' error response and the wrapped
    view is not called.
    """
    @wraps(f)
    def decorator(*args, **kwargs):
        if not getattr(request, 'user', None):
            return send_error('not authorized')
        return f(*args, **kwargs)
    return decorator
chat.py 文件源码 项目:dark-chess 作者: AHAPX 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def messages():
    """List or post chat messages that are not attached to a chat.

    GET returns messages whose ``chat`` is null, newest first, honouring the
    ``limit`` and ``offset`` query arguments. A negative or missing limit
    falls back to ``config.DEFAULT_COUNT_MESSAGES``; a negative or missing
    offset falls back to 0. POST creates a message from the validated
    ``text`` for ``request.user``, sends it over the websocket and returns
    it. Other methods fall through and return ``None``.
    """

    @validated(MessageValidator)
    def _post(data):
        created = ChatMessage.create(user=request.user, text=data['text'])
        payload = {'message': MessageSerializer(created).calc()}
        send_ws(payload, consts.WS_CHAT_MESSAGE)
        return send_data(payload)

    if request.method == 'GET':
        try:
            limit = int(request.args.get('limit', -1))
            offset = int(request.args.get('offset', -1))
            limit = config.DEFAULT_COUNT_MESSAGES if limit < 0 else limit
            offset = max(offset, 0)
        except Exception as e:
            log.error(e)
            return send_error('wrong arguments')

        # ``== None`` is intentional: it builds the ORM's null comparison.
        query = ChatMessage.select().where(ChatMessage.chat == None)  # noqa: E711
        query = query.order_by(-ChatMessage.date_created)
        query = query.offset(offset).limit(limit)
        return send_data({
            'messages': [MessageSerializer(row).calc() for row in query],
        })
    elif request.method == 'POST':
        return _post()
auth.py 文件源码 项目:dark-chess 作者: AHAPX 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def register(data):
    """Create a user and, when an email is given, send a verification mail.

    :param data: mapping with ``username``, ``password`` and ``email`` keys.
    :returns: the 'registration successful' message response.
    """
    username, password, email = (
        data['username'], data['password'], data['email']
    )
    user = User.add(username, password, email)
    if email:
        token = user.get_verification()
        mail_context = {
            'username': username,
            'url': urljoin(config.SITE_URL, config.VERIFY_URL),
            'token': token,
        }
        send_mail_template('registration', [email], data=mail_context)
    return send_message('registration successful')
auth.py 文件源码 项目:dark-chess 作者: AHAPX 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def verify(token):
    """Verify the user that ``token`` belongs to and drop the token.

    :returns: a success response, or the 'token not found' error when no
        user matches the token.
    """
    user = User.get_by_token(token)
    if not user:
        return send_error('token not found')
    user.verify()
    delete_cache(token)
    return send_success()
auth.py 文件源码 项目:dark-chess 作者: AHAPX 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def reset(data):
    """Send a password-reset mail to the user registered under an email.

    :param data: mapping with an ``email`` key.
    :returns: a success response, or the 'email not found' error when no
        user has that email.
    """
    try:
        user = User.get(User.email == data['email'])
    except User.DoesNotExist:
        return send_error('email not found')

    token = user.get_reset()
    mail_context = {
        'username': user.username,
        'url': urljoin(config.SITE_URL, config.RECOVER_URL),
        'token': token,
    }
    send_mail_template('reset', [user.email], data=mail_context)
    return send_success()


问题


面经


文章

微信
公众号

扫码关注公众号