test_basic.py 文件源码

python
阅读 28 收藏 0 点赞 0 评论 0

项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码
def test_transferMultipleChunks(self):
        """
        Every call to C{resumeProducing} makes L{basic.FileSender} deliver at
        most C{CHUNK_SIZE} more bytes to its consumer, and the transfer's
        L{Deferred} only fires after a further call that hits end-of-file.
        """
        transport = proto_helpers.StringTransport()
        fileSender = basic.FileSender()
        fileSender.CHUNK_SIZE = 4
        finished = fileSender.beginFileTransfer(
            BytesIO(b"Test content"), transport)
        # Ideally we would assertNoResult(finished) here, but
        # <http://tm.tl/6291>

        # With a 4-byte chunk size the 12-byte payload arrives in three steps.
        for received in (b"Test", b"Test con", b"Test content"):
            fileSender.resumeProducing()
            self.assertEqual(received, transport.value())

        # The sender only notices EOF on the next read attempt, so one more
        # resume is needed before the Deferred fires.
        fileSender.resumeProducing()
        self.assertEqual(b"t", self.successResultOf(finished))
        # That final resume must not have written anything further.
        self.assertEqual(b"Test content", transport.value())
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号