def test_get_log_message(
    self,
    log_consumer_instance,
    publish_log_messages,
    log_message,
    log_topic
):
    """Publish one log message and check a single blocking read against it.

    ``yelp_kafka.discovery.get_region_cluster`` is patched for the whole
    test so that it returns ``get_config().cluster_config``.
    """
    cluster_lookup_patch = mock.patch(
        'yelp_kafka.discovery.get_region_cluster',
        return_value=get_config().cluster_config
    )
    # The patch is entered first and the consumer second, so the consumer
    # context is set up (and torn down) while the patch is active.
    with cluster_lookup_patch, log_consumer_instance as consumer:
        publish_log_messages(log_topic, log_message, count=1)
        checker = ConsumerAsserter(
            consumer=consumer,
            expected_message=log_message
        )
        # Wrap the lone result in a list: the asserter is handed a
        # collection together with the expected count.
        received = [consumer.get_message(blocking=True, timeout=TIMEOUT)]
        checker.assert_messages(received, expected_count=1)
评论列表
文章目录