Java examples for class org.apache.hadoop.io.MapFile
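MapFile (org.apache.hadoop.io.MapFile) is a sorted, on-disk map from WritableComparable keys to Writable values. On the file system it is a directory holding two SequenceFiles: a data file with the sorted records and an index file that samples every Nth key, so a reader can seek near any key and scan forward. The project snippets below show the class in real use; as a baseline, here is a minimal write-and-read sketch (the path and the key/value types are illustrative, not taken from any of the projects):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.Text;

public class MapFileDemo {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    Path dir = new Path("/tmp/demo.map");   // hypothetical location

    // Write: keys must be appended in ascending order,
    // otherwise append() throws an IOException.
    MapFile.Writer writer = new MapFile.Writer(conf, dir,
        MapFile.Writer.keyClass(IntWritable.class),
        MapFile.Writer.valueClass(Text.class));
    try {
      for (int i = 0; i < 10; i++) {
        writer.append(new IntWritable(i), new Text("value-" + i));
      }
    } finally {
      writer.close();
    }

    // Read back a single record: get() uses the index to seek
    // near the key, then scans the data file.
    MapFile.Reader reader = new MapFile.Reader(dir, conf);
    try {
      Text value = new Text();
      reader.get(new IntWritable(5), value);
      System.out.println(value);
    } finally {
      reader.close();
    }
  }
}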

SequenceFileInputFormat.java (project: hadoop-EAR)
@Override
protected List<LocatedFileStatus> listLocatedStatus(JobContext job)
    throws IOException {

  List<LocatedFileStatus> files = super.listLocatedStatus(job);
  int len = files.size();
  for(int i=0; i < len; ++i) {
    FileStatus file = files.get(i);
    if (file.isDir()) {     // it's a MapFile
      Path p = file.getPath();
      FileSystem fs = p.getFileSystem(job.getConfiguration());
      // use the data file
      files.set(i, fs.listLocatedStatus(
          new Path(p, MapFile.DATA_FILE_NAME)).next());
    }
  }
  return files;
}
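Because a MapFile is a directory rather than a single file, SequenceFileInputFormat cannot read it directly; the override above swaps each MapFile directory for its inner data file (MapFile.DATA_FILE_NAME), which is a plain SequenceFile the input format already handles.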
MapFileRead.java (project: search-1047)
public static void main(String[] args) throws IOException {
  String uri = args[0];
  Configuration conf = new Configuration();
  FileSystem fs = FileSystem.get(URI.create(uri), conf);
  MapFile.Reader reader = null;
  try {
    reader = new MapFile.Reader(fs, uri, conf);
    // instantiate key/value objects of whatever types the file was written with
    WritableComparable key = (WritableComparable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
    Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
    while (reader.next(key, value)) {
      System.out.printf("%s\t%s\n", key, value);
    }
  } finally {
    IOUtils.closeStream(reader);
  }
}
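Note: the MapFile.Reader(FileSystem, String, Configuration) constructor used here and in Fast5NamesCmd below is the old-style API; Hadoop 2.x deprecates it in favour of new MapFile.Reader(new Path(uri), conf), which the TestCodec example further down uses.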
Fast5NamesCmd.java (project: hpg-pore)
private static void runHadoopGetNames(String in) throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);

    if (!fs.exists(new Path(in))) {
        System.out.println("Error: Hdfs file " + in + " does not exist!");
        System.exit(-1);
    }

    MapFile.Reader reader = null;
    try {
        reader = new MapFile.Reader(fs, in, conf);
        Text key = (Text) reader.getKeyClass().newInstance();
        BytesWritable value = (BytesWritable) reader.getValueClass().newInstance();
        while (reader.next(key, value)) {
            System.out.println(key.toString());
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        // close the reader on success as well as on failure
        if (reader != null) reader.close();
    }
}
TestFileOutputCommitter.java (project: hadoop-plus)
private void validateMapFileOutputContent(
    FileSystem fs, Path dir) throws IOException {
  // map output is a directory with index and data files
  Path expectedMapDir = new Path(dir, partFile);
  assert(fs.getFileStatus(expectedMapDir).isDirectory());    
  FileStatus[] files = fs.listStatus(expectedMapDir);
  int fileCount = 0;
  boolean dataFileFound = false; 
  boolean indexFileFound = false; 
  for (FileStatus f : files) {
    if (f.isFile()) {
      ++fileCount;
      if (f.getPath().getName().equals(MapFile.INDEX_FILE_NAME)) {
        indexFileFound = true;
      }
      else if (f.getPath().getName().equals(MapFile.DATA_FILE_NAME)) {
        dataFileFound = true;
      }
    }
  }
  assert(fileCount > 0);
  assert(dataFileFound && indexFileFound);
}
TestCodec.java (project: hadoop-plus)
private void codecTestMapFile(Class<? extends CompressionCodec> clazz,
    CompressionType type, int records) throws Exception {

  FileSystem fs = FileSystem.get(conf);
  LOG.info("Creating MapFiles with " + records  + 
          " records using codec " + clazz.getSimpleName());
  Path path = new Path(new Path(
      System.getProperty("test.build.data", "/tmp")),
    clazz.getSimpleName() + "-" + type + "-" + records);

  LOG.info("Writing " + path);
  createMapFile(conf, fs, path, clazz.newInstance(), type, records);
  MapFile.Reader reader = new MapFile.Reader(path, conf);
  Text key1 = new Text("002");
  assertNotNull(reader.get(key1, new Text()));
  Text key2 = new Text("004");
  assertNotNull(reader.get(key2, new Text()));
  reader.close();
}
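MapFile.Reader.get() returns null when the key is absent, which is what the assertions above rely on. The reader also supports nearest-key lookups; a short sketch, reusing the reader and Text types from the test above:

  // smallest key >= "003" (pass before=true for the largest key <= "003")
  Text probe = new Text("003");
  Text val = new Text();
  WritableComparable closest = reader.getClosest(probe, val);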
HadoopFilePerformanceTest.java (project: Megh)
private void writeMapFile() throws Exception
{
  Path path = Testfile.MAPFILE.filepath();

  Text key = new Text();
  Text value = new Text();


  // respect the namenode's minimum block size when sizing the test file blocks
  long fsMinBlockSize = conf.getLong("dfs.namenode.fs-limits.min-block-size", 0);
  long testBlockSize = (blockSize < fsMinBlockSize) ? fsMinBlockSize : (long) blockSize;

  MapFile.Writer writer = new MapFile.Writer(conf, path,
      MapFile.Writer.keyClass(key.getClass()),
      MapFile.Writer.valueClass(value.getClass()),
      MapFile.Writer.compression(SequenceFile.CompressionType.NONE),
      SequenceFile.Writer.blockSize(testBlockSize),
      SequenceFile.Writer.bufferSize((int)testBlockSize));
  for (int i = 0; i < testSize; i++) {
    key.set(getKey(i));
    value.set(getValue());
    writer.append(key, value);
  }
  IOUtils.closeStream(writer);
}
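MapFile.Writer.append() requires keys in non-decreasing order, so getKey(i) here must generate sorted keys. The density of the index can be tuned via MapFile.Writer.setIndexInterval(int) (or the io.map.index.interval property, default 128): a smaller interval speeds up lookups at the cost of a larger in-memory index.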
SegmentReader.java (project: anthelion)
private List<Writable> getMapRecords(Path dir, Text key) throws Exception {
  MapFile.Reader[] readers = MapFileOutputFormat.getReaders(fs, dir, getConf());
  ArrayList<Writable> res = new ArrayList<Writable>();
  Class keyClass = readers[0].getKeyClass();
  Class valueClass = readers[0].getValueClass();
  if (!keyClass.getName().equals("org.apache.hadoop.io.Text"))
    throw new IOException("Incompatible key (" + keyClass.getName() + ")");
  Writable value = (Writable)valueClass.newInstance();
  // we don't know the partitioning schema
  for (int i = 0; i < readers.length; i++) {
    if (readers[i].get(key, value) != null) {
      res.add(value);
      value = (Writable)valueClass.newInstance();
      Text aKey = (Text) keyClass.newInstance();
      while (readers[i].next(aKey, value) && aKey.equals(key)) {
        res.add(value);
        value = (Writable)valueClass.newInstance();
      }
    }
    readers[i].close();
  }
  return res;
}
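The loop above probes every reader because the partitioning scheme is unknown. When the data is known to have been written with the default HashPartitioner, the old mapred API can jump straight to the right part; a sketch under that assumption:

  // pick the partition that must hold the key, then look it up there
  Writable found = MapFileOutputFormat.getEntry(readers,
      new HashPartitioner<Text, Writable>(), key, value);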
TestLinkDbMerger.java (project: anthelion)
private void createLinkDb(Configuration config, FileSystem fs, Path linkdb, TreeMap init) throws Exception {
  LOG.fine("* creating linkdb: " + linkdb);
  Path dir = new Path(linkdb, LinkDb.CURRENT_NAME);
  MapFile.Writer writer = new MapFile.Writer(config, fs, new Path(dir, "part-00000").toString(), Text.class, Inlinks.class);
  Iterator it = init.keySet().iterator();
  while (it.hasNext()) {
    String key = (String)it.next();
    Inlinks inlinks = new Inlinks();
    String[] vals = (String[])init.get(key);
    for (int i = 0; i < vals.length; i++) {
      Inlink in = new Inlink(vals[i], vals[i]);
      inlinks.add(in);
    }
    writer.append(new Text(key), inlinks);
  }
  writer.close();
}
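Note the use of a TreeMap for init: MapFile.Writer.append() insists on sorted keys, and iterating a TreeMap's keySet yields them in ascending order, so the appends succeed without an explicit sort.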
SequenceFileInputFormat.java (project: FlexMap)
@Override
protected List<FileStatus> listStatus(JobContext job)
    throws IOException {

  List<FileStatus> files = super.listStatus(job);
  int len = files.size();
  for(int i=0; i < len; ++i) {
    FileStatus file = files.get(i);
    if (file.isDirectory()) {     // it's a MapFile
      Path p = file.getPath();
      FileSystem fs = p.getFileSystem(job.getConfiguration());
      // use the data file
      files.set(i, fs.getFileStatus(new Path(p, MapFile.DATA_FILE_NAME)));
    }
  }
  return files;
}

