def test_gc(self):
    """Runners shouldn't GC if future is alive.

    The only reference this test keeps to the pending future outside the
    running coroutine is a weak one; a callback forces a full garbage
    collection and then resolves the future through that weak reference.
    """
    # One-element list so both nested functions can share the weakref.
    future_ref_slot = [None]

    def collect_then_resolve():
        gc.collect(2)
        # Dereference the weakref; this yields None if the future was collected.
        still_alive = future_ref_slot[0]()
        still_alive.set_result(123)

    @gen.coroutine
    def wait_on_weakly_held_future():
        pending = Future()
        future_ref_slot[0] = weakref.ref(pending)
        self.io_loop.add_callback(collect_then_resolve)
        yield pending

    limit = datetime.timedelta(seconds=0.2)
    yield gen.with_timeout(limit, wait_on_weakly_held_future())
# Example source code for Python's with_timeout()
def test_gc(self):
    """Runners shouldn't GC if future is alive.

    The only reference this test keeps to the pending future outside the
    running coroutine is a weak one; a callback forces a full garbage
    collection and then resolves the future through that weak reference.
    """
    # One-element list so both nested functions can share the weakref.
    future_ref_slot = [None]

    def collect_then_resolve():
        gc.collect(2)
        # Dereference the weakref; this yields None if the future was collected.
        still_alive = future_ref_slot[0]()
        still_alive.set_result(123)

    @gen.coroutine
    def wait_on_weakly_held_future():
        pending = Future()
        future_ref_slot[0] = weakref.ref(pending)
        self.io_loop.add_callback(collect_then_resolve)
        yield pending

    limit = datetime.timedelta(seconds=0.2)
    yield gen.with_timeout(limit, wait_on_weakly_held_future())
def open(self, timeout=DEFAULT_CONNECT_TIMEOUT):
    """Create the stream and connect it to ``(self.host, self.port)``.

    A plain ``IOStream`` is used when ``self.ssl_options`` is None,
    otherwise an ``SSLIOStream`` built with those options. The connect
    future is wrapped with ``self.with_timeout(timeout, ...)``.

    Raises:
        TTransportException: with type ``NOT_OPEN`` when the connect
            attempt raises ``socket.error``, ``OSError`` or ``IOError``.

    On success the close callback is installed and ``self`` is handed back
    through ``gen.Return``.
    """
    logger.debug('socket connecting')
    raw_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)

    if self.ssl_options is not None:
        self.stream = iostream.SSLIOStream(
            raw_socket, ssl_options=self.ssl_options)
    else:
        self.stream = iostream.IOStream(raw_socket)

    try:
        connecting = self.stream.connect((self.host, self.port))
        yield self.with_timeout(timeout, connecting)
    except (socket.error, OSError, IOError):
        failure_text = 'could not connect to {}:{}'.format(
            self.host, self.port)
        raise TTransportException(
            type=TTransportException.NOT_OPEN,
            message=failure_text)

    self._set_close_callback()
    raise gen.Return(self)
def wait_on_sibling(self, sibling, time_limit=None):
    """Wait for a DELETED event on the given sibling's path.

    Args:
        sibling: passed to ``self.sibling_path`` to build the watched path.
        time_limit: when truthy, handed to ``gen.with_timeout`` as the
            timeout for the wait; when falsy the wait is unbounded.

    Raises:
        exc.TimeoutError: when the wait raises ``gen.TimeoutError``.
    """
    log.debug("Waiting on sibling %s", sibling)

    path = self.sibling_path(sibling)

    # Register interest in the event before checking existence, so that a
    # deletion happening in between is not missed.
    unblocked = self.client.wait_for_event(WatchEvent.DELETED, path)
    if time_limit:
        unblocked = gen.with_timeout(time_limit, unblocked)

    exists = yield self.client.exists(path=path, watch=True)
    # If the path is already gone there is nothing to wait for, so resolve
    # the future ourselves. It may already be done (the event or the timeout
    # fired while the existence check was in flight); setting a result on a
    # completed future is an error for asyncio-backed futures, so skip it.
    if not exists and not unblocked.done():
        unblocked.set_result(None)

    try:
        yield unblocked
    except gen.TimeoutError:
        raise exc.TimeoutError
