def testManyProcesses(self):
    """
    Spawn 50 copies of process_tester.py at once and verify every one.

    Each child is started with the current interpreter in unbuffered
    mode (-u) and is watched by its own TestManyProcessProtocol.

    Returns a DeferredList built from the protocols' ``deferred``
    attributes. A callback on that list checks, for every protocol, that
    its recorded stages are [1, 2, 3, 4, 5] and that its ``reason`` is an
    error.ProcessTerminated failure carrying exit code 23.
    """

    def _check(results, protocols):
        # `results` is the DeferredList result; it is not inspected here
        # because each outcome is read from the protocol objects instead.
        for p in protocols:
            self.assertEqual(
                p.stages,
                [1, 2, 3, 4, 5],
                "[%d] stages = %s" % (id(p.transport), str(p.stages)),
            )
            # test status code
            f = p.reason
            # trap() re-raises the failure unless it is a ProcessTerminated.
            f.trap(error.ProcessTerminated)
            self.assertEqual(f.value.exitCode, 23)

    exe = sys.executable
    scriptPath = util.sibpath(__file__, "process_tester.py")
    args = [exe, "-u", scriptPath]
    protocols = []
    deferreds = []

    # Start every child before waiting on any of them, so that all 50
    # processes are in flight together.
    for _ in range(50):
        p = TestManyProcessProtocol()
        protocols.append(p)
        reactor.spawnProcess(p, exe, args, env=None)
        deferreds.append(p.deferred)

    # consumeErrors=True: failures delivered to the individual deferreds
    # are absorbed by the list rather than reported as unhandled errors;
    # _check examines p.reason directly.
    deferredList = defer.DeferredList(deferreds, consumeErrors=True)
    deferredList.addCallback(_check, protocols)
    return deferredList
评论列表
文章目录