def test_pause_resume_writing(self):
    """Verify the protocol is paused on a large write and resumed on flush.

    A transport is obtained from ``self.pause_writing_transport(high=4)``
    (presumably ``high`` is the write-buffer high-water mark -- confirm
    against that helper).  The proactor's ``send`` is made to return a
    pending future, so the 10-byte write cannot complete until the test
    resolves that future explicitly.
    """
    tr = self.pause_writing_transport(high=4)

    # Write a large chunk, must pause writing: the send future is still
    # pending, so the write cannot complete during this loop iteration.
    fut = asyncio.Future(loop=self.loop)
    self.loop._proactor.send.return_value = fut
    tr.write(b'large data')
    self.loop._run_once()
    self.assertTrue(self.protocol.pause_writing.called)

    # Flush the buffer: completing the send future lets the next loop
    # iteration drain the transport, which must resume the protocol.
    fut.set_result(None)
    self.loop._run_once()
    self.assertEqual(tr.get_write_buffer_size(), 0)
    self.assertTrue(self.protocol.resume_writing.called)
# Source file: test_proactor_events.py
# Language: python
# Views: 28
# Favorites: 0
# Likes: 0
# Comments: 0
# Comment list
# Table of contents