python类HTTPForbidden()的实例源码

middleware.py 文件源码 项目:aioviber 作者: nonamenix 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def signature_middleware(app, handler):
    """Check request signature"""

    async def middleware_handler(request: Request):
        if request.method == 'POST' and app.bot.check_signature:
            sig = request.query.get('sig')
            body = await request.read()
            if verify_signature(body, sig, app.bot.auth_token):
                response = await handler(request)
            else:
                logger.warning('Post requests with bad signature {sig} {body}'.format(
                    sig=sig, body=body
                ))
                response = web.HTTPForbidden()
        else:
            response = await handler(request)

        return response

    return middleware_handler
api.py 文件源码 项目:hangoutsbot 作者: das7pad 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def adapter_do_OPTIONS(self, request):
        origin = request.headers["Origin"]

        allowed_origins = self._bot.config.get_option("api_origins")
        if allowed_origins is None:
            raise web.HTTPForbidden()

        if "*" == allowed_origins or "*" in allowed_origins:
            return web.Response(headers={
                "Access-Control-Allow-Origin": origin,
                "Access-Control-Allow-Headers": "content-type",
            })

        if not origin in allowed_origins:
            raise web.HTTPForbidden()

        return web.Response(headers={
            "Access-Control-Allow-Origin": origin,
            "Access-Control-Allow-Headers": "content-type",
            "Vary": "Origin",
        })
areas.py 文件源码 项目:aiohttp-three-template 作者: RobertoPrevato 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __call__(self, f):
        """
        Applies area initialization logic to a request handling function, for example by loading user session.

        :param f: the request handler to be decorated.
        :return: a wrapped request handler that loads user information.
        """
        @wraps(f)
        async def wrapped(request):
            # set the area property inside the request object
            request.area = self.name
            try:
                await self.before_request(request)
            except InvalidCultureException:
                # redirect to a proper url
                return HTTPFound(self.get_fallback_url(request))
            except InvalidAntiforgeryTokenException:
                raise HTTPForbidden()

            return await f(request)
        return wrapped
__init__.py 文件源码 项目:aioweb 作者: kreopt 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def pre_dispatch(request, controller, actionName):
    reason = None

    check_ok = True
    if request.method not in ('GET', 'HEAD', 'OPTIONS', 'TRACE'):

        action = getattr(controller, actionName)

        if not getattr(action, 'csrf_disabled', False):
            check_ok = False
            token = request.headers.get(CSRF_HEADER_NAME)
            if not token:
                data = await request.post()
                token = data.get(CSRF_FIELD_NAME)

            if token:
                if validate_token(token, await get_secret(request)):
                    check_ok = True
                else:
                    reason = REASON_BAD_TOKEN
            else:
                reason = REASON_NO_CSRF_COOKIE

    if not check_ok:
        raise web.HTTPForbidden(reason=reason)
views.py 文件源码 项目:morpheus 作者: tutorcruncher 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def call(self, request):
        try:
            event_data = (await request.post())['mandrill_events']
        except KeyError:
            raise HTTPBadRequest(text='"mandrill_events" not found in post data')

        sig_generated = base64.b64encode(
            hmac.new(
                self.app['webhook_auth_key'],
                msg=(self.app['mandrill_webhook_url'] + 'mandrill_events' + event_data).encode(),
                digestmod=hashlib.sha1
            ).digest()
        )
        sig_given = request.headers.get('X-Mandrill-Signature', '<missing>').encode()
        if not hmac.compare_digest(sig_generated, sig_given):
            raise HTTPForbidden(text='invalid signature')
        try:
            events = ujson.loads(event_data)
        except ValueError as e:
            raise HTTPBadRequest(text=f'invalid json data: {e}')

        await self.sender.update_mandrill_webhooks(events)
        return Response(text='message status updated\n')
conftest.py 文件源码 项目:morpheus 作者: tutorcruncher 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def messagebird_pricing(request):
    if not request.query.get('username') == 'mb-username':
        raise HTTPForbidden(text='bad username')
    if not request.query.get('password') == 'mb-password':
        raise HTTPForbidden(text='bad password')
    return json_response([
        {
            'mcc': '0',
            'country_name': 'Default rate',
            'rate': '0.0400',
        },
        {
            'mcc': '234',
            'country_name': 'United Kingdom',
            'rate': '0.0200',
        },
    ])
