java类org.apache.hadoop.io.SequenceFile的实例源码

Display.java 文件源码 项目:hadoop-oss 阅读 28 收藏 0 点赞 0 评论 0
/**
 * Opens {@code f} as a SequenceFile reader and creates key/value instances of the file's
 * declared key and value classes, plus the input/output buffers used by this stream.
 *
 * @param f status of the SequenceFile to open
 * @throws IOException if the file cannot be opened
 */
public TextRecordInputStream(FileStatus f) throws IOException {
  final Path fpath = f.getPath();
  final Configuration lconf = getConf();
  r = new SequenceFile.Reader(lconf, SequenceFile.Reader.file(fpath));
  try {
    // asSubclass throws ClassCastException when the declared class is not a Writable, and
    // instantiation may also fail with an unchecked exception.
    key = ReflectionUtils.newInstance(r.getKeyClass().asSubclass(Writable.class), lconf);
    val = ReflectionUtils.newInstance(r.getValueClass().asSubclass(Writable.class), lconf);
  } catch (RuntimeException e) {
    // A failed constructor never hands this object to a caller, so nobody else could
    // close the reader opened above; release it here before propagating.
    try {
      r.close();
    } catch (IOException closeFailure) {
      e.addSuppressed(closeFailure);
    }
    throw e;
  }
  inbuf = new DataInputBuffer();
  outbuf = new DataOutputBuffer();
}
HashTable.java 文件源码 项目:ditb 阅读 24 收藏 0 点赞 0 评论 0
/**
 * Reads every key of the SequenceFile at {@code path} into {@code partitions} and verifies
 * that the keys are in ascending natural order.
 *
 * @param fs file system used to open {@code path}
 * @param conf configuration passed to the reader
 * @param path partition file to read
 * @throws IOException if the file cannot be read, or if the keys are not ordered
 */
private void readPartitionFile(FileSystem fs, Configuration conf, Path path)
     throws IOException {
  @SuppressWarnings("deprecation")
  SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
  try {
    ImmutableBytesWritable key = new ImmutableBytesWritable();
    partitions = new ArrayList<ImmutableBytesWritable>();
    while (reader.next(key)) {
      // The same key instance is refilled by every next() call, so store a copy of its bytes.
      partitions.add(new ImmutableBytesWritable(key.copyBytes()));
    }
  } finally {
    // Close even when next() fails part-way through the file.
    reader.close();
  }

  if (!Ordering.natural().isOrdered(partitions)) {
    throw new IOException("Partitions are not ordered!");
  }
}
SimpleCopyListing.java 文件源码 项目:circus-train 阅读 31 收藏 0 点赞 0 评论 0
/**
 * Writes copy-listing entries for each path in {@code globbedPaths} to {@code fileListWriter}
 * and closes the writer.
 *
 * For every path, each immediate child that is a file is written directly, and each non-empty
 * child directory is handed to {@code traverseNonEmptyDirectory}.
 *
 * @param fileListWriter destination listing; always closed by this method
 * @param options copy options used to compute source roots and write entries
 * @param globbedPaths source paths to list
 * @throws IOException if listing the sources or writing the listing fails
 */
@VisibleForTesting
public void doBuildListing(SequenceFile.Writer fileListWriter, S3MapReduceCpOptions options, List<Path> globbedPaths)
  throws IOException {
  try {
    for (Path globbedPath : globbedPaths) {
      FileSystem sourceFS = globbedPath.getFileSystem(getConf());
      Path path = makeQualified(globbedPath);

      FileStatus rootStatus = sourceFS.getFileStatus(path);
      Path sourcePathRoot = computeSourceRootPath(rootStatus, options);
      LOG.info("Root source path is {}", sourcePathRoot);

      FileStatus[] sourceFiles = sourceFS.listStatus(path);
      // listStatus may yield null; iterating it would throw NullPointerException. An empty
      // array simply produces no entries.
      if (sourceFiles != null) {
        for (FileStatus sourceStatus : sourceFiles) {
          if (sourceStatus.isFile()) {
            LOG.debug("Recording source-path: {} for copy.", sourceStatus.getPath());
            CopyListingFileStatus sourceCopyListingStatus = new CopyListingFileStatus(sourceStatus);
            writeToFileListing(fileListWriter, sourceCopyListingStatus, sourcePathRoot, options);
          }
          if (isDirectoryAndNotEmpty(sourceFS, sourceStatus)) {
            LOG.debug("Traversing non-empty source dir: {}", sourceStatus.getPath());
            traverseNonEmptyDirectory(fileListWriter, sourceStatus, sourcePathRoot, options);
          }
        }
      }
    }
    fileListWriter.close();
    // Cleared after a successful close so the finally block only has work to do when an
    // exception skipped the close above (assumes closeSilently ignores null — verify IoUtil).
    fileListWriter = null;
  } finally {
    IoUtil.closeSilently(LOG, fileListWriter);
  }
}
SimpleCopyListing.java 文件源码 项目:circus-train 阅读 29 收藏 0 点赞 0 评论 0
/**
 * Creates an uncompressed SequenceFile writer at {@code pathToListFile} with {@link Text} keys
 * and {@link CopyListingFileStatus} values, first removing any file already at that path.
 *
 * @param pathToListFile where the listing will be written
 * @return a new writer for the listing file
 * @throws IOException if the old file cannot be removed or the writer cannot be created
 */
