def test_delayed_looping_call_register_wait_and_cancel(self):
self.assertFalse(self.tm.is_pending_task_active("test"))
lc = LoopingCall(self.count)
lc.clock = self.tm._reactor
self.tm.register_task("test", lc, delay=1, interval=1)
self.assertTrue(self.tm.is_pending_task_active("test"))
# After one second, the counter has increased by one and the task is still active.
self.tm._reactor.advance(1)
self.assertEquals(1, self.counter)
self.assertTrue(self.tm.is_pending_task_active("test"))
# After one more second, the counter should be 2
self.tm._reactor.advance(1)
self.assertEquals(2, self.counter)
# After canceling the task the counter should stop increasing
self.tm.cancel_pending_task("test")
self.assertFalse(self.tm.is_pending_task_active("test"))
self.tm._reactor.advance(10)
self.assertEquals(2, self.counter)
评论列表
文章目录