test_taskqueue.py 文件源码

python
阅读 19 收藏 0 点赞 0 评论 0

项目:xavier 作者: bepress 项目源码 文件源码
def test_background_queue():
    publish_event = MagicMock()
    publish_event.return_value = 'aaa'

    task_queue = TaskQueue(publish_event)

    @task_queue.task()
    def funcy():
        global funcy_called
        funcy_called += 1
        return "blah"

    assert funcy() == "blah"

    assert funcy.delay() is True

    event = jsonpickle.dumps((funcy.path, (), {}))

    publish_event.assert_called_once_with(event)

    task_queue.process_event(event)
    assert funcy_called == 2
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号