@SuppressWarnings("unchecked")
public void initialize(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
if (kids != null) {
for (int i = 0; i < kids.length; ++i) {
kids[i].initialize(((CompositeInputSplit)split).get(i), context);
if (kids[i].key() == null) {
continue;
}
// get keyclass
if (keyclass == null) {
keyclass = kids[i].createKey().getClass().
asSubclass(WritableComparable.class);
}
// create priority queue
if (null == q) {
cmp = WritableComparator.get(keyclass, conf);
q = new PriorityQueue<ComposableRecordReader<K,?>>(3,
new Comparator<ComposableRecordReader<K,?>>() {
public int compare(ComposableRecordReader<K,?> o1,
ComposableRecordReader<K,?> o2) {
return cmp.compare(o1.key(), o2.key());
}
});
}
// Explicit check for key class agreement
if (!keyclass.equals(kids[i].key().getClass())) {
throw new ClassCastException("Child key classes fail to agree");
}
// add the kid to priority queue if it has any elements
if (kids[i].hasNext()) {
q.add(kids[i]);
}
}
}
}
CompositeRecordReader.java 文件源码
java
阅读 21
收藏 0
点赞 0
评论 0
项目:hadoop
作者:
评论列表
文章目录