/**
 * Gets all of the input splits for the {@code topic}, filtering out any {@link InputSplit}s already consumed by the
 * {@code group}.
 *
 * @param conf
 *            the job configuration.
 * @param topic
 *            the topic.
 * @param group
 *            the consumer group.
 * @return input splits for the job.
 * @throws IOException
 *             if the partitions, offsets, or last committed offsets cannot be fetched.
 */
List<InputSplit> getInputSplits(final Configuration conf, final String topic, final String group)
        throws IOException {
    final List<InputSplit> splits = Lists.newArrayList();
    final ZkUtils zk = getZk(conf);
    // cache the consumer connections - multiple partitions may live on the same broker
    final Map<Broker, SimpleConsumer> consumers = Maps.newHashMap();
    try {
        for (final Partition partition : zk.getPartitions(topic)) {
            final Broker broker = partition.getBroker();
            SimpleConsumer consumer = consumers.get(broker);
            if (consumer == null) {
                consumer = getConsumer(broker);
                consumers.put(broker, consumer);
            }
            // grab all valid offsets for the partition, newest first
            final List<Long> offsets = getOffsets(consumer, topic, partition.getPartId(),
                    zk.getLastCommit(group, partition), getIncludeOffsetsAfterTimestamp(conf),
                    getMaxSplitsPerPartition(conf));
            for (int i = 0; i < offsets.size() - 1; i++) {
                // ( offsets in descending order )
                final long start = offsets.get(i + 1);
                final long end = offsets.get(i);
                // since the offsets are in descending order, the first offset in the list is the largest offset for
                // the current partition. This split will be in charge of committing the offset for this partition.
                final boolean partitionCommitter = (i == 0);
                final InputSplit split = new KafkaInputSplit(partition, start, end, partitionCommitter);
                LOG.debug("Created input split: " + split);
                splits.add(split);
            }
        }
    } finally {
        // close resources; each close is best-effort so that a failure closing one
        // consumer neither leaks the remaining consumers nor masks an IOException
        // thrown from the try block above.
        IOUtils.closeQuietly(zk);
        for (final SimpleConsumer consumer : consumers.values()) {
            try {
                consumer.close();
            } catch (final RuntimeException e) {
                LOG.warn("Failed to close consumer " + consumer, e);
            }
        }
    }
    return splits;
}
KafkaInputFormat.java 文件源码
java
阅读 28
收藏 0
点赞 0
评论 0
项目:kangaroo
作者:
评论列表
文章目录