def test_echo(self):
    """
    Spawn a subprocess via L{IReactorProcess.spawnProcess} which echoes its
    stdin to its stdout, and check that the echoed output is delivered to
    the protocol's C{outReceived}.

    @return: the L{defer.Deferred} given to L{EchoProtocol}, with the
        assertions and the error clean-up attached to it.
    """
    done = defer.Deferred()
    echoer = EchoProtocol(done)

    # The echo helper is addressed by module name and launched with "-m";
    # pyExe is presumably the Python interpreter -- defined outside this
    # test.
    moduleName = b"twisted.test.process_echoer"
    argv = [pyExe, b'-u', b"-m", moduleName]
    reactor.spawnProcess(echoer, pyExe, argv, env=properEnv)

    def checkEcho(_):
        # The protocol must not have recorded a failure, and the received
        # data must be as long as the payload C{s} repeated C{n} times.
        self.assertFalse(echoer.failure, echoer.failure)
        self.assertTrue(hasattr(echoer, 'buffer'))
        self.assertEqual(len(echoer.buffer), len(echoer.s * echoer.n))

    def closeChildStdin(reason):
        # Close the child's stdin, then hand the failure on unchanged so
        # the test is still reported as failed.
        echoer.transport.closeStdin()
        return reason

    done.addCallback(checkEcho)
    done.addErrback(closeChildStdin)
    return done
评论列表
文章目录