example.py 文件源码 项目:aiohttp-transmute 作者: toumorokoshi 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def error(request):
    raise HTTPForbidden(reason="unauthorized")
util.py 文件源码 项目:django-redis-pubsub 作者: andrewyoung1991 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def handle_auth(token):
    """ handles retrieving a user from a token
    """
    if token is None:
        raise HTTPForbidden(body=b"no token in request")

    user = authentication_method(token)
    if user is None:
        raise HTTPForbidden(body=b"invalid token")
    return user
test_views.py 文件源码 项目:PollBot 作者: mozilla 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def cli(loop, test_client):
    async def error403(request):
        raise web.HTTPForbidden()

    async def error404(request):
        return web.HTTPNotFound()

    async def error(request):
        raise ValueError()

    app = get_app(loop=loop)
    app.router.add_get('/error', error)
    app.router.add_get('/error-403', error403)
    app.router.add_get('/error-404', error404)
    return loop.run_until_complete(test_client(app))
decorators.py 文件源码 项目:aiohttp_auth 作者: gnarlychicken 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def auth_required(func):
    """Utility decorator that checks if a user has been authenticated for this
    request.

    Allows views to be decorated like:

        @auth_required
        def view_func(request):
            pass

    providing a simple means to ensure that whoever is calling the function has
    the correct authentication details.

    Args:
        func: Function object being decorated and raises HTTPForbidden if not

    Returns:
        A function object that will raise web.HTTPForbidden() if the passed
        request does not have the correct permissions to access the view.
    """
    @wraps(func)
    async def wrapper(*args):
        if (await get_auth(args[-1])) is None:
            raise web.HTTPForbidden()

        return await func(*args)

    return wrapper
decorators.py 文件源码 项目:aiohttp_auth 作者: gnarlychicken 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def acl_required(permission, context):
    """Returns a decorator that checks if a user has the requested permission
    from the passed acl context.

    This function constructs a decorator that can be used to check a aiohttp's
    view for authorization before calling it. It uses the get_permission()
    function to check the request against the passed permission and context. If
    the user does not have the correct permission to run this function, it
    raises HTTPForbidden.

    Args:
        permission: The specific permission requested.
        context: Either a sequence of ACL tuples, or a callable that returns a
            sequence of ACL tuples. For more information on ACL tuples, see
            get_permission()

    Returns:
        A decorator which will check the request passed has the permission for
        the given context. The decorator will raise HTTPForbidden if the user
        does not have the correct permissions to access the view.
    """

    def decorator(func):

        @wraps(func)
        async def wrapper(*args):
            request = args[-1]

            if callable(context):
                context = context()

            if await get_permitted(request, permission, context):
                return await func(*args)

            raise web.HTTPForbidden()

        return wrapper

    return decorator
decorators.py 文件源码 项目:aiohttp-login 作者: imbolc 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def admin_required(handler):
    @wraps(handler)
    async def decorator(*args):
        request = _get_request(args)
        response = await login_required(handler)(request)
        if request['user']['email'] not in cfg.ADMIN_EMAILS:
            raise HTTPForbidden(reason='You are not admin')
        return response
    return decorator
server_handler.py 文件源码 项目:PyGNS3 作者: mvdwoord 项目源码 文件源码 阅读 45 收藏 0 点赞 0 评论 0
def shutdown(request, response):

        config = Config.instance()
        if config.get_section_config("Server").getboolean("local", False) is False:
            raise HTTPForbidden(text="You can only stop a local server")

        log.info("Start shutting down the server")

        # close all the projects first
        controller = Controller.instance()
        projects = controller.projects.values()

        tasks = []
        for project in projects:
            tasks.append(asyncio.async(project.close()))

        if tasks:
            done, _ = yield from asyncio.wait(tasks)
            for future in done:
                try:
                    future.result()
                except Exception as e:
                    log.error("Could not close project {}".format(e), exc_info=1)
                    continue

        # then shutdown the server itself
        from gns3server.web.web_server import WebServer
        server = WebServer.instance()
        asyncio.async(server.shutdown_server())
        response.set_status(201)
