KafkaInputFormat.java 文件源码

java
阅读 28 收藏 0 点赞 0 评论 0

项目:kangaroo 作者:
/**
 * Gets all of the input splits for the {@code topic}, filtering out any {@link InputSplit}s already consumed by the
 * {@code group}.
 * <p>
 * For every partition of the topic a list of offsets is requested (expected in descending order) and each pair of
 * adjacent offsets becomes one {@link KafkaInputSplit}. The split built from the first pair of a partition is
 * flagged as that partition's committer. One consumer connection per broker is cached for the duration of the call
 * and all connections are closed before returning, whether or not the call succeeds.
 * 
 * @param conf
 *            the job configuration.
 * @param topic
 *            the topic.
 * @param group
 *            the consumer group.
 * @return input splits for the job; empty if no partition yields at least two offsets.
 * @throws IOException
 *             propagated from the helper calls made while building the splits.
 */
List<InputSplit> getInputSplits(final Configuration conf, final String topic, final String group)
        throws IOException {
    final List<InputSplit> splits = Lists.newArrayList();
    final ZkUtils zk = getZk(conf);
    final Map<Broker, SimpleConsumer> consumers = Maps.newHashMap();
    try {
        for (final Partition partition : zk.getPartitions(topic)) {

            // cache the consumer connections - several partitions may live on the same broker, so the connection
            // is opened once per broker and reused
            final Broker broker = partition.getBroker();
            SimpleConsumer consumer = consumers.get(broker);
            if (consumer == null) {
                consumer = getConsumer(broker);
                consumers.put(broker, consumer);
            }

            // grab all valid offsets
            final List<Long> offsets = getOffsets(consumer, topic, partition.getPartId(),
                    zk.getLastCommit(group, partition), getIncludeOffsetsAfterTimestamp(conf),
                    getMaxSplitsPerPartition(conf));
            for (int i = 0; i < offsets.size() - 1; i++) {
                // ( offsets in descending order ) - adjacent entries form the [start, end) pair of one split
                final long start = offsets.get(i + 1);
                final long end = offsets.get(i);
                // since the offsets are in descending order, the first offset in the list is the largest offset for
                // the current partition. This split will be in charge of committing the offset for this partition.
                final boolean partitionCommitter = (i == 0);
                final InputSplit split = new KafkaInputSplit(partition, start, end, partitionCommitter);
                // guard the log call so the message (and split.toString()) is only built when it will be emitted
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Created input split: " + split);
                }
                splits.add(split);
            }
        }
    } finally {
        // close resources - best effort: a failure to close one consumer must neither prevent the remaining
        // consumers from being closed nor mask an exception already propagating from the try block
        IOUtils.closeQuietly(zk);
        for (final SimpleConsumer consumer : consumers.values()) {
            try {
                consumer.close();
            } catch (final RuntimeException e) {
                LOG.warn("Failed to close consumer", e);
            }
        }
    }
    return splits;
}
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号