def testConnectionRefused(self):
# assume no one listening on port 80 UDP
client = GoodClient()
clientStarted = client.startedDeferred = defer.Deferred()
port = reactor.listenUDP(0, client, interface="127.0.0.1")
server = Server()
serverStarted = server.startedDeferred = defer.Deferred()
port2 = reactor.listenUDP(0, server, interface="127.0.0.1")
d = defer.DeferredList(
[clientStarted, serverStarted],
fireOnOneErrback=True)
def cbStarted(ignored):
connectionRefused = client.startedDeferred = defer.Deferred()
client.transport.connect("127.0.0.1", 80)
for i in range(10):
client.transport.write(str(i))
server.transport.write(str(i), ("127.0.0.1", 80))
return self.assertFailure(
connectionRefused,
error.ConnectionRefusedError)
d.addCallback(cbStarted)
def cbFinished(ignored):
return defer.DeferredList([
defer.maybeDeferred(port.stopListening),
defer.maybeDeferred(port2.stopListening)],
fireOnOneErrback=True)
d.addCallback(cbFinished)
return d
评论列表
文章目录