test_cooperator.py 文件源码

python
阅读 21 收藏 0 点赞 0 评论 0

项目:hostapd-mana 作者: adde88 项目源码 文件源码
def testCallbackReCoiterate(self):
        """
        If a callback to a deferred returned by coiterate calls coiterate on
        the same Cooperator, we should make sure to only do the minimal amount
        of scheduling work.  (This test was added to demonstrate a specific bug
        that was found while writing the scheduler.)
        """
        calls = []

        class FakeCall:
            def __init__(self, func):
                self.func = func

            def __repr__(self):
                return '<FakeCall %r>' % (self.func,)

        def sched(f):
            self.failIf(calls, repr(calls))
            calls.append(FakeCall(f))
            return calls[-1]

        c = task.Cooperator(scheduler=sched, terminationPredicateFactory=lambda: lambda: True)
        d = c.coiterate(iter(()))

        done = []
        def anotherTask(ign):
            c.coiterate(iter(())).addBoth(done.append)

        d.addCallback(anotherTask)

        work = 0
        while not done:
            work += 1
            while calls:
                calls.pop(0).func()
                work += 1
            if work > 50:
                self.fail("Cooperator took too long")
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号