server_handler.py 文件源码 项目:PyGNS3 作者: mvdwoord 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def debug(request, response):

        config = Config.instance()
        if config.get_section_config("Server").getboolean("local", False) is False:
            raise HTTPForbidden(text="You can only debug a local server")

        debug_dir = os.path.join(config.config_dir, "debug")
        try:
            if os.path.exists(debug_dir):
                shutil.rmtree(debug_dir)
            os.makedirs(debug_dir)
            with open(os.path.join(debug_dir, "controller.txt"), "w+") as f:
                f.write(ServerHandler._getDebugData())
        except Exception as e:
            # If something is wrong we log the info to the log and we hope the log will be include correctly to the debug export
            log.error("Could not export debug informations {}".format(e), exc_info=1)

        try:
            if Controller.instance().gns3vm.engine == "vmware":
                vmx_path = Controller.instance().gns3vm.current_engine().vmx_path
                if vmx_path:
                    shutil.copy(vmx_path, os.path.join(debug_dir, os.path.basename(vmx_path)))
        except OSError as e:
            # If something is wrong we log the info to the log and we hope the log will be include correctly to the debug export
            log.error("Could not copy VMware VMX file {}".format(e), exc_info=1)

        for compute in list(Controller.instance().computes.values()):
            try:
                r = yield from compute.get("/debug", raw=True)
                data = r.body.decode("utf-8")
            except Exception as e:
                data = str(e)
            with open(os.path.join(debug_dir, "compute_{}.txt".format(compute.name)), "w+") as f:
                f.write("Compute ID: {}\n".format(compute.id))
                f.write(data)

        response.set_status(201)
http.py 文件源码 项目:tomodachi 作者: kalaspuff 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def static_request_handler(cls: Any, obj: Any, context: Dict, func: Any, path: str, base_url: str) -> Any:
        if '?P<filename>' not in base_url:
            pattern = r'^{}(?P<filename>.+?)$'.format(re.sub(r'\$$', '', re.sub(r'^\^?(.*)$', r'\1', base_url)))
        else:
            pattern = r'^{}$'.format(re.sub(r'\$$', '', re.sub(r'^\^?(.*)$', r'\1', base_url)))
        compiled_pattern = re.compile(pattern)

        if path.startswith('/'):
            path = os.path.dirname(path)
        else:
            path = '{}/{}'.format(os.path.dirname(context.get('context', {}).get('_service_file_path')), path)

        if not path.endswith('/'):
            path = '{}/'.format(path)

        async def handler(request: web.Request) -> web.Response:
            result = compiled_pattern.match(request.path)
            filename = result.groupdict()['filename']
            filepath = '{}{}'.format(path, filename)

            try:
                if os.path.isdir(filepath) or not os.path.exists(filepath):
                    raise web.HTTPNotFound()

                pathlib.Path(filepath).open('r')
                return FileResponse(filepath)
            except PermissionError as e:
                raise web.HTTPForbidden()

        context['_http_routes'] = context.get('_http_routes', [])
        context['_http_routes'].append(('GET', pattern, handler))

        start_func = cls.start_server(obj, context)
        return (await start_func) if start_func else None
views.py 文件源码 项目:aiohttp_test_chat 作者: steelkiwi 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def get(self, **kw):
        session = await get_session(self.request)
        if session.get('user'):
            del session['user']
            redirect(self.request, 'login')
        else:
            raise web.HTTPForbidden(body=b'Forbidden')
