def test_timeout_concurrent_future(self):
with futures.ThreadPoolExecutor(1) as executor:
with self.assertRaises(gen.TimeoutError):
yield gen.with_timeout(self.io_loop.time(),
executor.submit(time.sleep, 0.1))
评论列表
文章目录