Source code examples of the Java class kafka.consumer.PartitionTopicInfo

AbstractFetcherThread.java (source file, project: buka)
public void addPartitions(Map<TopicAndPartition, Long> partitionAndOffsets) {
    Utils.lockInterruptibly(partitionMapLock);
    try {
        for (Map.Entry<TopicAndPartition, Long> entry : partitionAndOffsets.entrySet()) {
            TopicAndPartition topicAndPartition = entry.getKey();
            long offset = entry.getValue();
            // If the partitionMap already has the topic/partition, then do not update the map with the old offset
            if (!partitionMap.containsKey(topicAndPartition))
                partitionMap.put(topicAndPartition,
                        PartitionTopicInfo.isOffsetInvalid(offset) ? handleOffsetOutOfRange(topicAndPartition) : offset);
        }
        partitionMapCond.signalAll();
    } finally {
        partitionMapLock.unlock();
    }
}
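
A minimal sketch of how addPartitions might be called by a consumer-side caller. The fetcherThread variable and the chosen offsets are hypothetical, and it assumes TopicAndPartition exposes a (topic, partition) constructor and that isOffsetInvalid treats negative offsets as invalid, as in the Scala client:

Map<TopicAndPartition, Long> startOffsets = new HashMap<>();
startOffsets.put(new TopicAndPartition("my-topic", 0), 42L);   // resume from a known offset
startOffsets.put(new TopicAndPartition("my-topic", 1), -1L);   // invalid offset: handleOffsetOutOfRange decides where to start
fetcherThread.addPartitions(startOffsets);                     // hypothetical AbstractFetcherThread subclass instance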
MockKafkaStream.java (source file, project: incubator-gobblin)
public void pushToStream(String message) {

    // Round-robin the message across the mock streams.
    int streamNo = (int) this.nextStream.incrementAndGet() % this.queues.size();

    AtomicLong offset = this.offsets.get(streamNo);
    BlockingQueue<FetchedDataChunk> queue = this.queues.get(streamNo);

    // Advance the per-stream offset and capture it for this message.
    AtomicLong thisOffset = new AtomicLong(offset.incrementAndGet());

    // Wrap the payload in a single-message, uncompressed message set.
    List<Message> seq = Lists.newArrayList();
    seq.add(new Message(message.getBytes(Charsets.UTF_8)));
    ByteBufferMessageSet messageSet = new ByteBufferMessageSet(NoCompressionCodec$.MODULE$, offset, JavaConversions.asScalaBuffer(seq));

    // Build the chunk that a consumer iterator would normally receive from the fetcher.
    FetchedDataChunk chunk = new FetchedDataChunk(messageSet,
        new PartitionTopicInfo("topic", streamNo, queue, thisOffset, thisOffset, new AtomicInteger(1), "clientId"),
        thisOffset.get());

    queue.add(chunk);
  }
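
For completeness, a hedged sketch of how a test might read the pushed message back out of one of the backing queues. It assumes the Scala case-class accessors of Kafka 0.8's FetchedDataChunk, MessageAndOffset and Message when called from Java; the queue variable is hypothetical and would be one of this.queues:

FetchedDataChunk polled = queue.take();                        // blocks until pushToStream has added a chunk
PartitionTopicInfo info = polled.topicInfo();                  // the PartitionTopicInfo built above
for (MessageAndOffset mo : JavaConversions.asJavaIterable(polled.messages())) {
    ByteBuffer payload = mo.message().payload();
    byte[] bytes = new byte[payload.remaining()];
    payload.get(bytes);
    System.out.println(new String(bytes, Charsets.UTF_8) + " @ offset " + mo.offset());
}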

