def test_sns_background_queue():
    """Exercise a TaskQueue backed by SNS in both directions.

    Publish side: calling ``.delay()`` on a registered task must call the
    patched ``send_sns_message`` exactly once, with the jsonpickle-encoded
    ``(path, args, kwargs)`` tuple as ``Message`` and the configured topic
    as ``TopicArn``.

    Consume side: feeding the same kind of encoded event through
    ``handle_sns_message(task_queue.process_event)`` inside an SNS-style
    ``Records`` payload must invoke the registered function exactly once
    with no arguments.
    """
    # The whole test runs with the SNS sender patched so nothing is sent
    # for real. NOTE(review): the original indentation was lost; the extent
    # of this ``with`` block is reconstructed -- confirm against history.
    with patch('xavier.aws.sns.send_sns_message') as mock_send_sns_message:
        mock_send_sns_message.return_value = {"MessageId": "1234"}

        publish_event = publish_sns_message("sns:topic")
        task_queue = TaskQueue(publish_event)

        # --- Publish side -------------------------------------------------
        @task_queue.task(schedules=['bbb'])
        def funcy():
            return "Blah"

        funcy.delay()

        # Expected wire format: (task path, positional args, keyword args).
        event = jsonpickle.dumps((funcy.path, (), {}))

        mock_send_sns_message.assert_called_once_with(
            Message=event,
            TopicArn="sns:topic",
        )

        # --- Consume side -------------------------------------------------
        mock_background_func = MagicMock()
        # MagicMock has no __name__ by default; presumably the task
        # decorator derives ``path`` from __module__/__name__ -- TODO confirm.
        mock_background_func.__name__ = "background_func"
        mock_background_func.__module__ = "testing"
        mock_background_func.return_value = "awesome"

        composed_mock = task_queue.task()(mock_background_func)

        event = jsonpickle.dumps((composed_mock.path, (), {}))

        sns_consumer = handle_sns_message(task_queue.process_event)

        # Second positional argument is an empty dict (presumably the
        # handler's context object -- verify against handle_sns_message).
        sns_consumer({
            "Records": [{
                'Sns': {
                    'Message': event
                }
            }]
        }, {})

        mock_background_func.assert_called_once_with()
# (Scraped web-page residue, not part of the test: "Comment list" / "Table of contents".)