def testStdio(self):
    """twisted.internet.stdio test.

    Spawn ``process_twisted.py`` (the sibling of this file) with the
    current interpreter, write three chunks to the child's stdin, close
    stdin, and then check that the text gathered in ``p.outF`` is the
    concatenation of those chunks.

    Returns the Deferred stored on the protocol as ``endedDeferred`` with
    the verification callback attached.
    """
    exe = sys.executable
    scriptPath = util.sibpath(__file__, "process_twisted.py")
    p = Accumulator()
    # NOTE(review): presumably Accumulator fires ``endedDeferred`` when the
    # child process ends -- confirm against the Accumulator definition.
    d = p.endedDeferred = defer.Deferred()
    # The child gets only PYTHONPATH, built from this process's sys.path.
    env = {"PYTHONPATH": os.pathsep.join(sys.path)}
    # "-u" asks the child interpreter for unbuffered standard streams.
    reactor.spawnProcess(p, exe, [exe, "-u", scriptPath], env=env,
                         path=None, usePTY=self.usePTY)
    p.transport.write("hello, world")
    p.transport.write("abc")
    p.transport.write("123")
    p.transport.closeStdin()

    def processEnded(ign):
        # assertEqual replaces the deprecated assertEquals alias; the
        # failure message carries both captured streams for diagnosis.
        self.assertEqual(p.outF.getvalue(), "hello, worldabc123",
                         "Output follows:\n"
                         "%s\n"
                         "Error message from process_twisted follows:\n"
                         "%s\n" % (p.outF.getvalue(), p.errF.getvalue()))

    return d.addCallback(processEnded)
评论列表
文章目录