test_integration.py 文件源码

python
阅读 22 收藏 0 点赞 0 评论 0

项目:queue-messaging 作者: socialwifi 项目源码 文件源码
def test_send(header_timestamp_mock, pubsub_client_mock):
    messaging = queue_messaging.Messaging.create_from_dict({
        'TOPIC': 'test-topic',
    })
    model = FancyEvent(
        uuid_field=uuid.UUID('cd1d3a03-7b04-4a35-97f8-ee5f3eb04c8e'),
        string_field='Just testing!'
    )
    header_timestamp_mock.return_value = datetime.datetime(
        2016, 12, 10, 11, 15, 45, 123456, tzinfo=datetime.timezone.utc)

    messaging.send(model)

    topic_mock = pubsub_client_mock.return_value.topic
    publish_mock = topic_mock.return_value.publish
    topic_mock.assert_called_with('test-topic')
    publish_mock.assert_called_with(
        test_utils.EncodedJson({
            "uuid_field": "cd1d3a03-7b04-4a35-97f8-ee5f3eb04c8e",
            "string_field": "Just testing!"
        }),
        timestamp='2016-12-10T11:15:45.123456Z',
        type='FancyEvent'
    )
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号