SimpleKafkaConsumer.java 文件源码

java
阅读 36 收藏 0 点赞 0 评论 0

项目:twill 作者:
/**
 * Creates an Iterator of FetchedMessage based on the given message set. The iterator would also updates
 * the offset while iterating.
 */
private Iterator<FetchedMessage> createFetchedMessages(ByteBufferMessageSet messageSet, final AtomicLong offset) {
  final Iterator<MessageAndOffset> messages = messageSet.iterator();
  return new AbstractIterator<FetchedMessage>() {
    @Override
    protected FetchedMessage computeNext() {
      while (messages.hasNext()) {
        MessageAndOffset message = messages.next();
        long msgOffset = message.offset();
        if (msgOffset < offset.get()) {
          LOG.trace("Received old offset {}, expecting {} on {}. Message Ignored.",
                    msgOffset, offset.get(), topicPart);
          continue;
        }

        fetchedMessage.setPayload(message.message().payload());
        fetchedMessage.setOffset(message.offset());
        fetchedMessage.setNextOffset(message.nextOffset());

        return fetchedMessage;
      }
      return endOfData();
    }
  };
}
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号