/**
 * Creates one Kafka consumer connector per topic and records the topic map and handler.
 *
 * <p>For every key of {@code topicMap}, the topic name has its dots replaced with
 * underscores and is passed, together with the caller's configured {@code group.id},
 * to {@code getGroupId} to obtain the consumer group id used for that topic. A
 * {@link ConsumerConfig} is then built from a per-topic copy of {@code consumerProps}
 * carrying that group id, and the resulting connector is stored in
 * {@code _topicConsumers} under the original (un-normalized) topic name.
 *
 * <p>The base {@code group.id} is read once, before the loop, and {@code consumerProps}
 * itself is never modified. Previously the derived id was written back into the shared
 * properties, so each subsequent topic derived its id from the previous topic's
 * already-derived id rather than from the configured base value.
 *
 * <p>This method also resets {@code _executors} and {@code _topicConsumers} to fresh,
 * empty maps before creating the connectors. The values of {@code topicMap} are not
 * read here; the map is only stored in {@code _topicMap}.
 *
 * @param topicMap      topics to consume, keyed by topic name
 * @param handler       message handler stored in {@code _handler}
 * @param consumerProps base consumer configuration; expected to contain {@code group.id}
 */
private void setUpConsumer( Map<String, Integer> topicMap, MessageHandler<?> handler, Properties consumerProps ) {
    _executors = new HashMap<String, ExecutorService>();
    _topicConsumers = new HashMap<String, ConsumerConnector>();

    // Capture the caller's group id up front so every topic derives its id from the
    // same base value, independent of iteration order.
    final String baseGroupId = consumerProps.getProperty( "group.id" );

    for ( String topic : topicMap.keySet() ) {
        String normalizedTopic = topic.replace( ".", "_" );
        String normalizedConsumerGroupId = getGroupId( baseGroupId, normalizedTopic );

        // Work on a per-topic copy so the caller's Properties object is left untouched
        // and one topic's group id cannot leak into the next topic's configuration.
        Properties topicProps = (Properties) consumerProps.clone();
        topicProps.setProperty( "group.id", normalizedConsumerGroupId );

        LOG.warn( "Consuming topic '" + topic + "' with group.id '" + normalizedConsumerGroupId + "'" );
        LOG.warn( topicProps.toString() );

        ConsumerConfig topicConfig = new ConsumerConfig( topicProps );
        _topicConsumers.put( topic, kafka.consumer.Consumer.createJavaConsumerConnector( topicConfig ) );
    }

    _topicMap = topicMap;
    _handler = handler;
}
KafkaConsumer.java 文件源码
java
阅读 29
收藏 0
点赞 0
评论 0
项目:java-kafka-client-libs
作者:
评论列表
文章目录