Example source code for the Java class org.apache.hadoop.io.MapFile

SequenceFileInputFormat.java (project: hadoop-EAR)

@Override
protected List<LocatedFileStatus> listLocatedStatus(JobContext job) throws IOException {
  List<LocatedFileStatus> files = super.listLocatedStatus(job);
  int len = files.size();
  for (int i = 0; i < len; ++i) {
    FileStatus file = files.get(i);
    if (file.isDir()) { // it's a MapFile
      Path p = file.getPath();
      FileSystem fs = p.getFileSystem(job.getConfiguration());
      // use the data file
      files.set(i, fs.listLocatedStatus(
          new Path(p, MapFile.DATA_FILE_NAME)).next());
    }
  }
  return files;
}
MapFileRead.java (project: search-1047)
public static void main(String[] args) throws IOException {
  String uri = args[0];
  Configuration conf = new Configuration();
  FileSystem fs = FileSystem.get(URI.create(uri), conf);
  MapFile.Reader reader = null;
  try {
    reader = new MapFile.Reader(fs, uri, conf);
    WritableComparable key = (WritableComparable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
    Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
    while (reader.next(key, value)) {
      System.out.printf("%s\t%s\n", key, value);
    }
  } finally {
    IOUtils.closeStream(reader);
  }
}
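
For contrast with the reader above, here is a minimal writer-side sketch. It assumes the Hadoop 2.x options-based MapFile.Writer API; the output path, class name, and sample records are hypothetical. Note that MapFile requires keys to be appended in sorted order.

// Hedged sketch of the writer side (path "numbers.map" and sample data are hypothetical).
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.Text;

public class MapFileWriteDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path path = new Path("numbers.map");   // hypothetical output directory
    IntWritable key = new IntWritable();
    Text value = new Text();
    MapFile.Writer writer = null;
    try {
      writer = new MapFile.Writer(conf, path,
          MapFile.Writer.keyClass(IntWritable.class),
          MapFile.Writer.valueClass(Text.class));
      for (int i = 0; i < 100; i++) {
        key.set(i);                        // keys must be appended in sorted order
        value.set("value-" + i);
        writer.append(key, value);
      }
    } finally {
      IOUtils.closeStream(writer);
    }
  }
}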
Fast5NamesCmd.java (project: hpg-pore)
private static void runHadoopGetNames(String in) throws IOException {
  Configuration conf = new Configuration();
  FileSystem fs = FileSystem.get(conf);
  if (!fs.exists(new Path(in))) {
    System.out.println("Error: HDFS file " + in + " does not exist!");
    System.exit(-1);
  }
  MapFile.Reader reader = null;
  try {
    reader = new MapFile.Reader(fs, in, conf);
    Text key = (Text) reader.getKeyClass().newInstance();
    BytesWritable value = (BytesWritable) reader.getValueClass().newInstance();
    while (reader.next(key, value)) {
      System.out.println(key.toString());
    }
  } catch (Exception e) {
    e.printStackTrace();
  } finally {
    // ensure the reader is closed on both success and failure
    if (reader != null) reader.close();
  }
}
TestFileOutputCommitter.java (project: hadoop-plus)
private void validateMapFileOutputContent(
    FileSystem fs, Path dir) throws IOException {
  // map output is a directory with index and data files
  Path expectedMapDir = new Path(dir, partFile);
  assert(fs.getFileStatus(expectedMapDir).isDirectory());
  FileStatus[] files = fs.listStatus(expectedMapDir);
  int fileCount = 0;
  boolean dataFileFound = false;
  boolean indexFileFound = false;
  for (FileStatus f : files) {
    if (f.isFile()) {
      ++fileCount;
      if (f.getPath().getName().equals(MapFile.INDEX_FILE_NAME)) {
        indexFileFound = true;
      } else if (f.getPath().getName().equals(MapFile.DATA_FILE_NAME)) {
        dataFileFound = true;
      }
    }
  }
  assert(fileCount > 0);
  assert(dataFileFound && indexFileFound);
}
TestCodec.java (project: hadoop-plus)
private void codecTestMapFile(Class<? extends CompressionCodec> clazz,
    CompressionType type, int records) throws Exception {
  FileSystem fs = FileSystem.get(conf);
  LOG.info("Creating MapFiles with " + records +
      " records using codec " + clazz.getSimpleName());
  Path path = new Path(new Path(
      System.getProperty("test.build.data", "/tmp")),
      clazz.getSimpleName() + "-" + type + "-" + records);
  LOG.info("Writing " + path);
  createMapFile(conf, fs, path, clazz.newInstance(), type, records);
  MapFile.Reader reader = new MapFile.Reader(path, conf);
  try {
    Text key1 = new Text("002");
    assertNotNull(reader.get(key1, new Text()));
    Text key2 = new Text("004");
    assertNotNull(reader.get(key2, new Text()));
  } finally {
    reader.close();
  }
}
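
The createMapFile helper is called above but not shown. A plausible, hedged sketch follows; it is not the actual hadoop-plus implementation, and the zero-padded key format is an assumption chosen so the "002"/"004" lookups in the test succeed.

// Hedged sketch of the createMapFile helper (key format and padding are assumptions).
private static void createMapFile(Configuration conf, FileSystem fs, Path path,
    CompressionCodec codec, CompressionType type, int records) throws IOException {
  // fs is unused here; the parameter is kept only to match the call site above
  MapFile.Writer writer = new MapFile.Writer(conf, path,
      MapFile.Writer.keyClass(Text.class),
      MapFile.Writer.valueClass(Text.class),
      MapFile.Writer.compression(type, codec));
  Text key = new Text();
  for (int i = 0; i < records; i++) {
    key.set(String.format("%03d", i));  // zero-padded so keys sort lexicographically
    writer.append(key, key);
  }
  writer.close();
}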
HadoopFilePerformanceTest.java (project: Megh)
private void writeMapFile() throws Exception
{
  Path path = Testfile.MAPFILE.filepath();
  Text key = new Text();
  Text value = new Text();
  long fsMinBlockSize = conf.getLong("dfs.namenode.fs-limits.min-block-size", 0);
  long testBlockSize = (blockSize < fsMinBlockSize) ? fsMinBlockSize : (long) blockSize;
  MapFile.Writer writer = new MapFile.Writer(conf, path,
      MapFile.Writer.keyClass(key.getClass()),
      MapFile.Writer.valueClass(value.getClass()),
      MapFile.Writer.compression(SequenceFile.CompressionType.NONE),
      SequenceFile.Writer.blockSize(testBlockSize),
      SequenceFile.Writer.bufferSize((int) testBlockSize));
  for (int i = 0; i < testSize; i++) {
    key.set(getKey(i));
    value.set(getValue());
    writer.append(key, value);
  }
  IOUtils.closeStream(writer);
}
SegmentReader.java (project: anthelion)
private List<Writable> getMapRecords(Path dir, Text key) throws Exception {
  MapFile.Reader[] readers = MapFileOutputFormat.getReaders(fs, dir, getConf());
  ArrayList<Writable> res = new ArrayList<Writable>();
  Class keyClass = readers[0].getKeyClass();
  Class valueClass = readers[0].getValueClass();
  if (!keyClass.getName().equals("org.apache.hadoop.io.Text"))
    throw new IOException("Incompatible key (" + keyClass.getName() + ")");
  Writable value = (Writable) valueClass.newInstance();
  // we don't know the partitioning schema
  for (int i = 0; i < readers.length; i++) {
    if (readers[i].get(key, value) != null) {
      res.add(value);
      value = (Writable) valueClass.newInstance();
      Text aKey = (Text) keyClass.newInstance();
      while (readers[i].next(aKey, value) && aKey.equals(key)) {
        res.add(value);
        value = (Writable) valueClass.newInstance();
      }
    }
    readers[i].close();
  }
  return res;
}
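
Because the partitioning scheme is unknown, the method above probes every part file. When the data is known to have been written with a HashPartitioner, MapFileOutputFormat.getEntry can route the lookup to the single correct reader instead. A hedged sketch follows; the method name and the Text value type are assumptions, not part of the original code.

// Hedged alternative: direct lookup when the segment was partitioned with
// HashPartitioner and values are Text (both assumptions here).
private List<Writable> getMapRecordByPartition(Path dir, Text key) throws Exception {
  MapFile.Reader[] readers = MapFileOutputFormat.getReaders(fs, dir, getConf());
  Partitioner<Text, Text> partitioner = new HashPartitioner<Text, Text>();
  Text value = new Text();
  List<Writable> res = new ArrayList<Writable>();
  // getEntry uses the partitioner to pick the reader, so only one part file is consulted
  if (MapFileOutputFormat.getEntry(readers, partitioner, key, value) != null) {
    res.add(value);
  }
  for (MapFile.Reader reader : readers) {
    reader.close();
  }
  return res;
}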
TestLinkDbMerger.java (project: anthelion)
private void createLinkDb(Configuration config, FileSystem fs, Path linkdb, TreeMap init) throws Exception {
  LOG.fine("* creating linkdb: " + linkdb);
  Path dir = new Path(linkdb, LinkDb.CURRENT_NAME);
  MapFile.Writer writer = new MapFile.Writer(config, fs, new Path(dir, "part-00000").toString(), Text.class, Inlinks.class);
  // TreeMap iterates keys in sorted order, which satisfies MapFile's append ordering
  Iterator it = init.keySet().iterator();
  while (it.hasNext()) {
    String key = (String) it.next();
    Inlinks inlinks = new Inlinks();
    String[] vals = (String[]) init.get(key);
    for (int i = 0; i < vals.length; i++) {
      Inlink in = new Inlink(vals[i], vals[i]);
      inlinks.add(in);
    }
    writer.append(new Text(key), inlinks);
  }
  writer.close();
}
SequenceFileInputFormat.java (project: FlexMap)
@Override
protected List<FileStatus> listStatus(JobContext job) throws IOException {
  List<FileStatus> files = super.listStatus(job);
  int len = files.size();
  for (int i = 0; i < len; ++i) {
    FileStatus file = files.get(i);
    if (file.isDirectory()) { // it's a MapFile
      Path p = file.getPath();
      FileSystem fs = p.getFileSystem(job.getConfiguration());
      // use the data file
      files.set(i, fs.getFileStatus(new Path(p, MapFile.DATA_FILE_NAME)));
    }
  }
  return files;
}
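
Taken together, the two SequenceFileInputFormat overrides above let a MapReduce job point directly at a directory of MapFiles: each MapFile directory is silently replaced by its data file, which is an ordinary SequenceFile. A minimal hedged driver fragment, where the job name and input path are hypothetical:

// Hedged driver fragment: consuming MapFile output as SequenceFile input
// (job name and input path are hypothetical).
Job job = Job.getInstance(new Configuration(), "read-mapfiles");
job.setInputFormatClass(SequenceFileInputFormat.class);
SequenceFileInputFormat.addInputPath(job, new Path("/data/mapfiles"));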