KafkaConsumer.java source code

java

Project: java-kafka-client-libs Author:
private void setUpConsumer( Map<String, Integer> topicMap, MessageHandler<?> handler, Properties consumerProps ) {
    _executors = new HashMap<String, ExecutorService>();
    _topicConsumers = new HashMap<String, ConsumerConnector>();

    // Read the base group.id once, so every topic derives its id from the same value
    // rather than from the id already rewritten for the previous topic.
    String baseGroupId = consumerProps.getProperty( "group.id" );

    for ( String topic : topicMap.keySet() ) {
        String normalizedTopic = topic.replace( ".", "_" );
        String normalizedConsumerGroupId = getGroupId( baseGroupId, normalizedTopic );

        // Copy the properties per topic instead of overwriting group.id on the caller's object.
        Properties topicProps = new Properties();
        topicProps.putAll( consumerProps );
        topicProps.setProperty( "group.id", normalizedConsumerGroupId );

        LOG.warn( "Consuming topic '" + topic + "' with group.id '" + normalizedConsumerGroupId + "'" );
        LOG.warn( topicProps.toString() );

        // One connector per topic, each registered under its own consumer group.
        ConsumerConfig topicConfig = new ConsumerConfig( topicProps );
        _topicConsumers.put( topic, kafka.consumer.Consumer.createJavaConsumerConnector( topicConfig ) );
    }
    _topicMap = topicMap;
    _handler = handler;
}
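
The per-topic group.id derivation in the method above can be tried in isolation, without any Kafka classes on the classpath. The following is a minimal sketch under stated assumptions: the class name `PerTopicGroupIds`, the helper `perTopicProps`, and the body of `groupIdFor` are all hypothetical. In particular, `groupIdFor` is only a stand-in for the project's `getGroupId`, whose implementation is not shown here and may well differ.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

public class PerTopicGroupIds {

    // Hypothetical stand-in for getGroupId(...); the real method is not shown in the source.
    static String groupIdFor(String baseGroupId, String normalizedTopic) {
        return baseGroupId + "_" + normalizedTopic;
    }

    // Builds one Properties object per topic, each with its own group.id.
    // The base Properties object passed in is never modified.
    static Map<String, Properties> perTopicProps(Iterable<String> topics, Properties base) {
        String baseGroupId = base.getProperty("group.id");
        Map<String, Properties> result = new LinkedHashMap<>();
        for (String topic : topics) {
            // Same normalization as the original method: dots become underscores.
            String normalizedTopic = topic.replace(".", "_");
            // putAll copies the explicitly set entries (not Properties defaults).
            Properties copy = new Properties();
            copy.putAll(base);
            copy.setProperty("group.id", groupIdFor(baseGroupId, normalizedTopic));
            result.put(topic, copy);
        }
        return result;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.setProperty("group.id", "billing");

        Map<String, Properties> props =
                perTopicProps(Arrays.asList("orders.created", "orders.cancelled"), base);

        for (Map.Entry<String, Properties> e : props.entrySet()) {
            System.out.println(e.getKey() + " -> " + e.getValue().getProperty("group.id"));
        }
    }
}
```

Deriving every id from the same base value keeps the result independent of the iteration order of the topic map, and copying the properties leaves the caller's object untouched.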