def _cancelConnectTest(self, connect):
    """
    Shared body for tests which check that cancelling the L{Deferred}
    returned by one of L{ClientCreator}'s I{connect} methods also cancels
    the underlying connector.

    @param connect: A one-argument callable.  It is passed a
        L{ClientCreator} instance, is expected to call one of that
        instance's I{connect} methods, and must return the result.

    @return: A L{Deferred} which fires when the test is complete or fails
        if there is a problem.
    """
    memoryReactor = MemoryReactorClock()
    connecting = connect(ClientCreator(memoryReactor, Protocol))

    # Take the last connector the in-memory reactor recorded; before
    # cancellation it must not be marked as disconnected.
    pendingConnector = memoryReactor.connectors.pop()
    self.assertFalse(pendingConnector._disconnected)

    # Cancelling the Deferred should be propagated to the connector.
    connecting.cancel()
    self.assertTrue(pendingConnector._disconnected)

    # The cancelled Deferred itself is expected to fail with CancelledError.
    return self.assertFailure(connecting, CancelledError)
评论列表
文章目录