def wait(self, timeout=None):
    """Wait for a DELETED event on ``self.path``.

    Args:
        timeout: optional number of seconds; it is added to ``time.time()``
            to form the deadline given to ``gen.with_timeout``.

    Finishes immediately when ``self.path`` does not exist.

    Raises:
        exc.TimeoutError: when the wait raises ``gen.TimeoutError``.
    """
    deadline = None if timeout is None else time.time() + timeout

    lifted = self.client.wait_for_event(WatchEvent.DELETED, self.path)
    if deadline:
        lifted = gen.with_timeout(deadline, lifted)

    still_present = yield self.client.exists(path=self.path, watch=True)
    if still_present:
        # Only block on the event while the path is actually there.
        try:
            yield lifted
        except gen.TimeoutError:
            raise exc.TimeoutError
def close(self, timeout):
if self.closing:
return
self.closing = True
pending_with_timeouts = []
for pending in self.drain_all_pending():
pending_with_timeouts.append(gen.with_timeout(timeout, pending))
try:
yield list(pending_with_timeouts)
except gen.TimeoutError:
yield self.abort(exception=exc.TimeoutError)
finally:
self.stream.close()
def test_gc(self):
    """Runners shouldn't GC if future is alive.

    The only reference this test keeps to the pending future outside the
    running coroutine is a weak one; a callback forces a full garbage
    collection and then resolves the future through that weak reference.
    """
    # One-element list so both nested functions can share the weakref.
    future_ref_slot = [None]

    def collect_then_resolve():
        gc.collect(2)
        # Dereference the weakref; this yields None if the future was collected.
        still_alive = future_ref_slot[0]()
        still_alive.set_result(123)

    @gen.coroutine
    def wait_on_weakly_held_future():
        pending = Future()
        future_ref_slot[0] = weakref.ref(pending)
        self.io_loop.add_callback(collect_then_resolve)
        yield pending

    limit = datetime.timedelta(seconds=0.2)
    yield gen.with_timeout(limit, wait_on_weakly_held_future())
def test_gc(self):
    """Runners shouldn't GC if future is alive.

    The only reference this test keeps to the pending future outside the
    running coroutine is a weak one; a callback forces a full garbage
    collection and then resolves the future through that weak reference.
    """
    # One-element list so both nested functions can share the weakref.
    future_ref_slot = [None]

    def collect_then_resolve():
        gc.collect(2)
        # Dereference the weakref; this yields None if the future was collected.
        still_alive = future_ref_slot[0]()
        still_alive.set_result(123)

    @gen.coroutine
    def wait_on_weakly_held_future():
        pending = Future()
        future_ref_slot[0] = weakref.ref(pending)
        self.io_loop.add_callback(collect_then_resolve)
        yield pending

    limit = datetime.timedelta(seconds=0.2)
    yield gen.with_timeout(limit, wait_on_weakly_held_future())
def wait(self, timeout=None):
    """Block until the internal flag is true.

    Returns a Future. Without a timeout this is ``self._future`` itself;
    with one, it is that future wrapped by `tornado.gen.with_timeout`, which
    raises `tornado.gen.TimeoutError` after the timeout.
    """
    if timeout is not None:
        return gen.with_timeout(timeout, self._future)
    return self._future
def test_timeout(self):
    """with_timeout raises gen.TimeoutError for a future nobody resolves."""
    never_resolved = Future()
    short_limit = datetime.timedelta(seconds=0.1)
    with self.assertRaises(gen.TimeoutError):
        yield gen.with_timeout(short_limit, never_resolved)
def test_completes_before_timeout(self):
    """A future resolved before the deadline passes its result through."""
    pending = Future()

    def resolve():
        pending.set_result('asdf')

    # Resolve the future shortly after the wait below has started.
    self.io_loop.add_timeout(datetime.timedelta(seconds=0.1), resolve)

    outcome = yield gen.with_timeout(
        datetime.timedelta(seconds=3600), pending, io_loop=self.io_loop)
    self.assertEqual(outcome, 'asdf')
