def testOutputStreams(self):
output_spec = {
'mode': 'http',
'method': 'PUT',
'url': 'http://localhost:%d' % _socket_port
}
fd = os.open(_pipepath, os.O_RDONLY | os.O_NONBLOCK)
adapters = {
fd: make_stream_push_adapter(output_spec)
}
cmd = [sys.executable, _oscript, _pipepath]
try:
with captureOutput() as stdpipes:
run_process(cmd, adapters)
except Exception:
print('Stdout/stderr from exception: ')
print(stdpipes)
raise
self.assertEqual(stdpipes, ['start\ndone\n', ''])
self.assertEqual(len(_req_chunks), 1)
self.assertEqual(_req_chunks[0], (9, 'a message'))
评论列表
文章目录