private void processStreamsByTopic(String topicKeys, List<KafkaStream<byte[], byte[]>> streamList) {
// init stream thread pool
ExecutorService streamPool = Executors.newFixedThreadPool(partitions);
String[] topics = StringUtils.split(topicKeys, ",");
if (log.isDebugEnabled())
log.debug("准备处理消息流集合 KafkaStreamList,topic count={},topics={}, partitions/topic={}", topics.length, topicKeys, partitions);
//遍历stream
AtomicInteger index = new AtomicInteger(0);
for (KafkaStream<byte[], byte[]> stream : streamList) {
Thread streamThread = new Thread() {
@Override
public void run() {
int i = index.getAndAdd(1);
if (log.isDebugEnabled())
log.debug("处理消息流KafkaStream -- No.={}, partitions={}", i, partitions + ":" + i);
ConsumerIterator<byte[], byte[]> consumerIterator = stream.iterator();
processStreamByConsumer(topicKeys, consumerIterator);
}
};
streamPool.execute(streamThread);
}
}
KafkaReceiver.java 文件源码
java
阅读 26
收藏 0
点赞 0
评论 0
项目:koper
作者:
评论列表
文章目录