gallery.py 文件源码 项目:gallery-cms 作者: crccheck 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def login(request):
    session = await get_session(request)

    client = GoogleClient(
        client_id=os.getenv('OAUTH_CLIENT_ID'),
        client_secret=os.getenv('OAUTH_CLIENT_SECRET'),
        scope='email profile',
    )
    # FIXME not picking up https
    client.params['redirect_uri'] = '{}://{}{}'.format(request.scheme, request.host, request.path)

    if client.shared_key not in request.GET:  # 'code' not in request.GET
        return web.HTTPFound(client.get_authorize_url())

    access_token, __ = await client.get_access_token(request.GET)
    user, info = await client.user_info()
    # TODO store in session storage

    if user.email in args.admins:
        session['is_authed'] = True
        session['user'] = {
            'name': info['displayName'],
            'email': user.email,
            'avatar': user.picture,
        }
    else:
        return web.HTTPForbidden()

    return web.HTTPFound('/')
decorators.py 文件源码 项目:Windless 作者: chiaki64 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def require(func):
    @functools.wraps(func)
    async def wrapper(*args):
        if isinstance(args[-1], AbstractView):
            request = args[-1].request
        else:
            request = args[-1]
        has_perm = await permits(request, 'Administrator')
        if not has_perm:
            raise web.HTTPForbidden()

        if isinstance(args, AbstractView):
            return await func(*args)
        return await func(*args)
    return wrapper
test_web_functional.py 文件源码 项目:aiohttp-tokio 作者: fafhrd91 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_100_continue_custom_response(loop, test_client):

    @asyncio.coroutine
    def handler(request):
        data = yield from request.post()
        assert b'123', data['name']
        return web.Response()

    @asyncio.coroutine
    def expect_handler(request):
        if request.version == HttpVersion11:
            if auth_err:
                return web.HTTPForbidden()

            request.transport.write(b"HTTP/1.1 100 Continue\r\n\r\n")

    form = FormData()
    form.add_field('name', b'123',
                   content_transfer_encoding='base64')

    app = web.Application()
    app.router.add_post('/', handler, expect_handler=expect_handler)
    client = yield from test_client(app)

    auth_err = False
    resp = yield from client.post('/', data=form, expect100=True)
    assert 200 == resp.status

    auth_err = True
    resp = yield from client.post('/', data=form, expect100=True)
    assert 403 == resp.status
evernote.py 文件源码 项目:evernote-telegram-bot 作者: djudman 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def oauth_callback(request):
    logger = request.app.logger
    bot = request.app.bot
    config_data = config['evernote']['basic_access']
    params = parse_qs(request.query_string)
    callback_key = params.get('key', [''])[0]
    session_key = params.get('session_key')[0]
    try:
        session = StartSession.get({'oauth_data.callback_key': callback_key})
    except ModelNotFound as e:
        logger.error(e, exc_info=1)
        return web.HTTPForbidden()

    if not params.get('oauth_verifier'):
        logger.info('User declined access. No access token =(')
        bot.send_message(session.data['chat_id'],
                         'We are sorry, but you declined authorization ??')
        return web.HTTPFound(bot.url)
    if session.key != session_key:
        text = 'Session is expired. \
Please, send /start command to create new session'
        bot.send_message(session.data['chat_id'], text)
        return web.HTTPFound(bot.url)

    user_data = session.data['user']
    name = '{0} {1}'.format(user_data['first_name'], user_data['last_name'])
    user = User(id=session.id,
                name=name,
                username=user_data['username'],
                telegram_chat_id=session.data['chat_id'],
                mode='multiple_notes',
                places={},
                settings={'evernote_access': 'basic'})
    try:
        future = asyncio.ensure_future(
            bot.evernote.get_access_token(config_data, session.oauth_data,
                                          params['oauth_verifier'][0])
        )
        future.add_done_callback(
            functools.partial(set_access_token, bot, user)
        )
    except TokenRequestDenied as e:
        logger.error(e, exc_info=1)
        text = 'We are sorry, but we have some problems with Evernote \
connection. Please try again later'
        bot.send_message(user.telegram_chat_id, text)
    except Exception as e:
        logger.fatal(e, exc_info=1)
        bot.send_message(user.telegram_chat_id, 'Oops. Unknown error')

    text = 'Evernote account is connected.\n\
From now you can just send message and note be created.'
    bot.send_message(user.telegram_chat_id, text)
    user.save()
    return web.HTTPFound(bot.url)
