def testUserFail(self):
    """
    Check that a connection attempt stopped by the user is reported as
    a failure whose reason is ``error.UserError``.

    A server is started on a local port and a client connection to it is
    begun; the client factory's ``startedConnecting`` hook immediately
    calls ``stopConnecting()`` on the connector. Once the client factory's
    ``stopped`` flag is set, the test asserts that exactly one failure was
    recorded and that its reason traps as ``error.UserError``.

    Returns whatever ``loopUntil(...).addCallback(check)`` yields
    (presumably a Deferred the test runner waits on -- confirm against
    ``loopUntil``).
    """
    f = MyServerFactory()
    # Port 0 is requested, so the real port number is read back from the
    # listening port object.
    p = reactor.listenTCP(0, f, interface="127.0.0.1")
    n = p.getHost().port
    # Remember the listening port so the final step can clean it up.
    self.ports.append(p)

    def startedConnecting(connector):
        # Abort the attempt as soon as the factory is told it has begun.
        connector.stopConnecting()

    factory = ClientStartStopFactory()
    # Override the hook on this instance only.
    factory.startedConnecting = startedConnecting
    reactor.connectTCP("127.0.0.1", n, factory)

    # Wait until the client factory reports it has stopped.
    d = loopUntil(lambda: factory.stopped)

    def check(ignored):
        # assertEqual rather than the deprecated assertEquals alias, which
        # is no longer available on unittest.TestCase in Python 3.12+.
        self.assertEqual(factory.failed, 1)
        # NOTE(review): assumes factory.reason is a Failure; trap() is
        # expected to re-raise if it is not a UserError.
        factory.reason.trap(error.UserError)
        return self.cleanPorts(*self.ports)

    return d.addCallback(check)
评论列表
文章目录