test_jobs.py 文件源码

python
阅读 30 收藏 0 点赞 0 评论 0

项目:ccs-twistedextensions 作者: apple 项目源码 文件源码
def test_notBeforeWhenCheckingForWork(self):
        """
        L{ControllerQueue._workCheck} should execute any
        outstanding work items, but only those that are expired.

        Three jobs are created directly with C{DummyWorkItem.makeJob}: one
        whose C{notBefore} is C{fakeNow}, one 20 seconds before C{fakeNow},
        and one 1000 days after it.  The clock is then advanced until two
        results have been recorded, and those results must be exactly
        C{{1: 3, 2: 7}}.
        """
        # NOTE(review): this body uses ``yield``, so it presumably runs under
        # an @inlineCallbacks decorator applied outside this excerpt -- confirm.
        dbpool, _ignore_qpool, clock, _ignore_performerChosen = self._setupPools()
        fakeNow = datetime.datetime(2012, 12, 12, 12, 12, 12)

        # Let's create a couple of work items directly, not via the enqueue
        # method, so that they exist but nobody will try to immediately execute
        # them.

        @transactionally(dbpool.pool.connection)
        @inlineCallbacks
        def setup(txn):
            # First, one that's right now.
            yield DummyWorkItem.makeJob(txn, a=1, b=2, notBefore=fakeNow)

            # Next, create one that's actually far enough into the past to run.
            yield DummyWorkItem.makeJob(
                txn, a=3, b=4, notBefore=(
                    # Schedule it in the past so that it should have already
                    # run.
                    fakeNow - datetime.timedelta(seconds=20)
                )
            )

            # Finally, one that's actually scheduled for the future
            # (timedelta's first positional argument is days, so this is
            # 1000 days after fakeNow).
            yield DummyWorkItem.makeJob(
                txn, a=10, b=20, notBefore=fakeNow + datetime.timedelta(1000)
            )
        # ``setup`` has been replaced by whatever the decorators returned
        # (presumably a Deferred for the committed transaction -- confirm
        # against ``transactionally``); wait for it before polling.
        yield setup

        # Wait for the two runnable jobs.  The condition is "< 2" rather than
        # "!= 2" so that an unexpected extra result (e.g. the future-dated job
        # running) falls through to the assertion below instead of spinning
        # forever.
        while len(DummyWorkItem.results) < 2:
            clock.advance(1)

        # Only the first two work items may have completed.  The expected
        # mapping looks like job ID -> a + b (1 + 2 and 3 + 4) -- confirm
        # against DummyWorkItem.  assertEqual is used so that a mismatch
        # reports both dictionaries rather than just "False is not true".
        self.assertEqual(DummyWorkItem.results, {1: 3, 2: 7})
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号