stream_test.py 文件源码

python
阅读 26 收藏 0 点赞 0 评论 0

项目:girder_worker 作者: girder 项目源码 文件源码
def testOutputStreams(self):
        output_spec = {
            'mode': 'http',
            'method': 'PUT',
            'url': 'http://localhost:%d' % _socket_port
        }

        fd = os.open(_pipepath, os.O_RDONLY | os.O_NONBLOCK)
        adapters = {
            fd: make_stream_push_adapter(output_spec)
        }
        cmd = [sys.executable, _oscript, _pipepath]

        try:
            with captureOutput() as stdpipes:
                run_process(cmd, adapters)
        except Exception:
            print('Stdout/stderr from exception: ')
            print(stdpipes)
            raise
        self.assertEqual(stdpipes, ['start\ndone\n', ''])
        self.assertEqual(len(_req_chunks), 1)
        self.assertEqual(_req_chunks[0], (9, 'a message'))
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号