@VisibleForTesting
List<Long> getOffsets(final SimpleConsumer consumer, final String topic, final int partitionNum,
final long lastCommit, final long asOfTime, final int maxSplitsPerPartition) {
// all offsets that exist for this partition (in descending order)
final long[] allOffsets = consumer.getOffsetsBefore(topic, partitionNum, OffsetRequest.LatestTime(),
Integer.MAX_VALUE);
// this gets us an offset that is strictly before 'asOfTime', or zero if none exist before that time
final long[] offsetsBeforeAsOf = consumer.getOffsetsBefore(topic, partitionNum, asOfTime, 1);
final long includeAfter = offsetsBeforeAsOf.length == 1 ? offsetsBeforeAsOf[0] : 0;
// note that the offsets are in descending order
List<Long> result = Lists.newArrayList();
for (final long offset : allOffsets) {
if (offset > lastCommit && offset > includeAfter) {
result.add(offset);
} else {
// we add "lastCommit" iff it is after "includeAfter"
if (lastCommit > includeAfter) {
result.add(lastCommit);
}
// we can break out of loop here bc offsets are in desc order, and we've hit the latest one to include
break;
}
}
// to get maxSplitsPerPartition number of splits, you need (maxSplitsPerPartition + 1) number of offsets.
if (result.size() - 1 > maxSplitsPerPartition) {
result = result.subList(result.size() - maxSplitsPerPartition - 1, result.size());
}
LOG.debug(String.format("Offsets for %s:%d:%d = %s", consumer.host(), consumer.port(), partitionNum, result));
return result;
}
KafkaInputFormat.java 文件源码
java
阅读 25
收藏 0
点赞 0
评论 0
项目:kangaroo
作者:
评论列表
文章目录