public void addPartitions(Map<TopicAndPartition, Long> partitionAndOffsets) {
Utils.lockInterruptibly(partitionMapLock);
try {
for (Map.Entry<TopicAndPartition, Long> entry : partitionAndOffsets.entrySet()) {
TopicAndPartition topicAndPartition = entry.getKey();
long offset = entry.getValue();
// If the partitionMap already has the topic/partition, then do not update the map with the old offset
if (!partitionMap.containsKey(topicAndPartition))
partitionMap.put(topicAndPartition,
PartitionTopicInfo.isOffsetInvalid(offset) ? handleOffsetOutOfRange(topicAndPartition) : offset);
}
partitionMapCond.signalAll();
} finally {
partitionMapLock.unlock();
}
}
AbstractFetcherThread.java 文件源码
java
阅读 30
收藏 0
点赞 0
评论 0
项目:buka
作者:
评论列表
文章目录