def test_background_queue():
    """Check that a TaskQueue task runs both directly and via a queued event.

    The test registers ``funcy`` with a ``TaskQueue`` built around a mocked
    publisher, then verifies that:

    * calling the decorated function directly still returns its own result;
    * ``funcy.delay()`` returns ``True`` and hands the publisher exactly one
      event, equal to ``jsonpickle.dumps((funcy.path, (), {}))``;
    * feeding that same event to ``task_queue.process_event`` runs the task
      again, so the call counter ends at 2 (one direct call, one processed
      event).
    """
    # Start from a known count so the final assertion does not depend on
    # whatever value the module-level counter held before this test ran
    # (e.g. a previous run in the same process, or another test using it).
    global funcy_called
    funcy_called = 0

    # Stand-in for the real publisher; its return value is arbitrary.
    publish_event = MagicMock()
    publish_event.return_value = 'aaa'
    task_queue = TaskQueue(publish_event)

    @task_queue.task()
    def funcy():
        # The counter is a module global rather than a closure variable;
        # keep it that way so the task body is unchanged.
        global funcy_called
        funcy_called += 1
        return "blah"

    # Direct invocation: the decorator must not hide the return value.
    assert funcy() == "blah"

    # Deferred invocation: expected to publish rather than return the result.
    assert funcy.delay() is True

    # The published payload is the pickled (path, args, kwargs) triple.
    event = jsonpickle.dumps((funcy.path, (), {}))
    publish_event.assert_called_once_with(event)

    # Consuming the event should execute the task a second time.
    task_queue.process_event(event)
    assert funcy_called == 2
评论列表
文章目录