/**
 * Builds the input splits for the job, one per chunk, for as many chunks as
 * there are map tasks (or fewer, when there are fewer chunks than map tasks).
 *
 * <p>The chunk at index {@code i} is assigned to the map task with id
 * {@code i}, and a {@link FileSplit} over that chunk's path is emitted for it.
 * Once all splits are built, their count is published to the job
 * configuration under {@code CONF_LABEL_NUM_SPLITS}.
 *
 * @param jobContext context supplying the job id and the job configuration
 * @param chunks the chunks to hand out; only the leading
 *               {@code min(numMapTasks, chunks.size())} entries are used
 * @return the splits, in the same order as the chunks they were built from
 * @throws IOException declared by the signature; presumably raised by chunk
 *                     assignment -- confirm against DynamicInputChunk
 */
private List<InputSplit> createSplits(JobContext jobContext,
    List<DynamicInputChunk> chunks)
    throws IOException {
  // Never create more splits than there are chunks to back them.
  final int splitCount = Math.min(
      getNumMapTasks(jobContext.getConfiguration()), chunks.size());
  final List<InputSplit> result = new ArrayList<InputSplit>(splitCount);

  for (int mapIndex = 0; mapIndex < splitCount; mapIndex++) {
    final TaskID mapTaskId =
        new TaskID(jobContext.getJobID(), TaskType.MAP, mapIndex);
    final DynamicInputChunk chunk = chunks.get(mapIndex);

    // Assign first, then read the path: the path is queried only after the
    // chunk has been bound to its task, exactly as before.
    chunk.assignTo(mapTaskId);

    // The split length is not a byte count here. It is set to the configured
    // minimum records per chunk so that the split is given a non-zero size,
    // guarding against a possible future in which 0-sized file-splits are
    // considered "empty" and skipped over.
    final InputSplit split = new FileSplit(
        chunk.getPath(),
        0,
        getMinRecordsPerChunk(jobContext.getConfiguration()),
        null);
    result.add(split);
  }

  DistCpUtils.publish(jobContext.getConfiguration(),
      CONF_LABEL_NUM_SPLITS, result.size());
  return result;
}
DynamicInputFormat.java 文件源码
java
阅读 33
收藏 0
点赞 0
评论 0
项目:aliyun-oss-hadoop-fs
作者:
评论列表
文章目录