/**
*
* @param connector
* @param topics
* @param processThreads
*/
@SuppressWarnings("unchecked")
public OldApiTopicConsumer(ConsumerContext context) {
this.consumerContext = context;
try {
Class<?> deserializerClass = Class
.forName(context.getProperties().getProperty("value.deserializer"));
deserializer = (Deserializer<Object>) deserializerClass.newInstance();
} catch (Exception e) {
}
this.connector = kafka.consumer.Consumer
.createJavaConsumerConnector(new ConsumerConfig(context.getProperties()));
int poolSize = consumerContext.getMessageHandlers().size();
this.fetchExecutor = new StandardThreadExecutor(poolSize, poolSize, 0, TimeUnit.SECONDS,
poolSize, new StandardThreadFactory("KafkaFetcher"));
this.defaultProcessExecutor = new StandardThreadExecutor(1, context.getMaxProcessThreads(),
30, TimeUnit.SECONDS, context.getMaxProcessThreads(),
new StandardThreadFactory("KafkaProcessor"), new PoolFullRunsPolicy());
logger.info(
"Kafka Conumer ThreadPool initialized,fetchPool Size:{},defalutProcessPool Size:{} ",
poolSize, context.getMaxProcessThreads());
}
OldApiTopicConsumer.java 文件源码
java
阅读 19
收藏 0
点赞 0
评论 0
项目:azeroth
作者:
评论列表
文章目录