asyncio.py 文件源码

python
阅读 27 收藏 0 点赞 0 评论 0

项目:apm-agent-python 作者: elastic 项目源码 文件源码
def send(self, data, headers, timeout=None):
        """Use synchronous interface, because this is a coroutine."""

        try:
            with aiohttp.Timeout(timeout):
                async with self.client.post(self._url,
                                            data=data,
                                            headers=headers) as response:
                    assert response.status == 202
        except asyncio.TimeoutError as e:
            print_trace = True
            message = ("Connection to APM Server timed out "
                       "(url: %s, timeout: %s seconds)" % (self._url, timeout))
            raise TransportException(message, data,
                                     print_trace=print_trace) from e
        except AssertionError as e:
            print_trace = True
            body = await response.read()
            if response.status == 429:
                message = 'Temporarily rate limited: '
                print_trace = False
            else:
                message = 'Unable to reach APM Server: '
            message += '%s (url: %s, body: %s)' % (e, self._url, body)
            raise TransportException(message, data,
                                     print_trace=print_trace) from e
        except Exception as e:
            print_trace = True
            message = 'Unable to reach APM Server: %s (url: %s)' % (
                e, self._url)
            raise TransportException(message, data,
                                     print_trace=print_trace) from e
        else:
            return response.headers.get('Location')
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号