def wait(self, timeout=None):
"""Block until the internal flag is true.
Returns a Future, which raises `tornado.gen.TimeoutError` after a
timeout.
"""
if timeout is None:
return self._future
else:
return gen.with_timeout(timeout, self._future)
python类with_timeout()的实例源码
def test_timeout(self):
with self.assertRaises(gen.TimeoutError):
yield gen.with_timeout(datetime.timedelta(seconds=0.1),
Future())
def test_completes_before_timeout(self):
future = Future()
self.io_loop.add_timeout(datetime.timedelta(seconds=0.1),
lambda: future.set_result('asdf'))
result = yield gen.with_timeout(datetime.timedelta(seconds=3600),
future, io_loop=self.io_loop)
self.assertEqual(result, 'asdf')
def test_fails_before_timeout(self):
future = Future()
self.io_loop.add_timeout(
datetime.timedelta(seconds=0.1),
lambda: future.set_exception(ZeroDivisionError()))
with self.assertRaises(ZeroDivisionError):
yield gen.with_timeout(datetime.timedelta(seconds=3600),
future, io_loop=self.io_loop)
def test_already_resolved(self):
future = Future()
future.set_result('asdf')
result = yield gen.with_timeout(datetime.timedelta(seconds=3600),
future, io_loop=self.io_loop)
self.assertEqual(result, 'asdf')
def test_completed_concurrent_future(self):
with futures.ThreadPoolExecutor(1) as executor:
yield gen.with_timeout(datetime.timedelta(seconds=3600),
executor.submit(lambda: None))
def test_no_ref(self):
# In this usage, there is no direct hard reference to the
# WaitIterator itself, only the Future it returns. Since
# WaitIterator uses weak references internally to improve GC
# performance, this used to cause problems.
yield gen.with_timeout(datetime.timedelta(seconds=0.1),
gen.WaitIterator(gen.sleep(0)).next())
def wait(self, timeout=None):
"""Block until the internal flag is true.
Returns a Future, which raises `tornado.gen.TimeoutError` after a
timeout.
"""
if timeout is None:
return self._future
else:
return gen.with_timeout(timeout, self._future)
def test_timeout(self):
with self.assertRaises(gen.TimeoutError):
yield gen.with_timeout(datetime.timedelta(seconds=0.1),
Future())
def test_completes_before_timeout(self):
future = Future()
self.io_loop.add_timeout(datetime.timedelta(seconds=0.1),
lambda: future.set_result('asdf'))
result = yield gen.with_timeout(datetime.timedelta(seconds=3600),
future, io_loop=self.io_loop)
self.assertEqual(result, 'asdf')
def test_fails_before_timeout(self):
future = Future()
self.io_loop.add_timeout(
datetime.timedelta(seconds=0.1),
lambda: future.set_exception(ZeroDivisionError()))
with self.assertRaises(ZeroDivisionError):
yield gen.with_timeout(datetime.timedelta(seconds=3600),
future, io_loop=self.io_loop)
def test_already_resolved(self):
future = Future()
future.set_result('asdf')
result = yield gen.with_timeout(datetime.timedelta(seconds=3600),
future, io_loop=self.io_loop)
self.assertEqual(result, 'asdf')
def test_completed_concurrent_future(self):
with futures.ThreadPoolExecutor(1) as executor:
yield gen.with_timeout(datetime.timedelta(seconds=3600),
executor.submit(lambda: None))
def test_no_ref(self):
# In this usage, there is no direct hard reference to the
# WaitIterator itself, only the Future it returns. Since
# WaitIterator uses weak references internally to improve GC
# performance, this used to cause problems.
yield gen.with_timeout(datetime.timedelta(seconds=0.1),
gen.WaitIterator(gen.sleep(0)).next())
def wait(self, timeout=None):
"""Block until the internal flag is true.
Returns a Future, which raises `tornado.gen.TimeoutError` after a
timeout.
"""
if timeout is None:
return self._future
else:
return gen.with_timeout(timeout, self._future)
def test_timeout(self):
with self.assertRaises(gen.TimeoutError):
yield gen.with_timeout(datetime.timedelta(seconds=0.1),
Future())
def test_completes_before_timeout(self):
future = Future()
self.io_loop.add_timeout(datetime.timedelta(seconds=0.1),
lambda: future.set_result('asdf'))
result = yield gen.with_timeout(datetime.timedelta(seconds=3600),
future, io_loop=self.io_loop)
self.assertEqual(result, 'asdf')
def test_already_resolved(self):
future = Future()
future.set_result('asdf')
result = yield gen.with_timeout(datetime.timedelta(seconds=3600),
future, io_loop=self.io_loop)
self.assertEqual(result, 'asdf')
def test_timeout_concurrent_future(self):
with futures.ThreadPoolExecutor(1) as executor:
with self.assertRaises(gen.TimeoutError):
yield gen.with_timeout(self.io_loop.time(),
executor.submit(time.sleep, 0.1))
def test_completed_concurrent_future(self):
with futures.ThreadPoolExecutor(1) as executor:
yield gen.with_timeout(datetime.timedelta(seconds=3600),
executor.submit(lambda: None))