private SequenceFile.Writer getWriter(Path pathToListFile) throws IOException {
  FileSystem listFs = pathToListFile.getFileSystem(getConf());
  boolean alreadyPresent = listFs.exists(pathToListFile);
  if (alreadyPresent) {
    listFs.delete(pathToListFile, false);
  }

  SequenceFile.Writer.Option target = SequenceFile.Writer.file(pathToListFile);
  SequenceFile.Writer.Option keyType = SequenceFile.Writer.keyClass(Text.class);
  SequenceFile.Writer.Option valueType = SequenceFile.Writer.valueClass(CopyListingFileStatus.class);
  SequenceFile.Writer.Option noCompression =
      SequenceFile.Writer.compression(SequenceFile.CompressionType.NONE);
  return SequenceFile.createWriter(getConf(), target, keyType, valueType, noCompression);
}
UniformSizeInputFormat.java 文件源码 项目:circus-train 阅读 22 收藏 0 点赞 0 评论 0
/**
 * Opens a SequenceFile reader on the listing file whose location is taken from
 * {@code configuration}.
 *
 * @param configuration job configuration naming the listing file
 * @return a reader positioned at the start of the listing
 * @throws IllegalArgumentException if the listing file is missing, or if an I/O error occurs
 *     while opening it (the error is logged and attached as the cause)
 */
private SequenceFile.Reader getListingFileReader(Configuration configuration) {
  final Path listingPath = getListingFilePath(configuration);
  try {
    final FileSystem listingFs = listingPath.getFileSystem(configuration);
    if (listingFs.exists(listingPath)) {
      return new SequenceFile.Reader(configuration, SequenceFile.Reader.file(listingPath));
    }
    throw new IllegalArgumentException("Listing file doesn't exist at: " + listingPath);
  } catch (IOException ioFailure) {
    LOG.error("Couldn't find listing file at: " + listingPath, ioFailure);
    throw new IllegalArgumentException("Couldn't find listing-file at: " + listingPath, ioFailure);
  }
}
CopyListing.java 文件源码 项目:circus-train 阅读 16 收藏 0 点赞 0 评论 0
/**
 * Validate the final resulting path listing by checking it for duplicate entries.
 *
 * The listing is sorted with {@code sortListing}, then each key (relative path) is compared with
 * the previous one; two equal consecutive keys are reported as a duplicate.
 *
 * @param pathToListFile path listing built by doBuildListing
 * @param options Input options to S3MapReduceCp (not consulted by this implementation)
 * @throws IOException Any issue while sorting or reading the listing
 * @throws DuplicateFileException if there are duplicates
 */
private void validateFinalListing(Path pathToListFile, S3MapReduceCpOptions options)
  throws DuplicateFileException, IOException {
  Configuration conf = getConf();
  FileSystem listingFs = pathToListFile.getFileSystem(conf);
  Path sorted = sortListing(listingFs, conf, pathToListFile);

  SequenceFile.Reader sortedReader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(sorted));
  try {
    // "*" can never be a source-relative path, so the first real key never matches it.
    Text previousKey = new Text("*");
    CopyListingFileStatus previousStatus = new CopyListingFileStatus();
    Text key = new Text();

    while (sortedReader.next(key)) {
      if (!key.equals(previousKey)) {
        sortedReader.getCurrentValue(previousStatus);
        previousKey.set(key);
        continue;
      }
      CopyListingFileStatus duplicateStatus = new CopyListingFileStatus();
      sortedReader.getCurrentValue(duplicateStatus);
      throw new DuplicateFileException("File "
          + previousStatus.getPath()
          + " and "
          + duplicateStatus.getPath()
          + " would cause duplicates. Aborting");
    }
  } finally {
    IOUtils.closeStream(sortedReader);
  }
}
CopyListing.java 文件源码 项目:circus-train 阅读 20 收藏 0 点赞 0 评论 0
/**
 * Sort a sequence file whose keys are {@link Text} and whose values are
 * {@link CopyListingFileStatus}.
 *
 * @param fs File system holding the source listing and the sorted output
 * @param conf Configuration passed to the sorter
 * @param sourceListing Source listing file
 * @return Path of the sorted file: the source path with {@code _sorted} appended
 * @throws IOException Any exception during sort
 */
