@Override
public List<InputSplit> getSplits(JobContext jobCtxt) throws IOException {
final JobConf jobConf = new JobConf(jobCtxt.getConfiguration());
final JobClient client = new JobClient(jobConf);
ClusterStatus stat = client.getClusterStatus(true);
int numTrackers = stat.getTaskTrackers();
final int fileCount = jobConf.getInt(GRIDMIX_DISTCACHE_FILE_COUNT, -1);
// Total size of distributed cache files to be generated
final long totalSize = jobConf.getLong(GRIDMIX_DISTCACHE_BYTE_COUNT, -1);
// Get the path of the special file
String distCacheFileList = jobConf.get(GRIDMIX_DISTCACHE_FILE_LIST);
if (fileCount < 0 || totalSize < 0 || distCacheFileList == null) {
throw new RuntimeException("Invalid metadata: #files (" + fileCount
+ "), total_size (" + totalSize + "), filelisturi ("
+ distCacheFileList + ")");
}
Path sequenceFile = new Path(distCacheFileList);
FileSystem fs = sequenceFile.getFileSystem(jobConf);
FileStatus srcst = fs.getFileStatus(sequenceFile);
// Consider the number of TTs * mapSlotsPerTracker as number of mappers.
int numMapSlotsPerTracker = jobConf.getInt(TTConfig.TT_MAP_SLOTS, 2);
int numSplits = numTrackers * numMapSlotsPerTracker;
List<InputSplit> splits = new ArrayList<InputSplit>(numSplits);
LongWritable key = new LongWritable();
BytesWritable value = new BytesWritable();
// Average size of data to be generated by each map task
final long targetSize = Math.max(totalSize / numSplits,
DistributedCacheEmulator.AVG_BYTES_PER_MAP);
long splitStartPosition = 0L;
long splitEndPosition = 0L;
long acc = 0L;
long bytesRemaining = srcst.getLen();
SequenceFile.Reader reader = null;
try {
reader = new SequenceFile.Reader(fs, sequenceFile, jobConf);
while (reader.next(key, value)) {
// If adding this file would put this split past the target size,
// cut the last split and put this file in the next split.
if (acc + key.get() > targetSize && acc != 0) {
long splitSize = splitEndPosition - splitStartPosition;
splits.add(new FileSplit(
sequenceFile, splitStartPosition, splitSize, (String[])null));
bytesRemaining -= splitSize;
splitStartPosition = splitEndPosition;
acc = 0L;
}
acc += key.get();
splitEndPosition = reader.getPosition();
}
} finally {
if (reader != null) {
reader.close();
}
}
if (bytesRemaining != 0) {
splits.add(new FileSplit(
sequenceFile, splitStartPosition, bytesRemaining, (String[])null));
}
return splits;
}
GenerateDistCacheData.java 文件源码
java
阅读 59
收藏 0
点赞 0
评论 0
项目:hadoop
作者:
评论列表
文章目录