def test_timeout(loop):
canceled_raised = False
@asyncio.coroutine
def long_running_task():
try:
yield from asyncio.sleep(10, loop=loop)
except asyncio.CancelledError:
nonlocal canceled_raised
canceled_raised = True
raise
with pytest.raises(asyncio.TimeoutError):
with timeout(0.01, loop=loop) as t:
yield from long_running_task()
assert t._loop is loop
assert canceled_raised, 'CancelledError was not raised'
评论列表
文章目录