/**
 * Adds the supplied partitions, with their starting offsets, to {@code partitionMap}.
 *
 * <p>A partition that is already present in {@code partitionMap} is left untouched, so a
 * stale offset passed in here never replaces the one currently tracked. For a new partition,
 * an offset that {@code PartitionTopicInfo.isOffsetInvalid} rejects is replaced by the value
 * returned from {@code handleOffsetOutOfRange}. After all entries are processed, every waiter
 * on {@code partitionMapCond} is signalled.
 *
 * <p>The whole update runs while holding {@code partitionMapLock}, which is always released
 * on exit.
 *
 * @param partitionAndOffsets partitions to add, mapped to the offset each should start from
 */
public void addPartitions(Map<TopicAndPartition, Long> partitionAndOffsets) {
    Utils.lockInterruptibly(partitionMapLock);
    try {
        for (Map.Entry<TopicAndPartition, Long> candidate : partitionAndOffsets.entrySet()) {
            final TopicAndPartition partition = candidate.getKey();
            final long requestedOffset = candidate.getValue();

            // An existing entry wins: never overwrite it with the (possibly older) offset.
            if (partitionMap.containsKey(partition)) {
                continue;
            }

            final long startOffset;
            if (PartitionTopicInfo.isOffsetInvalid(requestedOffset)) {
                startOffset = handleOffsetOutOfRange(partition);
            } else {
                startOffset = requestedOffset;
            }
            partitionMap.put(partition, startOffset);
        }
        partitionMapCond.signalAll();
    } finally {
        partitionMapLock.unlock();
    }
}
java类kafka.consumer.PartitionTopicInfo的实例源码
AbstractFetcherThread.java 文件源码
项目:buka
阅读 30
收藏 0
点赞 0
评论 0
MockKafkaStream.java 文件源码
项目:incubator-gobblin
阅读 32
收藏 0
点赞 0
评论 0
/**
 * Wraps {@code message} in a single-message {@code FetchedDataChunk} and appends it to the
 * queue of the next stream.
 *
 * <p>The target stream is chosen by incrementing {@code nextStream} and reducing it modulo
 * the number of queues. The per-stream counter in {@code offsets} is incremented and its new
 * value is used as both offsets of the {@code PartitionTopicInfo} and as the chunk's fetch
 * offset.
 *
 * @param message text to publish; encoded as UTF-8 bytes
 */
public void pushToStream(String message) {
    // Reduce the full-width counter BEFORE narrowing to int. In the original expression the
    // (int) cast bound tighter than %, so a counter past Integer.MAX_VALUE was truncated to a
    // negative int and produced a negative list index. floorMod always yields a value in
    // [0, size), which makes the narrowing cast safe.
    int streamNo = (int) Math.floorMod(this.nextStream.incrementAndGet(), (long) this.queues.size());

    AtomicLong offset = this.offsets.get(streamNo);
    BlockingQueue<FetchedDataChunk> queue = this.queues.get(streamNo);

    // Snapshot of the freshly incremented per-stream offset for this chunk.
    AtomicLong thisOffset = new AtomicLong(offset.incrementAndGet());

    List<Message> seq = Lists.newArrayList();
    seq.add(new Message(message.getBytes(Charsets.UTF_8)));
    // NOTE(review): the shared per-stream counter (not the snapshot) is handed to the
    // message set, as in the original; confirm whether the constructor advances it.
    ByteBufferMessageSet messageSet =
        new ByteBufferMessageSet(NoCompressionCodec$.MODULE$, offset, JavaConversions.asScalaBuffer(seq));

    FetchedDataChunk chunk = new FetchedDataChunk(messageSet,
        new PartitionTopicInfo("topic", streamNo, queue, thisOffset, thisOffset, new AtomicInteger(1), "clientId"),
        thisOffset.get());

    queue.add(chunk);
}