queues_test.py 文件源码

python
阅读 40 收藏 0 点赞 0 评论 0

项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码
def test_put_clears_timed_out_getters(self):
        q = queues.Queue()
        getters = [q.get(timedelta(seconds=0.01)) for _ in range(10)]
        get = q.get()
        q.get()
        self.assertEqual(12, len(q._getters))
        yield gen.sleep(0.02)
        self.assertEqual(12, len(q._getters))
        self.assertFalse(get.done())  # Final waiters still active.
        q.put(0)  # put() clears the waiters.
        self.assertEqual(1, len(q._getters))
        self.assertEqual(0, (yield get))
        for getter in getters:
            self.assertRaises(TimeoutError, getter.result)
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号