def test_stream_cancel(event_loop):
async def cancel(task):
await asyncio.sleep(0.001)
task.cancel()
async def test_stream_iterations(stream):
while True:
await test_stream_iteration(stream)
with aiohttp.ClientSession(loop=event_loop) as session:
client = peony.client.BasePeonyClient("", "", session=session)
context = peony.stream.StreamResponse(method='GET',
url="http://whatever.com",
client=client)
with context as stream:
with patch.object(stream, '_connect',
side_effect=stream_content):
coro = test_stream_iterations(stream)
task = event_loop.create_task(coro)
cancel_task = event_loop.create_task(cancel(task))
with aiohttp.Timeout(1):
await asyncio.wait([task, cancel_task])
评论列表
文章目录