test_task_executor.py 文件源码

python
阅读 30 收藏 0 点赞 0 评论 0

项目:captaincloud 作者: bpsagar 项目源码 文件源码
def test_stream(self):
        with open('test.tmp', 'wb') as fd:
            fd.write(six.b('Hello World ') * 100)

        task = TaskRegistry.create('test_executor_stream_copy')
        task.Input.stream = open('test.tmp', 'rb')
        task.Output.stream = open('test.out', 'wb')

        executor = TaskExecutor()
        executor.set_task(task)
        executor.execute()

        with open('test.out', 'rb') as fd:
            self.assertEqual(fd.read(), six.b('Hello World ') * 100)

        os.unlink('test.tmp')
        os.unlink('test.out')
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号