def test_ensure_messages_published_fails_when_overpublished(
    self, topic, messages, producer, topic_offsets
):
    """Check that ensuring a subset of already-published messages fails.

    Every entry in ``messages`` is published and the producer is flushed.
    ``ensure_messages_published`` is then called with only the first two
    messages and must raise ``PublicationUnensurableError``. Afterwards the
    patched module logger is checked via ``_assert_logged_info_correct``.

    NOTE(review): going by the test name, the failure is presumably caused
    by more messages having been published than are being ensured --
    confirm against the producer implementation.
    """
    # Publish the full list first; only a prefix of it is ensured below.
    for msg in messages:
        producer.publish(msg)
    producer.flush()

    ensured = messages[:2]

    # Same manager order as before: pytest.raises is entered first, then
    # the logger patch, so the patch is undone before the error is checked.
    with pytest.raises(PublicationUnensurableError), \
            mock.patch.object(data_pipeline.producer, 'logger') as mock_logger:
        producer.ensure_messages_published(ensured, topic_offsets)

    # Must sit outside the ``with`` block: the call above raises, so any
    # statement following it inside the block would never execute.
    self._assert_logged_info_correct(
        mock_logger,
        len(messages),
        topic,
        topic_offsets,
        message_count=len(ensured)
    )
评论列表
文章目录