iostream_test.py 文件源码

python
阅读 27 收藏 0 点赞 0 评论 0

项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码
def test_streaming_until_close_future(self):
        server, client = self.make_iostream_pair()
        try:
            chunks = []

            @gen.coroutine
            def client_task():
                yield client.read_until_close(streaming_callback=chunks.append)

            @gen.coroutine
            def server_task():
                yield server.write(b"1234")
                yield gen.sleep(0.01)
                yield server.write(b"5678")
                server.close()

            @gen.coroutine
            def f():
                yield [client_task(), server_task()]
            self.io_loop.run_sync(f)
            self.assertEqual(chunks, [b"1234", b"5678"])
        finally:
            server.close()
            client.close()
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号