def testReconnect(self):
    """Check that a client reconnecting after a lost connection fails once.

    A ``ClosingFactory`` listens on a loopback port and a
    ``MyClientFactory`` connects to it.  The client factory's
    ``clientConnectionLost`` hook is replaced so that it calls
    ``connect()`` on the connector again.  Once the client factory
    reports ``failed``, the test asserts that the protocol was made and
    closed exactly once, that the recorded failure is a
    ``ConnectionRefusedError``, and that the factory was stopped once.

    Returns the Deferred chain, which ends with ``self.cleanPorts``.
    """
    server_factory = ClosingFactory()
    listening_port = reactor.listenTCP(
        0, server_factory, interface="127.0.0.1"
    )
    # Port 0 was requested, so read back the number actually bound.
    port_number = listening_port.getHost().port
    self.ports.append(listening_port)
    # NOTE(review): presumably ClosingFactory uses this reference to shut
    # the listener down after the first connection -- confirm against its
    # definition.
    server_factory.port = listening_port

    client_factory = MyClientFactory()

    def step1(ignored):
        # Replace the lost-connection hook with one that reconnects.
        def clientConnectionLost(connector, reason):
            connector.connect()

        client_factory.clientConnectionLost = clientConnectionLost
        reactor.connectTCP("127.0.0.1", port_number, client_factory)
        return loopUntil(lambda: client_factory.failed)

    def step2(ignored):
        proto = client_factory.protocol
        self.assertEquals((proto.made, proto.closed), (1, 1))
        # trap() re-raises unless the failure is the expected refusal.
        client_factory.reason.trap(error.ConnectionRefusedError)
        self.assertEquals(client_factory.stopped, 1)
        return self.cleanPorts(*self.ports)

    # Wait for the listener to report itself connected before starting.
    d = loopUntil(lambda: listening_port.connected)
    d.addCallback(step1)
    d.addCallback(step2)
    return d
评论列表
文章目录