def testPIDFile(self):
filename = self.mktemp()
f = Factory(self, filename)
l = reactor.listenUNIX(filename, f, mode = 0600, wantPID=1)
self.failUnless(lockfile.isLocked(filename + ".lock"))
tcf = TestClientFactory(self, filename)
c = reactor.connectUNIX(filename, tcf, checkPID=1)
d = defer.gatherResults([f.deferred, tcf.deferred])
def _portStuff(ignored):
self._addPorts(l, c.transport, tcf.protocol.transport,
f.protocol.transport)
return self.cleanPorts(*self.ports)
def _check(ignored):
self.failIf(lockfile.isLocked(filename + ".lock"), 'locked')
d.addCallback(_portStuff)
d.addCallback(_check)
return d
评论列表
文章目录