def test_stdio(self):
    """
    L{twisted.internet.stdio} test.

    Run C{pyExe} with C{-u -m twisted.test.process_twisted} as a child
    process, write three chunks to its stdin, close stdin, and check that
    the bytes collected from its stdout are exactly the concatenation of
    those chunks.

    @return: the Deferred stored as C{endedDeferred} on the process
        protocol, with the output check attached as a callback.
    """
    scriptPath = b"twisted.test.process_twisted"
    stdinChunks = (b"hello, world", b"abc", b"123")

    proto = Accumulator()
    ended = defer.Deferred()
    # NOTE(review): presumably Accumulator fires this Deferred once the
    # child process has ended -- confirm against its definition.
    proto.endedDeferred = ended

    argv = [pyExe, b'-u', b"-m", scriptPath]
    reactor.spawnProcess(
        proto, pyExe, argv, env=properEnv, path=None, usePTY=self.usePTY
    )

    # Feed the input as separate writes, then close stdin.
    for chunk in stdinChunks:
        proto.transport.write(chunk)
    proto.transport.closeStdin()

    def checkOutput(ignored):
        # Include both captured streams in the failure message so a
        # mismatch can be diagnosed from the test output alone.
        received = proto.outF.getvalue()
        failureMessage = (
            "Output follows:\n"
            "%s\n"
            "Error message from process_twisted follows:\n"
            "%s\n" % (received, proto.errF.getvalue())
        )
        self.assertEqual(received, b"hello, worldabc123", failureMessage)

    return ended.addCallback(checkOutput)
评论列表
文章目录