def test_fails_before_timeout(self):
    """An exception set before the deadline propagates to the caller."""
    pending = Future()

    def fail():
        pending.set_exception(ZeroDivisionError())

    # Fail the future shortly after the wait below has started.
    self.io_loop.add_timeout(datetime.timedelta(seconds=0.1), fail)

    long_limit = datetime.timedelta(seconds=3600)
    with self.assertRaises(ZeroDivisionError):
        yield gen.with_timeout(long_limit, pending, io_loop=self.io_loop)
def test_already_resolved(self):
    """A future resolved before with_timeout is called yields its result."""
    expected = 'asdf'
    resolved = Future()
    resolved.set_result(expected)

    long_limit = datetime.timedelta(seconds=3600)
    outcome = yield gen.with_timeout(
        long_limit, resolved, io_loop=self.io_loop)
    self.assertEqual(outcome, expected)
def test_completed_concurrent_future(self):
    """with_timeout accepts a future returned by a thread-pool executor."""
    with futures.ThreadPoolExecutor(1) as pool:
        long_limit = datetime.timedelta(seconds=3600)
        submitted = pool.submit(lambda: None)
        yield gen.with_timeout(long_limit, submitted)
def test_no_ref(self):
    """Wait on a WaitIterator future without holding the iterator itself."""
    # Only the Future returned by next() is kept here; nothing holds a hard
    # reference to the WaitIterator. Because WaitIterator relies on weak
    # references internally (to improve GC performance), this usage used
    # to cause problems.
    next_result = gen.WaitIterator(gen.sleep(0)).next()
    yield gen.with_timeout(datetime.timedelta(seconds=0.1), next_result)
def wait(self, timeout=None):
    """Block until the internal flag is true.

    Returns a Future. Without a timeout this is ``self._future`` itself;
    with one, it is that future wrapped by `tornado.gen.with_timeout`, which
    raises `tornado.gen.TimeoutError` after the timeout.
    """
    if timeout is not None:
        return gen.with_timeout(timeout, self._future)
    return self._future
def test_timeout(self):
    """with_timeout raises gen.TimeoutError for a future nobody resolves."""
    never_resolved = Future()
    short_limit = datetime.timedelta(seconds=0.1)
    with self.assertRaises(gen.TimeoutError):
        yield gen.with_timeout(short_limit, never_resolved)
def test_completes_before_timeout(self):
    """A future resolved before the deadline passes its result through."""
    pending = Future()

    def resolve():
        pending.set_result('asdf')

    # Resolve the future shortly after the wait below has started.
    self.io_loop.add_timeout(datetime.timedelta(seconds=0.1), resolve)

    outcome = yield gen.with_timeout(
        datetime.timedelta(seconds=3600), pending, io_loop=self.io_loop)
    self.assertEqual(outcome, 'asdf')
def test_already_resolved(self):
    """A future resolved before with_timeout is called yields its result."""
    expected = 'asdf'
    resolved = Future()
    resolved.set_result(expected)

    long_limit = datetime.timedelta(seconds=3600)
    outcome = yield gen.with_timeout(
        long_limit, resolved, io_loop=self.io_loop)
    self.assertEqual(outcome, expected)
def test_timeout_concurrent_future(self):
    """An executor future that is still running at the deadline times out."""
    with futures.ThreadPoolExecutor(1) as pool, \
            self.assertRaises(gen.TimeoutError):
        # The deadline is the loop's current time, while the submitted
        # task sleeps for 0.1s.
        deadline = self.io_loop.time()
        sleeping = pool.submit(time.sleep, 0.1)
        yield gen.with_timeout(deadline, sleeping)
def test_completed_concurrent_future(self):
    """with_timeout accepts a future returned by a thread-pool executor."""
    with futures.ThreadPoolExecutor(1) as pool:
        long_limit = datetime.timedelta(seconds=3600)
        submitted = pool.submit(lambda: None)
        yield gen.with_timeout(long_limit, submitted)
