def test_publish_fails_after_retry(self, message, producer):
# TODO(DATAPIPE-606|clin) investigate better way than mocking response
with mock.patch.object(
producer._kafka_producer.kafka_client,
'send_produce_request',
side_effect=[FailedPayloadsError]
) as mock_send_request, capture_new_messages(
message.topic
) as get_messages, pytest.raises(
MaxRetryError
):
orig_topic_to_offset_map = self.get_orig_topic_to_offset_map(producer)
producer.publish(message)
producer.flush()
messages = get_messages()
assert len(messages) == 0
assert mock_send_request.call_count == self.max_retry_count
self.assert_new_topic_to_offset_map(
producer,
message.topic,
orig_topic_to_offset_map,
published_message_count=0
)
评论列表
文章目录