def test_streaming_until_close_future(self):
server, client = self.make_iostream_pair()
try:
chunks = []
@gen.coroutine
def client_task():
yield client.read_until_close(streaming_callback=chunks.append)
@gen.coroutine
def server_task():
yield server.write(b"1234")
yield gen.sleep(0.01)
yield server.write(b"5678")
server.close()
@gen.coroutine
def f():
yield [client_task(), server_task()]
self.io_loop.run_sync(f)
self.assertEqual(chunks, [b"1234", b"5678"])
finally:
server.close()
client.close()
评论列表
文章目录