test_response_dripping.py 文件源码

python
阅读 54 收藏 0 点赞 0 评论 0

项目:napper 作者: epsy 项目源码 文件源码
def mock_dripping_response(self, chunks, **kwargs):
        ip = iter(chunks)

        loop = asyncio.get_event_loop()
        rsock, wsock = socket.socketpair()
        resp = FakeTextResponse('', **kwargs)
        resp.content, readtr = await asyncio.open_connection(sock=rsock)

        def send_next():
            try:
                to_send = next(ip)
            except StopIteration:
                wsock.close()
                return
            wsock.send(to_send)
            loop.call_soon(send_next)

        loop.call_soon(send_next)

        return self._cm(resp, readtr)
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号