iostream_test.py source file

python

Project: noc-orchestrator  Author: DirceuSilvaLabs
    def test_future_close_callback(self):
        # Regression test for interaction between the Future read interfaces
        # and IOStream._maybe_add_error_listener.
        server, client = self.make_iostream_pair()
        closed = [False]

        def close_callback():
            closed[0] = True
            self.stop()
        server.set_close_callback(close_callback)
        try:
            client.write(b'a')
            future = server.read_bytes(1)
            self.io_loop.add_future(future, self.stop)
            self.assertEqual(self.wait().result(), b'a')
            self.assertFalse(closed[0])
            client.close()
            self.wait()
            self.assertTrue(closed[0])
        finally:
            server.close()
            client.close()
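
The test above depends on Tornado's test helpers (`make_iostream_pair`, `self.stop`, `self.wait`). As a rough standard-library analogue only, and not Tornado's implementation, the same sequence (read one byte through an awaitable, confirm the stream is still open, then observe the peer closing) can be sketched with `asyncio` streams. The function name `read_then_detect_close` and the use of `socket.socketpair()` in place of `make_iostream_pair()` are assumptions made for illustration.

```python
import asyncio
import socket


async def read_then_detect_close():
    # Hypothetical stand-in for make_iostream_pair(): a connected socket pair.
    server_sock, client_sock = socket.socketpair()
    reader, server_writer = await asyncio.open_connection(sock=server_sock)
    _, client_writer = await asyncio.open_connection(sock=client_sock)
    try:
        # The client sends one byte; the server reads exactly one byte.
        client_writer.write(b"a")
        await client_writer.drain()
        data = await reader.readexactly(1)

        # The peer is still open, so no EOF has been seen yet.
        eof_before_close = reader.at_eof()

        # Close the client side; the server's next read sees EOF.
        client_writer.close()
        await client_writer.wait_closed()
        await reader.read()
        eof_after_close = reader.at_eof()

        return data, eof_before_close, eof_after_close
    finally:
        # Mirror the original test's cleanup: close both ends.
        server_writer.close()
        client_writer.close()
        await server_writer.wait_closed()


if __name__ == "__main__":
    print(asyncio.run(read_then_detect_close()))
```

Here the EOF flag plays the role of `closed[0]` in the original test: it must stay false after the successful read and become true only once the client has closed.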