def test_base_consumer_without_cluster_name(
self,
topic,
consumer_init_kwargs
):
with mock.patch(
'yelp_kafka.discovery.get_kafka_cluster'
) as mock_get_kafka_cluster, mock.patch(
'kafka_utils.util.config.ClusterConfig.__init__',
return_value=None
) as mock_cluster_config_init:
consumer = BaseConsumer(
topic_to_consumer_topic_state_map={topic: None},
auto_offset_reset='largest',
**consumer_init_kwargs
)
consumer._region_cluster_config
assert mock_get_kafka_cluster.call_count == 0
config = get_config()
mock_cluster_config_init.assert_called_once_with(
type='standard',
name='data_pipeline',
broker_list=config.kafka_broker_list,
zookeeper=config.kafka_zookeeper
)
评论列表
文章目录