def test_no_ref(self):
    """Wait on a WaitIterator future without holding the iterator itself."""
    # Only the Future returned by next() is kept here; nothing holds a hard
    # reference to the WaitIterator. Because WaitIterator relies on weak
    # references internally (to improve GC performance), this usage used
    # to cause problems.
    next_result = gen.WaitIterator(gen.sleep(0)).next()
    yield gen.with_timeout(datetime.timedelta(seconds=0.1), next_result)
def wait(self, timeout=None):
    """Block until the internal flag is true.

    Returns a Future. Without a timeout this is ``self._future`` itself;
    with one, it is that future wrapped by `tornado.gen.with_timeout`, which
    raises `tornado.gen.TimeoutError` after the timeout.
    """
    if timeout is not None:
        return gen.with_timeout(timeout, self._future)
    return self._future
def test_completes_before_timeout(self):
    """A future resolved before the deadline passes its result through."""
    pending = Future()

    def resolve():
        pending.set_result('asdf')

    # Resolve the future shortly after the wait below has started.
    self.io_loop.add_timeout(datetime.timedelta(seconds=0.1), resolve)

    outcome = yield gen.with_timeout(
        datetime.timedelta(seconds=3600), pending, io_loop=self.io_loop)
    self.assertEqual(outcome, 'asdf')
def test_fails_before_timeout(self):
    """An exception set before the deadline propagates to the caller."""
    pending = Future()

    def fail():
        pending.set_exception(ZeroDivisionError())

    # Fail the future shortly after the wait below has started.
    self.io_loop.add_timeout(datetime.timedelta(seconds=0.1), fail)

    long_limit = datetime.timedelta(seconds=3600)
    with self.assertRaises(ZeroDivisionError):
        yield gen.with_timeout(long_limit, pending, io_loop=self.io_loop)
def test_already_resolved(self):
    """A future resolved before with_timeout is called yields its result."""
    expected = 'asdf'
    resolved = Future()
    resolved.set_result(expected)

    long_limit = datetime.timedelta(seconds=3600)
    outcome = yield gen.with_timeout(
        long_limit, resolved, io_loop=self.io_loop)
    self.assertEqual(outcome, expected)
def test_timeout_concurrent_future(self):
    """An executor future that is still running at the deadline times out."""
    with futures.ThreadPoolExecutor(1) as pool, \
            self.assertRaises(gen.TimeoutError):
        # The deadline is the loop's current time, while the submitted
        # task sleeps for 0.1s.
        deadline = self.io_loop.time()
        sleeping = pool.submit(time.sleep, 0.1)
        yield gen.with_timeout(deadline, sleeping)
def test_completed_concurrent_future(self):
    """with_timeout accepts a future returned by a thread-pool executor."""
    with futures.ThreadPoolExecutor(1) as pool:
        long_limit = datetime.timedelta(seconds=3600)
        submitted = pool.submit(lambda: None)
        yield gen.with_timeout(long_limit, submitted)
def test_readonly(self):
    """Reading from the server stream times out after a readonly client sends."""
    client_conn = Connection(
        self.client_stream, client_side=True, readonly=True)
    client_conn.initiate_connection()

    stream_id = client_conn.get_next_available_stream_id()
    request = HttpRequest(headers=[
        (":method", "GET"),
        (":path", "/"),
        ("aaa", "bbb"),
    ])
    client_conn.send_request(stream_id, request)

    # Expect the one-byte read on the server stream not to complete within
    # 100ms.
    with self.assertRaises(gen.TimeoutError):
        read_limit = timedelta(milliseconds=100)
        yield gen.with_timeout(read_limit, self.server_stream.read_bytes(1))
def create_dest_stream(self, dest_addr_info):
    """Connect a new ``MicroProxyIOStream`` to ``dest_addr_info``.

    The connect is wrapped in ``gen.with_timeout`` with a five-second
    limit. The connected stream is handed back through ``gen.Return``.
    """
    outbound_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
    outbound_stream = MicroProxyIOStream(outbound_socket)

    connect_limit = timedelta(seconds=5)
    yield gen.with_timeout(
        connect_limit, outbound_stream.connect(dest_addr_info))

    raise gen.Return(outbound_stream)