test_sched.py 文件源码

python
阅读 30 收藏 0 点赞 0 评论 0

项目:web_ctp 作者: molebot 项目源码 文件源码
def test_enter_concurrent(self):
        q = queue.Queue()
        fun = q.put
        timer = Timer()
        scheduler = sched.scheduler(timer.time, timer.sleep)
        scheduler.enter(1, 1, fun, (1,))
        scheduler.enter(3, 1, fun, (3,))
        t = threading.Thread(target=scheduler.run)
        t.start()
        timer.advance(1)
        self.assertEqual(q.get(timeout=TIMEOUT), 1)
        self.assertTrue(q.empty())
        for x in [4, 5, 2]:
            z = scheduler.enter(x - 1, 1, fun, (x,))
        timer.advance(2)
        self.assertEqual(q.get(timeout=TIMEOUT), 2)
        self.assertEqual(q.get(timeout=TIMEOUT), 3)
        self.assertTrue(q.empty())
        timer.advance(1)
        self.assertEqual(q.get(timeout=TIMEOUT), 4)
        self.assertTrue(q.empty())
        timer.advance(1)
        self.assertEqual(q.get(timeout=TIMEOUT), 5)
        self.assertTrue(q.empty())
        timer.advance(1000)
        t.join(timeout=TIMEOUT)
        self.assertFalse(t.is_alive())
        self.assertTrue(q.empty())
        self.assertEqual(timer.time(), 5)
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号