KafkaInputFormat.java 文件源码

java
阅读 25 收藏 0 点赞 0 评论 0

项目:kangaroo 作者:
/**
 * Builds the list of offsets that delimit the input splits for one topic partition.
 *
 * <p>Offsets reported by the consumer are kept while they are greater than both {@code lastCommit}
 * and the offset found before {@code asOfTime}. At the first offset that fails that test, scanning
 * stops and {@code lastCommit} itself is appended, but only when it is greater than the as-of
 * offset. The list is then trimmed from the front so that it describes at most
 * {@code maxSplitsPerPartition} splits.
 *
 * @param consumer              consumer used to look up offsets
 * @param topic                 topic to query
 * @param partitionNum          partition of the topic to query
 * @param lastCommit            last committed offset; only offsets greater than it are kept
 * @param asOfTime              time passed to the consumer to find the lower cut-off offset
 * @param maxSplitsPerPartition maximum number of splits (i.e. offsets minus one) to return
 * @return the selected offsets in the order the consumer reported them (descending, according to
 *         the consumer's contract as assumed here); may be a {@code subList} view when trimmed
 */
@VisibleForTesting
List<Long> getOffsets(final SimpleConsumer consumer, final String topic, final int partitionNum,
        final long lastCommit, final long asOfTime, final int maxSplitsPerPartition) {
    // Every offset known for the partition up to the latest time (expected newest-first).
    final long[] available = consumer.getOffsetsBefore(topic, partitionNum, OffsetRequest.LatestTime(),
            Integer.MAX_VALUE);

    // Request a single offset before 'asOfTime'; use zero unless exactly one offset comes back.
    final long[] beforeAsOf = consumer.getOffsetsBefore(topic, partitionNum, asOfTime, 1);
    final long cutoff;
    if (beforeAsOf.length == 1) {
        cutoff = beforeAsOf[0];
    } else {
        cutoff = 0;
    }

    List<Long> boundaries = Lists.newArrayList();
    for (int i = 0; i < available.length; i++) {
        final long candidate = available[i];
        if (candidate <= lastCommit || candidate <= cutoff) {
            // First offset that is not past both limits: since later entries are expected to be
            // smaller still, stop here. The last commit closes the list only if it is itself
            // beyond the as-of cut-off.
            if (lastCommit > cutoff) {
                boundaries.add(lastCommit);
            }
            break;
        }
        boundaries.add(candidate);
    }

    // N splits need N + 1 boundary offsets, so compare the split count (size - 1) to the limit
    // and keep only the trailing (maxSplitsPerPartition + 1) offsets when there are too many.
    final int splitCount = boundaries.size() - 1;
    if (splitCount > maxSplitsPerPartition) {
        boundaries = boundaries.subList(splitCount - maxSplitsPerPartition, boundaries.size());
    }

    LOG.debug(String.format("Offsets for %s:%d:%d = %s", consumer.host(), consumer.port(), partitionNum,
            boundaries));
    return boundaries;
}
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号