priority_refresh_queue_test.py 文件源码

python
阅读 28 收藏 0 点赞 0 评论 0

项目:data_pipeline 作者: Yelp 项目源码 文件源码
def test_refresh_queue_no_job(
        self,
        priority_refresh_queue,
        fake_source_name,
        refresh_result
    ):
        assert priority_refresh_queue.peek() == {}
        with pytest.raises(EmptyQueueError) as e:
            priority_refresh_queue.pop(fake_source_name)
        assert fake_source_name in e.value.message

        priority_refresh_queue.add_refreshes_to_queue([refresh_result])
        assert priority_refresh_queue.pop(fake_source_name) == refresh_result
        with pytest.raises(EmptyQueueError) as e:
            priority_refresh_queue.pop(fake_source_name)
        assert fake_source_name in e.value.message
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号