test_keep_alive_timeout.py 文件源码

python
阅读 22 收藏 0 点赞 0 评论 0

项目:sanic 作者: channelcat 项目源码 文件源码
async def _local_request(self, method, uri, cookies=None, *args,
                         **kwargs):
    """Send one HTTP request through the cached aiohttp session.

    The session and its TCP connector are stored on ``self`` and reused by
    later calls, so consecutive requests can share one connection.

    Args:
        method: HTTP method name; it is lower-cased and looked up as an
            attribute of the session (``session.get``, ``session.post``...).
        uri: Either an absolute/protocol-relative URL, used unchanged, or
            a path that is appended to ``http://HOST:PORT``.
        cookies: Cookies handed to a newly created ``ClientSession``.
            Ignored when a cached session already exists.
        *args: Extra positional arguments forwarded to the session method.
        **kwargs: Extra keyword arguments forwarded to the session method,
            after these two are removed:

            * ``request_keepalive`` -- ``keepalive_timeout`` for a newly
              created connector (default ``Config.KEEP_ALIVE_TIMEOUT``).
              Ignored when a cached connector already exists.
            * ``end_session`` -- when true, close the session after the
              request and drop it from ``self`` (default ``False``).

    Returns:
        The response object, with its ``text``, ``json`` and ``body``
        attributes replaced by the already-read values (``text`` and
        ``json`` are ``None`` when decoding fails).
    """
    import inspect

    # Both options are ours, not aiohttp's: strip them before forwarding.
    request_keepalive = kwargs.pop('request_keepalive',
                                   Config.KEEP_ALIVE_TIMEOUT)
    do_kill_session = kwargs.pop('end_session', False)

    # Each prefix needs its own tuple entry; adjacent string literals
    # without a comma are silently concatenated into a single prefix.
    if uri.startswith(('http:', 'https:', 'ftp:', 'ftps://', '//')):
        url = uri
    else:
        url = 'http://{host}:{port}{uri}'.format(
            host=HOST, port=PORT, uri=uri)

    session = self._session
    if not session:
        conn = self._tcp_connector
        if not conn:
            conn = ReuseableTCPConnector(
                verify_ssl=False,
                loop=self._loop,
                keepalive_timeout=request_keepalive)
            self._tcp_connector = conn
        session = aiohttp.ClientSession(cookies=cookies,
                                        connector=conn,
                                        loop=self._loop)
        self._session = session

    send = getattr(session, method.lower())
    async with send(url, *args, **kwargs) as response:
        # The accessor methods are deliberately overwritten with their
        # results so callers can read them after the connection is released.
        try:
            response.text = await response.text()
        except UnicodeDecodeError:
            response.text = None

        try:
            response.json = await response.json()
        except (JSONDecodeError,
                UnicodeDecodeError,
                aiohttp.ClientResponseError):
            response.json = None

        response.body = await response.read()

    if do_kill_session:
        # ClientSession.close() is a coroutine on current aiohttp releases
        # and a plain call on old ones; await it only when it is awaitable
        # so the session is really closed in both cases.
        closing = session.close()
        if inspect.isawaitable(closing):
            await closing
        self._session = None
    return response
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号