def testProcess(self):
if os.path.exists('/bin/gzip'): cmd = '/bin/gzip'
elif os.path.exists('/usr/bin/gzip'): cmd = '/usr/bin/gzip'
else: raise RuntimeError("gzip not found in /bin or /usr/bin")
s = "there's no place like home!\n" * 3
p = Accumulator()
d = p.endedDeferred = defer.Deferred()
reactor.spawnProcess(p, cmd, [cmd, "-c"], env=None, path="/tmp",
usePTY=self.usePTY)
p.transport.write(s)
p.transport.closeStdin()
def processEnded(ign):
f = p.outF
f.seek(0, 0)
gf = gzip.GzipFile(fileobj=f)
self.assertEquals(gf.read(), s)
return d.addCallback(processEnded)
评论列表
文章目录