evernote.py 文件源码 项目:evernote-telegram-bot 作者: djudman 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def oauth_callback_full_access(request):
    logger = request.app.logger
    bot = request.app.bot
    params = parse_qs(request.query_string)
    callback_key = params.get('key', [''])[0]
    session_key = params.get('session_key')[0]
    try:
        session = StartSession.get({'oauth_data.callback_key': callback_key})
        user = User.get({'id': session.id})
    except ModelNotFound as e:
        logger.error(e, exc_info=1)
        return web.HTTPForbidden()

    if not params.get('oauth_verifier'):
        logger.info('User declined full access =(')
        bot.send_message(
            user.telegram_chat_id,
            'We are sorry, but you deny read/update access??',
            {'hide_keyboard': True}
        )
        return web.HTTPFound(bot.url)
    if session.key != session_key:
        text = 'Session is expired. Please, send /start command to create \
new session'
        bot.send_message(user.telegram_chat_id, text)
        return web.HTTPFound(bot.url)
    try:
        oauth_verifier = params['oauth_verifier'][0]
        config_data = config['evernote']['full_access']
        future = asyncio.ensure_future(
            bot.evernote.get_access_token(config_data, session.oauth_data,
                                          oauth_verifier)
        )
        future.add_done_callback(
            functools.partial(switch_to_one_note_mode, bot, user.id)
        )
    except TokenRequestDenied as e:
        logger.error(e, exc_info=1)
        bot.send_message(
            user.telegram_chat_id,
            'We are sorry, but we have some problems with Evernote connection.\
 Please try again later',
            {'hide_keyboard': True}
        )
    except Exception as e:
        logger.fatal(e, exc_info=1)
        bot.send_message(user.telegram_chat_id, 'Oops. Unknown error',
                         {'hide_keyboard': True})
    text = 'From now this bot in "One note" mode'
    bot.send_message(user.telegram_chat_id, text,
                     {'hide_keyboard': True})
    return web.HTTPFound(bot.url)
web.py 文件源码 项目:fireq 作者: superdesk 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def auth_middleware(app, handler):
    """ Login via Github """
    def gh_client(**kw):
        return GithubClient(conf['github_id'], conf['github_secret'], **kw)

    async def callback(request):
        session = await get_session(request)
        log.debug('callback: session=%s GET=%s', session, request.GET)
        if session.get('github_state') != request.GET.get('state'):
            return web.HTTPBadRequest()
        code = request.GET.get('code')
        if not code:
            return web.HTTPBadRequest()

        gh = gh_client()
        token, _ = await gh.get_access_token(code)
        gh = gh_client(access_token=token)
        req = await gh.request('GET', 'user')
        user = await req.json()
        req.close()
        users = []
        for org in conf['github_orgs']:
            _, resp = await gh_api('orgs/%s/members?per_page=100' % org)
            users.extend(u['login'] for u in resp)
        log.debug('members %s: %s', len(users), users)
        if user.get('login') in users:
            session['login'] = user.get('login')
            session.pop('github_state', None)
            session.pop('github_url', None)
            location = session.pop('location')
            return web.HTTPFound(location)
        return web.HTTPForbidden()

    async def check_auth(request):
        session = await get_session(request)
        login = session.get('login')
        if login:
            request['login'] = login
            return await handler(request)
        elif 'github_state' not in session:
            gh = gh_client()
            state = str(uuid.uuid4())
            url = gh.get_authorize_url(scope='', state=state)
            session['github_state'] = state
            session['github_url'] = url
            session['location'] = request.path
            log.debug('check_auth: %s', session)
        return web.HTTPFound(conf['url_prefix'] + '/login')

    async def inner(request):
        if request.path == (conf['url_prefix'] + conf['github_callback']):
            return await callback(request)
        elif request.path == (conf['url_prefix'] + '/hook'):
            return await handler(request)
        elif request.path == (conf['url_prefix'] + '/login'):
            return await handler(request)
        else:
            return await check_auth(request)

    return inner


问题


面经


文章

微信
公众号

扫码关注公众号