def testHostAddress(self):
    """
    Check the addresses reported on the client side of a loopback
    TCP connection.

    A server is started on 127.0.0.1 with port 0 and the actually bound
    port is read back from C{p1.getHost()}.  A client then connects to
    that port.  Once the connector's state is C{"connected"}, both
    C{f2.address} and the client transport's C{getPeer()} must equal the
    listening port's C{getHost()}.

    @return: the Deferred from C{loopUntil}, with the assertions and the
        port cleanup chained onto it.
    """
    f1 = MyServerFactory()
    p1 = reactor.listenTCP(0, f1, interface='127.0.0.1')
    # Port 0 was requested, so ask the listening port which one it got.
    n = p1.getHost().port
    self.ports.append(p1)

    f2 = MyOtherClientFactory()
    p2 = reactor.connectTCP('127.0.0.1', n, f2)

    d = loopUntil(lambda: p2.state == "connected")

    def check(ignored):
        # assertEqual replaces the deprecated assertEquals alias.
        self.assertEqual(p1.getHost(), f2.address)
        self.assertEqual(p1.getHost(), f2.protocol.transport.getPeer())
        return p1.stopListening()

    def cleanup(ignored):
        # NOTE(review): chained with addCallback, so this is skipped when
        # check() fails and p2.transport is then never added to
        # self.ports -- confirm tearDown covers that case.
        self.ports.append(p2.transport)
        return self.cleanPorts(*self.ports)

    return d.addCallback(check).addCallback(cleanup)
评论列表
文章目录