def test_send(header_timestamp_mock, pubsub_client_mock):
    """Verify that sending a model publishes it to the configured topic.

    A ``Messaging`` instance is built from a config dict naming the topic,
    a ``FancyEvent`` is sent through it, and the mocked client is then
    inspected: the topic must have been looked up by name and ``publish``
    must have been called with the JSON-encoded fields plus the
    ``timestamp`` and ``type`` keyword arguments.

    Args:
        header_timestamp_mock: Mock whose return value is pinned to a fixed
            UTC datetime (presumably the source of the header timestamp;
            confirm against the fixture definition).
        pubsub_client_mock: Mock whose ``return_value.topic`` and
            ``topic.return_value.publish`` calls are asserted on.
    """
    topic_name = 'test-topic'
    event_uuid = 'cd1d3a03-7b04-4a35-97f8-ee5f3eb04c8e'
    event_text = 'Just testing!'
    frozen_now = datetime.datetime(
        2016, 12, 10, 11, 15, 45, 123456, tzinfo=datetime.timezone.utc)

    sender = queue_messaging.Messaging.create_from_dict({'TOPIC': topic_name})
    event = FancyEvent(
        uuid_field=uuid.UUID(event_uuid),
        string_field=event_text,
    )
    # Pin the timestamp so the expected 'timestamp' keyword is deterministic.
    header_timestamp_mock.return_value = frozen_now

    sender.send(event)

    topic_factory = pubsub_client_mock.return_value.topic
    topic_factory.assert_called_with(topic_name)

    expected_payload = test_utils.EncodedJson({
        "uuid_field": event_uuid,
        "string_field": event_text,
    })
    topic_factory.return_value.publish.assert_called_with(
        expected_payload,
        timestamp='2016-12-10T11:15:45.123456Z',
        type='FancyEvent',
    )
评论列表
文章目录