private static Path sortListing(FileSystem fs, Configuration conf, Path sourceListing) throws IOException {
  SequenceFile.Sorter listingSorter =
      new SequenceFile.Sorter(fs, Text.class, CopyListingFileStatus.class, conf);
  String sortedName = sourceListing.toString() + "_sorted";
  Path sortedListing = new Path(sortedName);

  // Remove any file already occupying the output path before sorting into it.
  boolean outputExists = fs.exists(sortedListing);
  if (outputExists) {
    fs.delete(sortedListing, false);
  }

  listingSorter.sort(sourceListing, sortedListing);
  return sortedListing;
}
SimpleCopyListingTest.java 文件源码 项目:circus-train 阅读 16 收藏 0 点赞 0 评论 0
/**
 * Builds a listing for a single source file and checks that the first entry's relative path
 * is the file name.
 */
@Test(timeout = 10000)
public void buildListingForSingleFile() throws Exception {
  FileSystem fs = cluster.getFileSystem();
  String testRootString = "/tmp/source";
  Path testRoot = new Path(testRootString);
  // Start from a clean source tree.
  if (fs.exists(testRoot)) {
    delete(fs, testRootString);
  }

  Path sourceFile = new Path(testRoot, "foo/bar/source.txt");
  // Presumably a file outside the requested source that should not be listed — only the
  // first entry is asserted below.
  Path decoyFile = new Path(testRoot, "target/mooc");
  createFile(fs, sourceFile.toString());
  createFile(fs, decoyFile.toString());

  URI targetFile = URI.create("s3://bucket/target/moo/target.txt");
  // NOTE(review): the child path is absolute, so this resolves to /tmp/fileList.seq
  // regardless of testRoot.
  final Path listFile = new Path(testRoot, "/tmp/fileList.seq");
  listing.buildListing(listFile, options(sourceFile, targetFile));

  try (SequenceFile.Reader listReader = new SequenceFile.Reader(CONFIG, SequenceFile.Reader.file(listFile))) {
    Text key = new Text();
    CopyListingFileStatus value = new CopyListingFileStatus();
    assertThat(listReader.next(key, value), is(true));
    assertThat(key.toString(), is("/source.txt"));
  }
}
SimpleCopyListingTest.java 文件源码 项目:circus-train 阅读 20 收藏 0 点赞 0 评论 0
/**
 * Builds a listing from one file and two directories and checks that the first four entries'
 * relative paths come out in the expected order.
 */
@Test(timeout = 10000)
public void buildListingForMultipleSources() throws Exception {
  FileSystem fs = cluster.getFileSystem();
  String testRootString = "/tmp/source";
  Path testRoot = new Path(testRootString);
  // Start from a clean source tree.
  if (fs.exists(testRoot)) {
    delete(fs, testRootString);
  }

  Path bazDir = new Path(testRoot, "foo/baz/");
  Path bangDir = new Path(testRoot, "foo/bang/");
  Path singleFile = new Path(testRoot, "foo/bar/source.txt");
  URI target = URI.create("s3://bucket/target/moo/");

  fs.mkdirs(bazDir);
  fs.mkdirs(bangDir);
  createFile(fs, new Path(bazDir, "baz_1.dat"));
  createFile(fs, new Path(bazDir, "baz_2.dat"));
  createFile(fs, new Path(bangDir, "bang_0.dat"));
  createFile(fs, singleFile.toString());

  // NOTE(review): the child path is absolute, so this resolves to /tmp/fileList.seq
  // regardless of testRoot.
  final Path listFile = new Path(testRoot, "/tmp/fileList.seq");
  listing.buildListing(listFile, options(Arrays.asList(singleFile, bazDir, bangDir), target));

  String[] expectedRelativePaths = { "/source.txt", "/baz_1.dat", "/baz_2.dat", "/bang_0.dat" };
  try (SequenceFile.Reader listReader = new SequenceFile.Reader(CONFIG, SequenceFile.Reader.file(listFile))) {
    Text key = new Text();
    CopyListingFileStatus value = new CopyListingFileStatus();
    for (String expected : expectedRelativePaths) {
      assertThat(listReader.next(key, value), is(true));
      assertThat(key.toString(), is(expected));
    }
  }
}
HashTable.java 文件源码 项目:ditb 阅读 29 收藏 0 点赞 0 评论 0
/**
 * Writes every key in {@code partitions} to a SequenceFile at {@code path}, each paired with
 * {@link NullWritable}.
 *
 * @param conf configuration used to resolve the file system and create the writer
 * @param path destination partition file
 * @throws IOException if the file cannot be created or written
 */
void writePartitionFile(Configuration conf, Path path) throws IOException {
  FileSystem fs = path.getFileSystem(conf);
  @SuppressWarnings("deprecation")
  SequenceFile.Writer writer = SequenceFile.createWriter(
    fs, conf, path, ImmutableBytesWritable.class, NullWritable.class);
  try {
    for (ImmutableBytesWritable partition : partitions) {
      writer.append(partition, NullWritable.get());
    }
  } finally {
    // Close even if an append fails, so the underlying stream is not leaked.
    writer.close();
  }
}


问题


面经


文章

微信
公众号

扫码关注公众号