/**
 * Builds a lazy {@link Iterator} of {@link FetchedMessage} over the given message set.
 *
 * <p>Messages whose offset is lower than the current value of {@code offset} are skipped (and traced);
 * every other message is exposed to the caller in the order the message set yields it.
 *
 * <p>Note: this method only reads {@code offset}; nothing in this method writes to it.
 * Also note that the iterator hands back the same {@code fetchedMessage} object on every step,
 * overwriting its payload and offsets each time, so a previously returned element is no longer
 * valid once the iterator has been advanced.
 *
 * @param messageSet the fetched messages to iterate over
 * @param offset the lowest offset that is still of interest; consulted for every message
 * @return an iterator that yields the messages at or beyond {@code offset}
 */
private Iterator<FetchedMessage> createFetchedMessages(ByteBufferMessageSet messageSet, final AtomicLong offset) {
  final Iterator<MessageAndOffset> source = messageSet.iterator();

  return new AbstractIterator<FetchedMessage>() {
    @Override
    protected FetchedMessage computeNext() {
      while (source.hasNext()) {
        MessageAndOffset entry = source.next();
        long entryOffset = entry.offset();

        if (entryOffset >= offset.get()) {
          // Accepted: copy the message data into the reusable holder and hand it out.
          fetchedMessage.setPayload(entry.message().payload());
          fetchedMessage.setOffset(entry.offset());
          fetchedMessage.setNextOffset(entry.nextOffset());
          return fetchedMessage;
        }

        // The message is older than the expected offset; drop it and look at the next one.
        LOG.trace("Received old offset {}, expecting {} on {}. Message Ignored.",
                  entryOffset, offset.get(), topicPart);
      }

      // The underlying message set is exhausted.
      return endOfData();
    }
  };
}
SimpleKafkaConsumer.java 文件源码
java
阅读 36
收藏 0
点赞 0
评论 0
项目:twill
作者:
评论列表
文章目录