def testExchange(self):
clientaddr = self.mktemp()
serveraddr = self.mktemp()
sp = ServerProto()
cp = ClientProto()
s = reactor.listenUNIXDatagram(serveraddr, sp)
c = reactor.connectUNIXDatagram(serveraddr, cp, bindAddress = clientaddr)
d = defer.gatherResults([sp.deferredStarted, cp.deferredStarted])
def write(ignored):
cp.transport.write("hi")
return defer.gatherResults([sp.deferredGotWhat,
cp.deferredGotBack])
def cleanup(ignored):
d1 = defer.maybeDeferred(s.stopListening)
d1.addCallback(lambda x : os.unlink(clientaddr))
d2 = defer.maybeDeferred(c.stopListening)
d2.addCallback(lambda x : os.unlink(serveraddr))
return defer.gatherResults([d1, d2])
def _cbTestExchange(ignored):
self.failUnlessEqual("hi", sp.gotwhat)
self.failUnlessEqual(clientaddr, sp.gotfrom)
self.failUnlessEqual("hi back", cp.gotback)
d.addCallback(write)
d.addCallback(cleanup)
d.addCallback(_cbTestExchange)
return d
评论列表
文章目录