async def mock_dripping_response(self, chunks, **kwargs):
    """Build a fake response whose body arrives one chunk at a time.

    A connected socket pair is created. The read end is wrapped in an
    asyncio stream and installed as ``resp.content``; the write end is fed
    one item of ``chunks`` per scheduled callback and is closed once
    ``chunks`` is exhausted, which signals end-of-stream to the reader.

    This must be a coroutine function because it awaits
    ``asyncio.open_connection``.

    Args:
        chunks: Iterable of bytes-like objects; each item is passed to
            ``socket.send`` on the write end.
        **kwargs: Forwarded unchanged to ``FakeTextResponse``.

    Returns:
        The result of ``self._cm(resp, readtr)``, where ``readtr`` is the
        second element returned by ``asyncio.open_connection`` for the
        read end of the socket pair.
    """
    pending = iter(chunks)
    loop = asyncio.get_event_loop()
    rsock, wsock = socket.socketpair()
    resp = FakeTextResponse('', **kwargs)
    resp.content, readtr = await asyncio.open_connection(sock=rsock)

    def send_next():
        # Write a single chunk, then reschedule, so the event loop gets
        # control back between chunks instead of receiving the whole body
        # at once.
        try:
            to_send = next(pending)
        except StopIteration:
            # No more data: closing the write end lets the reader see EOF.
            wsock.close()
            return
        wsock.send(to_send)
        loop.call_soon(send_next)

    # Defer the first write to the event loop; nothing is sent before the
    # caller gets the response object back.
    loop.call_soon(send_next)
    return self._cm(resp, readtr)
评论列表
文章目录