OldApiTopicConsumer.java 文件源码

java
阅读 19 收藏 0 点赞 0 评论 0

项目:azeroth 作者:
/**
 * 
 * @param connector
 * @param topics
 * @param processThreads 
 */
@SuppressWarnings("unchecked")
public OldApiTopicConsumer(ConsumerContext context) {

    this.consumerContext = context;
    try {
        Class<?> deserializerClass = Class
            .forName(context.getProperties().getProperty("value.deserializer"));
        deserializer = (Deserializer<Object>) deserializerClass.newInstance();
    } catch (Exception e) {
    }
    this.connector = kafka.consumer.Consumer
        .createJavaConsumerConnector(new ConsumerConfig(context.getProperties()));

    int poolSize = consumerContext.getMessageHandlers().size();
    this.fetchExecutor = new StandardThreadExecutor(poolSize, poolSize, 0, TimeUnit.SECONDS,
        poolSize, new StandardThreadFactory("KafkaFetcher"));

    this.defaultProcessExecutor = new StandardThreadExecutor(1, context.getMaxProcessThreads(),
        30, TimeUnit.SECONDS, context.getMaxProcessThreads(),
        new StandardThreadFactory("KafkaProcessor"), new PoolFullRunsPolicy());

    logger.info(
        "Kafka Conumer ThreadPool initialized,fetchPool Size:{},defalutProcessPool Size:{} ",
        poolSize, context.getMaxProcessThreads());
}
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号