def test_schedule_queue():
publish_event = MagicMock()
publish_event.return_value = 'aaa'
task_queue = TaskQueue(publish_event)
@task_queue.task(schedules=['bbb'])
def funcy():
return "blah"
task_queue.process_schedule('bbb')
event = jsonpickle.dumps((funcy.path, (), {}))
publish_event.assert_called_once_with(event)
评论列表
文章目录