test_cooperator.py 文件源码

python
阅读 24 收藏 0 点赞 0 评论 0

项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码
def test_removingLastTaskStopsScheduledCall(self):
        """
        If the last task in a Cooperator is removed, the scheduled call for
        the next tick is cancelled, since it is no longer necessary.

        This behavior is useful for tests that want to assert they have left
        no reactor state behind when they're done.
        """
        calls = [None]
        def sched(f):
            calls[0] = FakeDelayedCall(f)
            return calls[0]
        coop = task.Cooperator(scheduler=sched)

        # Add two task; this should schedule the tick:
        task1 = coop.cooperate(iter([1, 2]))
        task2 = coop.cooperate(iter([1, 2]))
        self.assertEqual(calls[0].func, coop._tick)

        # Remove first task; scheduled call should still be going:
        task1.stop()
        self.assertFalse(calls[0].cancelled)
        self.assertEqual(coop._delayedCall, calls[0])

        # Remove second task; scheduled call should be cancelled:
        task2.stop()
        self.assertTrue(calls[0].cancelled)
        self.assertIsNone(coop._delayedCall)

        # Add another task; scheduled call will be recreated:
        coop.cooperate(iter([1, 2]))
        self.assertFalse(calls[0].cancelled)
        self.assertEqual(coop._delayedCall, calls[0])
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号