test_taskqueue.py 文件源码

python
阅读 25 收藏 0 点赞 0 评论 0

项目:xavier 作者: bepress 项目源码 文件源码
def test_sns_background_queue():
    """Exercise the SNS-backed task queue in both directions.

    Publishing: calling ``delay()`` on a registered task is expected to send
    exactly one SNS message whose body is the jsonpickle-encoded
    ``(path, args, kwargs)`` triple, addressed to the configured topic.

    Consuming: an SNS-shaped event for a second registered task is passed
    through ``handle_sns_message`` and the wrapped callable is expected to be
    invoked exactly once with no arguments.
    """
    with patch('xavier.aws.sns.send_sns_message') as mock_send_sns_message:
        mock_send_sns_message.return_value = {"MessageId": "1234"}
        queue = TaskQueue(publish_sns_message("sns:topic"))

        # --- Publishing side -------------------------------------------
        @queue.task(schedules=['bbb'])
        def funcy():
            return "Blah"

        funcy.delay()
        mock_send_sns_message.assert_called_once_with(
            Message=jsonpickle.dumps((funcy.path, (), {})),
            TopicArn="sns:topic",
        )

        # --- Consuming side --------------------------------------------
        # The mock is given __name__/__module__ before registration;
        # presumably the task decorator derives the task path from them
        # (TODO confirm against TaskQueue.task).
        background = MagicMock(return_value="awesome")
        background.__name__ = "background_func"
        background.__module__ = "testing"
        registered = queue.task()(background)

        sns_payload = {
            "Records": [
                {'Sns': {'Message': jsonpickle.dumps((registered.path, (), {}))}},
            ],
        }
        consume = handle_sns_message(queue.process_event)
        consume(sns_payload, {})

        background.assert_called_once_with()
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号