test_request_timeout.py 文件源码

python
阅读 24 收藏 0 点赞 0 评论 0

项目:sanic 作者: channelcat 项目源码 文件源码
async def _local_request(self, method, uri, cookies=None, *args,
                         **kwargs):
    """Send one HTTP request through a delayable connector and return it.

    Args:
        method: HTTP verb name; it is lower-cased and looked up as an
            attribute on the aiohttp session (``session.get`` etc.).
        uri: Either an absolute URL (``http:``, ``https:``, ``ftp:``,
            ``ftps://`` or protocol-relative ``//``), used verbatim, or
            a path that is appended to ``http://HOST:PORT``.
        cookies: Cookies handed to the ``aiohttp.ClientSession``.
        *args: Extra positional arguments forwarded to the session call.
        **kwargs: Extra keyword arguments forwarded to the session call.

    Returns:
        The aiohttp response object, with three attributes overwritten
        by eagerly-read values: ``text`` (str, or None when decoding
        fails), ``json`` (parsed payload, or None when it cannot be
        parsed) and ``body`` (the raw bytes from ``response.read()``).
    """
    if self._loop is None:
        self._loop = asyncio.get_event_loop()

    # Each prefix must be its own tuple element: without the comma,
    # 'ftps://' '//' silently concatenates into the single prefix
    # 'ftps:////' and neither scheme would be recognised as absolute.
    if uri.startswith(('http:', 'https:', 'ftp:', 'ftps://', '//')):
        url = uri
    else:
        url = 'http://{host}:{port}{uri}'.format(
            host=HOST, port=PORT, uri=uri)

    # The configured delay is passed to the connector; presumably the
    # connector applies it after the connection is created but before
    # the request is sent (see DelayableTCPConnector to confirm).
    conn = DelayableTCPConnector(pre_request_delay=self._request_delay,
                                 verify_ssl=False, loop=self._loop)
    async with aiohttp.ClientSession(cookies=cookies, connector=conn,
                                     loop=self._loop) as session:
        async with getattr(session, method.lower())(
                url, *args, **kwargs) as response:
            # From here on the coroutine methods text()/json() are
            # shadowed by their results, so callers read plain values.
            try:
                response.text = await response.text()
            except UnicodeDecodeError:
                response.text = None

            try:
                response.json = await response.json()
            except (JSONDecodeError,
                    UnicodeDecodeError,
                    aiohttp.ClientResponseError):
                response.json = None

            # Must be read inside the context manager, while the
            # response is still open.
            response.body = await response.read()
            return response
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号