def test_future_close_callback(self):
# Regression test for interaction between the Future read interfaces
# and IOStream._maybe_add_error_listener.
server, client = self.make_iostream_pair()
closed = [False]
def close_callback():
closed[0] = True
self.stop()
server.set_close_callback(close_callback)
try:
client.write(b'a')
future = server.read_bytes(1)
self.io_loop.add_future(future, self.stop)
self.assertEqual(self.wait().result(), b'a')
self.assertFalse(closed[0])
client.close()
self.wait()
self.assertTrue(closed[0])
finally:
server.close()
client.close()
评论列表
文章目录