def testFailurePropagation(self):
# Tests if the failure of the errback of the deferred returned by the
# callable is propagated to the lc errback.
#
# To make sure this test does not hang trial when LoopingCall does not
# wait for the callable's deferred, it also checks there are no
# calls in the clock's callLater queue.
timings = [0.3]
clock = task.Clock()
def foo():
d = defer.Deferred()
clock.callLater(0.3, d.errback, TestException())
return d
lc = TestableLoopingCall(clock, foo)
d = lc.start(1)
self.assertFailure(d, TestException)
clock.pump(timings)
self.failIf(clock.calls)
return d
评论列表
文章目录