Usage examples (source code) for the Java class org.apache.hadoop.io.SequenceFile
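The snippets below, collected from several open-source projects, exercise SequenceFile through different generations of its API, many of them deprecated. As an orientation, here is a minimal, self-contained sketch (not taken from any of the projects below) that writes and reads a small SequenceFile with the Option-based API available since Hadoop 2.x; the path /tmp/example.seq and the Text/LongWritable key and value types are illustrative choices only.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SequenceFileExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path path = new Path("/tmp/example.seq"); // illustrative path on the default file system

    // Write a few <Text, LongWritable> records.
    SequenceFile.Writer writer = null;
    try {
      writer = SequenceFile.createWriter(conf,
          SequenceFile.Writer.file(path),
          SequenceFile.Writer.keyClass(Text.class),
          SequenceFile.Writer.valueClass(LongWritable.class),
          SequenceFile.Writer.compression(SequenceFile.CompressionType.NONE));
      for (long i = 0; i < 3; i++) {
        writer.append(new Text("key-" + i), new LongWritable(i));
      }
    } finally {
      IOUtils.closeStream(writer);
    }

    // Read the records back; reader.next(key, value) refills the reused instances.
    SequenceFile.Reader reader = null;
    try {
      reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(path));
      Text key = new Text();
      LongWritable value = new LongWritable();
      while (reader.next(key, value)) {
        System.out.println(key + "\t" + value);
      }
    } finally {
      IOUtils.closeStream(reader);
    }
  }
}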

TestTFileSeqFileComparison.java (project: hadoop-oss)
public SeqFileAppendable(FileSystem fs, Path path, int osBufferSize,
    String compress, int minBlkSize) throws IOException {
  Configuration conf = new Configuration();

  CompressionCodec codec = null;
  if ("lzo".equals(compress)) {
    codec = Compression.Algorithm.LZO.getCodec();
  }
  else if ("gz".equals(compress)) {
    codec = Compression.Algorithm.GZ.getCodec();
  }
  else if (!"none".equals(compress))
    throw new IOException("Codec not supported.");

  this.fsdos = fs.create(path, true, osBufferSize);

  if (!"none".equals(compress)) {
    writer =
        SequenceFile.createWriter(conf, fsdos, BytesWritable.class,
            BytesWritable.class, SequenceFile.CompressionType.BLOCK, codec);
  }
  else {
    writer =
        SequenceFile.createWriter(conf, fsdos, BytesWritable.class,
            BytesWritable.class, SequenceFile.CompressionType.NONE, null);
  }
}
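Note that this constructor uses the stream-based createWriter(Configuration, FSDataOutputStream, keyClass, valueClass, CompressionType, CompressionCodec) overload, which is deprecated in more recent Hadoop releases. A rough equivalent of the BLOCK-compressed branch with the Option-based API might look like the following sketch (a hypothetical helper, not part of hadoop-oss; it assumes the same configuration, output stream, and codec as above):

// Sketch only: Option-based equivalent of the BLOCK-compressed createWriter call above.
private static SequenceFile.Writer createBlockCompressedWriter(Configuration conf,
    FSDataOutputStream out, CompressionCodec codec) throws IOException {
  return SequenceFile.createWriter(conf,
      SequenceFile.Writer.stream(out),
      SequenceFile.Writer.keyClass(BytesWritable.class),
      SequenceFile.Writer.valueClass(BytesWritable.class),
      SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, codec));
}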
SimpleCopyListing.java (project: circus-train)
/**
 * Collect the list of <sourceRelativePath, sourceFileStatus> to be copied and write to the sequence file. In essence,
 * any file or directory that needs to be copied or sync-ed is written as an entry to the sequence file, with the
 * possible exception of the source root: when either the -update (sync) or -overwrite switch is specified and the
 * source root is a directory, the source root entry is not written to the sequence file, because only the
 * contents of the source directory need to be copied in this case. See
 * {@link com.hotels.bdp.circustrain.s3mapreducecp.util.ConfigurationUtil#getRelativePath} for how the relative path is
 * computed. See the computeSourceRootPath method for how the root path of the source is computed.
 *
 * @param fileListWriter
 * @param options
 * @throws IOException
 */
@VisibleForTesting
public void doBuildListing(SequenceFile.Writer fileListWriter, S3MapReduceCpOptions options) throws IOException {
  List<Path> globbedPaths = new ArrayList<>(options.getSources().size());

  for (Path sourcePath : options.getSources()) {
    FileSystem fs = sourcePath.getFileSystem(getConf());
    FileStatus sourceFileStatus = fs.getFileStatus(sourcePath);
    if (sourceFileStatus.isFile()) {
      LOG.debug("Adding path {}", sourceFileStatus.getPath());
      globbedPaths.add(sourceFileStatus.getPath());
    } else {
      FileStatus[] inputs = fs.globStatus(sourcePath);
      if (inputs != null && inputs.length > 0) {
        for (FileStatus onePath : inputs) {
          LOG.debug("Adding path {}", onePath.getPath());
          globbedPaths.add(onePath.getPath());
        }
      } else {
        throw new InvalidInputException("Source path " + sourcePath + " doesn't exist");
      }
    }
  }
  doBuildListing(fileListWriter, options, globbedPaths);
}
SimpleCopyListing.java (project: circus-train)
private void traverseNonEmptyDirectory(
    SequenceFile.Writer fileListWriter,
    FileStatus sourceStatus,
    Path sourcePathRoot,
    S3MapReduceCpOptions options)
  throws IOException {
  FileSystem sourceFS = sourcePathRoot.getFileSystem(getConf());
  Stack<FileStatus> pathStack = new Stack<>();
  pathStack.push(sourceStatus);

  while (!pathStack.isEmpty()) {
    for (FileStatus child : getChildren(sourceFS, pathStack.pop())) {
      if (child.isFile()) {
        LOG.debug("Recording source-path: {} for copy.", sourceStatus.getPath());
        CopyListingFileStatus childCopyListingStatus = new CopyListingFileStatus(child);
        writeToFileListing(fileListWriter, childCopyListingStatus, sourcePathRoot, options);
      }
      if (isDirectoryAndNotEmpty(sourceFS, child)) {
        LOG.debug("Traversing non-empty source dir: {}", sourceStatus.getPath());
        pathStack.push(child);
      }
    }
  }
}
SimpleCopyListing.java (project: circus-train)
private void writeToFileListing(
    SequenceFile.Writer fileListWriter,
    CopyListingFileStatus fileStatus,
    Path sourcePathRoot,
    S3MapReduceCpOptions options)
  throws IOException {
  LOG.debug("REL PATH: {}, FULL PATH: {}", PathUtil.getRelativePath(sourcePathRoot, fileStatus.getPath()),
      fileStatus.getPath());

  FileStatus status = fileStatus;

  if (!shouldCopy(fileStatus.getPath(), options)) {
    return;
  }

  fileListWriter.append(new Text(PathUtil.getRelativePath(sourcePathRoot, fileStatus.getPath())), status);
  fileListWriter.sync();

  if (!fileStatus.isDirectory()) {
    totalBytesToCopy += fileStatus.getLen();
  }
  totalPaths++;
}
SimpleCopyListing.java (project: hadoop)
private void writeToFileListing(SequenceFile.Writer fileListWriter,
                                CopyListingFileStatus fileStatus,
                                Path sourcePathRoot,
                                DistCpOptions options) throws IOException {
  if (LOG.isDebugEnabled()) {
    LOG.debug("REL PATH: " + DistCpUtils.getRelativePath(sourcePathRoot,
      fileStatus.getPath()) + ", FULL PATH: " + fileStatus.getPath());
  }

  FileStatus status = fileStatus;

  if (!shouldCopy(fileStatus.getPath(), options)) {
    return;
  }

  fileListWriter.append(new Text(DistCpUtils.getRelativePath(sourcePathRoot,
      fileStatus.getPath())), status);
  fileListWriter.sync();

  if (!fileStatus.isDirectory()) {
    totalBytesToCopy += fileStatus.getLen();
  }
  totalPaths++;
}
SimpleCopyListingTest.java (project: circus-train)
@Test(timeout = 10000)
public void skipFlagFiles() throws Exception {
  FileSystem fs = cluster.getFileSystem();
  Path source = new Path("/tmp/in4");
  URI target = URI.create("s3://bucket/tmp/out4/");
  createFile(fs, new Path(source, "1/_SUCCESS"));
  createFile(fs, new Path(source, "1/file"));
  createFile(fs, new Path(source, "2"));
  Path listingFile = new Path("/tmp/list4");
  listing.buildListing(listingFile, options(source, target));
  assertThat(listing.getNumberOfPaths(), is(2L));
  try (SequenceFile.Reader reader = new SequenceFile.Reader(CONFIG, SequenceFile.Reader.file(listingFile))) {
    CopyListingFileStatus fileStatus = new CopyListingFileStatus();
    Text relativePath = new Text();
    assertThat(reader.next(relativePath, fileStatus), is(true));
    assertThat(relativePath.toString(), is("/1/file"));
    assertThat(reader.next(relativePath, fileStatus), is(true));
    assertThat(relativePath.toString(), is("/2"));
    assertThat(reader.next(relativePath, fileStatus), is(false));
  }
}
SimpleCopyListingTest.java (project: circus-train)
@Test
public void failOnCloseError() throws IOException {
  File inFile = File.createTempFile("TestCopyListingIn", null);
  inFile.deleteOnExit();
  File outFile = File.createTempFile("TestCopyListingOut", null);
  outFile.deleteOnExit();
  Path source = new Path(inFile.toURI());

  Exception expectedEx = new IOException("boom");
  SequenceFile.Writer writer = mock(SequenceFile.Writer.class);
  doThrow(expectedEx).when(writer).close();

  SimpleCopyListing listing = new SimpleCopyListing(CONFIG, CREDENTIALS);
  Exception actualEx = null;
  try {
    listing.doBuildListing(writer, options(source, outFile.toURI()));
  } catch (Exception e) {
    actualEx = e;
  }
  Assert.assertNotNull("close writer didn't fail", actualEx);
  Assert.assertEquals(expectedEx, actualEx);
}
HDFSSequenceFile.java (project: flume-release-1.7.0)
protected void open(Path dstPath, CompressionCodec codeC,
    CompressionType compType, Configuration conf, FileSystem hdfs)
        throws IOException {
  if (useRawLocalFileSystem) {
    if (hdfs instanceof LocalFileSystem) {
      hdfs = ((LocalFileSystem)hdfs).getRaw();
    } else {
      logger.warn("useRawLocalFileSystem is set to true but file system " +
          "is not of type LocalFileSystem: " + hdfs.getClass().getName());
    }
  }
  if (conf.getBoolean("hdfs.append.support", false) == true && hdfs.isFile(dstPath)) {
    outStream = hdfs.append(dstPath);
  } else {
    outStream = hdfs.create(dstPath);
  }
  writer = SequenceFile.createWriter(conf, outStream,
      serializer.getKeyClass(), serializer.getValueClass(), compType, codeC);

  registerCurrentStream(outStream, hdfs, dstPath);
}
TestBucketWriter.java (project: flume-release-1.7.0)
@Test
public void testEventCountingRoller() throws IOException, InterruptedException {
  int maxEvents = 100;
  MockHDFSWriter hdfsWriter = new MockHDFSWriter();
  BucketWriter bucketWriter = new BucketWriter(
      0, 0, maxEvents, 0, ctx, "/tmp", "file", "", ".tmp", null, null,
      SequenceFile.CompressionType.NONE, hdfsWriter, timedRollerPool, proxy,
      new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, null, null, 30000,
      Executors.newSingleThreadExecutor(), 0, 0);

  Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
  for (int i = 0; i < 1000; i++) {
    bucketWriter.append(e);
  }

  logger.info("Number of events written: {}", hdfsWriter.getEventsWritten());
  logger.info("Number of bytes written: {}", hdfsWriter.getBytesWritten());
  logger.info("Number of files opened: {}", hdfsWriter.getFilesOpened());

  Assert.assertEquals("events written", 1000, hdfsWriter.getEventsWritten());
  Assert.assertEquals("bytes written", 3000, hdfsWriter.getBytesWritten());
  Assert.assertEquals("files opened", 10, hdfsWriter.getFilesOpened());
}
TestBucketWriter.java (project: flume-release-1.7.0)
@Test
public void testSizeRoller() throws IOException, InterruptedException {
  int maxBytes = 300;
  MockHDFSWriter hdfsWriter = new MockHDFSWriter();
  BucketWriter bucketWriter = new BucketWriter(
      0, maxBytes, 0, 0, ctx, "/tmp", "file", "", ".tmp", null, null,
      SequenceFile.CompressionType.NONE, hdfsWriter, timedRollerPool, proxy,
      new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, null, null, 30000,
      Executors.newSingleThreadExecutor(), 0, 0);

  Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
  for (int i = 0; i < 1000; i++) {
    bucketWriter.append(e);
  }

  logger.info("Number of events written: {}", hdfsWriter.getEventsWritten());
  logger.info("Number of bytes written: {}", hdfsWriter.getBytesWritten());
  logger.info("Number of files opened: {}", hdfsWriter.getFilesOpened());

  Assert.assertEquals("events written", 1000, hdfsWriter.getEventsWritten());
  Assert.assertEquals("bytes written", 3000, hdfsWriter.getBytesWritten());
  Assert.assertEquals("files opened", 10, hdfsWriter.getFilesOpened());
}
TestGlobbedCopyListing.java (project: hadoop)
private void verifyContents(Path listingPath) throws Exception {
  SequenceFile.Reader reader = new SequenceFile.Reader(cluster.getFileSystem(),
                                            listingPath, new Configuration());
  Text key   = new Text();
  CopyListingFileStatus value = new CopyListingFileStatus();
  Map<String, String> actualValues = new HashMap<String, String>();
  while (reader.next(key, value)) {
    if (value.isDirectory() && key.toString().equals("")) {
      // ignore root with empty relPath, which is an entry to be 
      // used for preserving root attributes etc.
      continue;
    }
    actualValues.put(value.getPath().toString(), key.toString());
  }

  Assert.assertEquals(expectedValues.size(), actualValues.size());
  for (Map.Entry<String, String> entry : actualValues.entrySet()) {
    Assert.assertEquals(entry.getValue(), expectedValues.get(entry.getKey()));
  }
}
TestBucketWriter.java (project: flume-release-1.7.0)
@Test
public void testInUsePrefix() throws IOException, InterruptedException {
  final int ROLL_INTERVAL = 1000; // seconds. Make sure it doesn't change in course of test
  final String PREFIX = "BRNO_IS_CITY_IN_CZECH_REPUBLIC";

  MockHDFSWriter hdfsWriter = new MockHDFSWriter();
  HDFSTextSerializer formatter = new HDFSTextSerializer();
  BucketWriter bucketWriter = new BucketWriter(
      ROLL_INTERVAL, 0, 0, 0, ctx, "/tmp", "file", PREFIX, ".tmp", null, null,
      SequenceFile.CompressionType.NONE, hdfsWriter, timedRollerPool, proxy,
      new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, null, null, 30000,
      Executors.newSingleThreadExecutor(), 0, 0);

  Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
  bucketWriter.append(e);

  Assert.assertTrue("Incorrect in use prefix", hdfsWriter.getOpenedFilePath().contains(PREFIX));
}
TestBucketWriter.java (project: flume-release-1.7.0)
@Test
public void testInUseSuffix() throws IOException, InterruptedException {
  final int ROLL_INTERVAL = 1000; // seconds. Make sure it doesn't change in course of test
  final String SUFFIX = "WELCOME_TO_THE_HELLMOUNTH";

  MockHDFSWriter hdfsWriter = new MockHDFSWriter();
  HDFSTextSerializer serializer = new HDFSTextSerializer();
  BucketWriter bucketWriter = new BucketWriter(
      ROLL_INTERVAL, 0, 0, 0, ctx, "/tmp", "file", "", SUFFIX, null, null,
      SequenceFile.CompressionType.NONE, hdfsWriter, timedRollerPool, proxy,
      new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, null, null, 30000,
      Executors.newSingleThreadExecutor(), 0, 0);

  Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
  bucketWriter.append(e);

  Assert.assertTrue("Incorrect in use suffix", hdfsWriter.getOpenedFilePath().contains(SUFFIX));
}
TestBucketWriter.java (project: flume-release-1.7.0)
@Test
public void testCallbackOnClose() throws IOException, InterruptedException {
  final int ROLL_INTERVAL = 1000; // seconds. Make sure it doesn't change in course of test
  final String SUFFIX = "WELCOME_TO_THE_EREBOR";
  final AtomicBoolean callbackCalled = new AtomicBoolean(false);

  MockHDFSWriter hdfsWriter = new MockHDFSWriter();
  BucketWriter bucketWriter = new BucketWriter(
      ROLL_INTERVAL, 0, 0, 0, ctx, "/tmp", "file", "", SUFFIX, null, null,
      SequenceFile.CompressionType.NONE, hdfsWriter, timedRollerPool, proxy,
      new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0,
      new HDFSEventSink.WriterCallback() {
        @Override
        public void run(String filePath) {
          callbackCalled.set(true);
        }
      }, "blah", 30000, Executors.newSingleThreadExecutor(), 0, 0);

  Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
  bucketWriter.append(e);
  bucketWriter.close(true);

  Assert.assertTrue(callbackCalled.get());
}
TestHDFSCompressedDataStream.java (project: flume-release-1.7.0)
@Test
public void testGzipDurability() throws Exception {
  Context context = new Context();
  HDFSCompressedDataStream writer = new HDFSCompressedDataStream();
  writer.configure(context);
  writer.open(fileURI, factory.getCodec(new Path(fileURI)),
      SequenceFile.CompressionType.BLOCK);

  String[] bodies = { "yarf!" };
  writeBodies(writer, bodies);

  byte[] buf = new byte[256];
  GZIPInputStream cmpIn = new GZIPInputStream(new FileInputStream(file));
  int len = cmpIn.read(buf);
  String result = new String(buf, 0, len, Charsets.UTF_8);
  result = result.trim(); // BodyTextEventSerializer adds a newline

  Assert.assertEquals("input and output must match", bodies[0], result);
}
DistCpV1.java (project: hadoop)
static private void finalize(Configuration conf, JobConf jobconf,
    final Path destPath, String presevedAttributes) throws IOException {
  if (presevedAttributes == null) {
    return;
  }
  EnumSet<FileAttribute> preseved = FileAttribute.parse(presevedAttributes);
  if (!preseved.contains(FileAttribute.USER)
      && !preseved.contains(FileAttribute.GROUP)
      && !preseved.contains(FileAttribute.PERMISSION)) {
    return;
  }

  FileSystem dstfs = destPath.getFileSystem(conf);
  Path dstdirlist = new Path(jobconf.get(DST_DIR_LIST_LABEL));
  try (SequenceFile.Reader in =
      new SequenceFile.Reader(jobconf, Reader.file(dstdirlist))) {
    Text dsttext = new Text();
    FilePair pair = new FilePair(); 
    for(; in.next(dsttext, pair); ) {
      Path absdst = new Path(destPath, pair.output);
      updateDestStatus(pair.input, dstfs.getFileStatus(absdst),
          preseved, dstfs);
    }
  }
}
SequenceFileIterator.java (project: LDA)
/**
 * @throws IOException if path can't be read, or its key or value class can't be instantiated
 */
public SequenceFileIterator(Path path, boolean reuseKeyValueInstances, Configuration conf) throws IOException {
  key = null;
  value = null;
  FileSystem fs = path.getFileSystem(conf);
  path = path.makeQualified(fs);
  reader = new SequenceFile.Reader(fs, path, conf);
  this.conf = conf;
  keyClass = (Class<K>) reader.getKeyClass();
  valueClass = (Class<V>) reader.getValueClass();
  noValue = NullWritable.class.equals(valueClass);
  this.reuseKeyValueInstances = reuseKeyValueInstances;
}
GenerateDistCacheData.java (project: hadoop)
@Override
public List<InputSplit> getSplits(JobContext jobCtxt) throws IOException {
  final JobConf jobConf = new JobConf(jobCtxt.getConfiguration());
  final JobClient client = new JobClient(jobConf);
  ClusterStatus stat = client.getClusterStatus(true);
  int numTrackers = stat.getTaskTrackers();
  final int fileCount = jobConf.getInt(GRIDMIX_DISTCACHE_FILE_COUNT, -1);

  // Total size of distributed cache files to be generated
  final long totalSize = jobConf.getLong(GRIDMIX_DISTCACHE_BYTE_COUNT, -1);
  // Get the path of the special file
  String distCacheFileList = jobConf.get(GRIDMIX_DISTCACHE_FILE_LIST);
  if (fileCount < 0 || totalSize < 0 || distCacheFileList == null) {
    throw new RuntimeException("Invalid metadata: #files (" + fileCount
        + "), total_size (" + totalSize + "), filelisturi ("
        + distCacheFileList + ")");
  }

  Path sequenceFile = new Path(distCacheFileList);
  FileSystem fs = sequenceFile.getFileSystem(jobConf);
  FileStatus srcst = fs.getFileStatus(sequenceFile);
  // Consider the number of TTs * mapSlotsPerTracker as number of mappers.
  int numMapSlotsPerTracker = jobConf.getInt(TTConfig.TT_MAP_SLOTS, 2);
  int numSplits = numTrackers * numMapSlotsPerTracker;

  List<InputSplit> splits = new ArrayList<InputSplit>(numSplits);
  LongWritable key = new LongWritable();
  BytesWritable value = new BytesWritable();

  // Average size of data to be generated by each map task
  final long targetSize = Math.max(totalSize / numSplits,
                            DistributedCacheEmulator.AVG_BYTES_PER_MAP);
  long splitStartPosition = 0L;
  long splitEndPosition = 0L;
  long acc = 0L;
  long bytesRemaining = srcst.getLen();
  SequenceFile.Reader reader = null;
  try {
    reader = new SequenceFile.Reader(fs, sequenceFile, jobConf);
    while (reader.next(key, value)) {

      // If adding this file would put this split past the target size,
      // cut the last split and put this file in the next split.
      if (acc + key.get() > targetSize && acc != 0) {
        long splitSize = splitEndPosition - splitStartPosition;
        splits.add(new FileSplit(
            sequenceFile, splitStartPosition, splitSize, (String[])null));
        bytesRemaining -= splitSize;
        splitStartPosition = splitEndPosition;
        acc = 0L;
      }
      acc += key.get();
      splitEndPosition = reader.getPosition();
    }
  } finally {
    if (reader != null) {
      reader.close();
    }
  }
  if (bytesRemaining != 0) {
    splits.add(new FileSplit(
        sequenceFile, splitStartPosition, bytesRemaining, (String[])null));
  }

  return splits;
}
JHLogAnalyzer.java (project: hadoop)
public void run() {
  try {
    for(int i=start; i < end; i++) {
      String name = getFileName(i);
      Path controlFile = new Path(INPUT_DIR, "in_file_" + name);
      SequenceFile.Writer writer = null;
      try {
        writer = SequenceFile.createWriter(fs, fs.getConf(), controlFile,
                                           Text.class, LongWritable.class,
                                           CompressionType.NONE);
        String logFile = jhLogFiles[i].getPath().toString();
        writer.append(new Text(logFile), new LongWritable(0));
      } catch(Exception e) {
        throw new IOException(e);
      } finally {
        if (writer != null)
          writer.close();
        writer = null;
      }
    }
  } catch(IOException ex) {
    LOG.error("FileCreateDaemon failed.", ex);
  }
  numFinishedThreads++;
}
TestDatamerge.java (project: hadoop)
private static SequenceFile.Writer[] createWriters(Path testdir,
    Configuration conf, int srcs, Path[] src) throws IOException {
  for (int i = 0; i < srcs; ++i) {
    src[i] = new Path(testdir, Integer.toString(i + 10, 36));
  }
  SequenceFile.Writer out[] = new SequenceFile.Writer[srcs];
  for (int i = 0; i < srcs; ++i) {
    out[i] = new SequenceFile.Writer(testdir.getFileSystem(conf), conf,
        src[i], IntWritable.class, IntWritable.class);
  }
  return out;
}
GenericMRLoadGenerator.java (project: hadoop)
public List<InputSplit> getSplits(JobContext job)
    throws IOException {

  Configuration conf = job.getConfiguration();
  Path src = new Path(conf.get(INDIRECT_INPUT_FILE, null));
  FileSystem fs = src.getFileSystem(conf);

  List<InputSplit> splits = new ArrayList<InputSplit>();
  LongWritable key = new LongWritable();
  Text value = new Text();
  for (SequenceFile.Reader sl = new SequenceFile.Reader(fs, src, conf);
       sl.next(key, value);) {
    splits.add(new IndirectSplit(new Path(value.toString()), key.get()));
  }

  return splits;
}
TestTotalOrderPartitioner.java (project: hadoop)
private static <T extends WritableComparable<?>> Path writePartitionFile(
    String testname, Configuration conf, T[] splits) throws IOException {
  final FileSystem fs = FileSystem.getLocal(conf);
  final Path testdir = new Path(System.getProperty("test.build.data", "/tmp")
                               ).makeQualified(fs);
  Path p = new Path(testdir, testname + "/_partition.lst");
  TotalOrderPartitioner.setPartitionFile(conf, p);
  conf.setInt(MRJobConfig.NUM_REDUCES, splits.length + 1);
  SequenceFile.Writer w = null;
  try {
    w = SequenceFile.createWriter(fs, conf, p,
        splits[0].getClass(), NullWritable.class,
        SequenceFile.CompressionType.NONE);
    for (int i = 0; i < splits.length; ++i) {
      w.append(splits[i], NullWritable.get());
    }
  } finally {
    if (null != w)
      w.close();
  }
  return p;
}
TestJoinDatamerge.java (project: hadoop)
private static int countProduct(IntWritable key, Path[] src, 
    Configuration conf) throws IOException {
  int product = 1;
  for (Path p : src) {
    int count = 0;
    SequenceFile.Reader r = new SequenceFile.Reader(
      cluster.getFileSystem(), p, conf);
    IntWritable k = new IntWritable();
    IntWritable v = new IntWritable();
    while (r.next(k, v)) {
      if (k.equals(key)) {
        count++;
      }
    }
    r.close();
    if (count != 0) {
      product *= count;
    }
  }
  return product;
}
TestCombineSequenceFileInputFormat.java (project: hadoop)
private static void createFiles(int length, int numFiles, Random random,
  Job job) throws IOException {
  Range[] ranges = createRanges(length, numFiles, random);

  for (int i = 0; i < numFiles; i++) {
    Path file = new Path(workDir, "test_" + i + ".seq");
    // create a file with length entries
    @SuppressWarnings("deprecation")
    SequenceFile.Writer writer =
      SequenceFile.createWriter(localFs, job.getConfiguration(), file,
                                IntWritable.class, BytesWritable.class);
    Range range = ranges[i];
    try {
      for (int j = range.start; j < range.end; j++) {
        IntWritable key = new IntWritable(j);
        byte[] data = new byte[random.nextInt(10)];
        random.nextBytes(data);
        BytesWritable value = new BytesWritable(data);
        writer.append(key, value);
      }
    } finally {
      writer.close();
    }
  }
}
NNBench.java (project: hadoop)
/**
 * Create control files before a test run.
 * Number of files created is equal to the number of maps specified
 * 
 * @throws IOException on error
 */
private static void createControlFiles() throws IOException {
  FileSystem tempFS = FileSystem.get(config);
  LOG.info("Creating " + numberOfMaps + " control files");

  for (int i = 0; i < numberOfMaps; i++) {
    String strFileName = "NNBench_Controlfile_" + i;
    Path filePath = new Path(new Path(baseDir, CONTROL_DIR_NAME),
            strFileName);

    SequenceFile.Writer writer = null;
    try {
      writer = SequenceFile.createWriter(tempFS, config, filePath, Text.class, 
              LongWritable.class, CompressionType.NONE);
      writer.append(new Text(strFileName), new LongWritable(0l));
    } finally {
      if (writer != null) {
        writer.close();
      }
    }
  }
}
UniformSizeInputFormat.java (project: hadoop)
private SequenceFile.Reader getListingFileReader(Configuration configuration) {

    final Path listingFilePath = getListingFilePath(configuration);
    try {
      final FileSystem fileSystem = listingFilePath.getFileSystem(configuration);
      if (!fileSystem.exists(listingFilePath))
        throw new IllegalArgumentException("Listing file doesn't exist at: "
                                           + listingFilePath);

      return new SequenceFile.Reader(configuration,
                                     SequenceFile.Reader.file(listingFilePath));
    }
    catch (IOException exception) {
      LOG.error("Couldn't find listing file at: " + listingFilePath, exception);
      throw new IllegalArgumentException("Couldn't find listing-file at: "
                                         + listingFilePath, exception);
    }
  }
TotalOrderPartitioner.java (project: hadoop)
/**
 * Read the cut points from the given IFile.
 * @param fs The file system
 * @param p The path to read
 * @param keyClass The map output key class
 * @param job The job config
 * @throws IOException
 */
                               // matching key types enforced by passing in
@SuppressWarnings("unchecked") // map output key class
private K[] readPartitions(FileSystem fs, Path p, Class<K> keyClass,
    Configuration conf) throws IOException {
  SequenceFile.Reader reader = new SequenceFile.Reader(fs, p, conf);
  ArrayList<K> parts = new ArrayList<K>();
  K key = ReflectionUtils.newInstance(keyClass, conf);
  NullWritable value = NullWritable.get();
  try {
    while (reader.next(key, value)) {
      parts.add(key);
      key = ReflectionUtils.newInstance(keyClass, conf);
    }
    reader.close();
    reader = null;
  } finally {
    IOUtils.cleanup(LOG, reader);
  }
  return parts.toArray((K[])Array.newInstance(keyClass, parts.size()));
}
TestClientDistributedCacheManager.java (project: hadoop)
@SuppressWarnings("deprecation")
void createTempFile(Path p, Configuration conf) throws IOException {
  SequenceFile.Writer writer = null;
  try {
    writer = SequenceFile.createWriter(fs, conf, p,
                                       Text.class, Text.class,
                                       CompressionType.NONE);
    writer.append(new Text("text"), new Text("moretext"));
  } catch(Exception e) {
    throw new IOException(e.getLocalizedMessage());
  } finally {
    if (writer != null) {
      writer.close();
    }
    writer = null;
  }
  LOG.info("created: " + p);
}
SequenceFileAnalyzer.java (project: wherehowsX)
@Override
public DatasetJsonRecord getSchema(Path path) throws IOException {
    DatasetJsonRecord record = null;
    if (!fs.exists(path))
        LOG.error("sequencefileanalyzer file : " + path.toUri().getPath() + " is not exist on hdfs");
    else {
        try {
            LOG.info("sequencefileanalyzer start parse schema for  file path : {}", path.toUri().getPath());
            SequenceFile.Reader reader = new SequenceFile.Reader(fs.getConf(), SequenceFile.Reader.file(path));
            String keyName = "Key";
            String keyType = getWritableType(reader.getKeyClassName());
            String valueName = "Value";
            String valueType = getWritableType(reader.getValueClassName());
            FileStatus status = fs.getFileStatus(path);
            String storage = STORAGE_TYPE;
            String abstractPath = path.toUri().getPath();
            String codec = "sequence.codec";
            String schemaString = "{\"fields\": [{\"name\": \"" + keyName + "\", \"type\": \"" + keyType + "\"}, {\"name\": \"" + valueName + "\", \"type\": \"" + valueType + "\"}], \"name\": \"Result\", \"namespace\": \"com.tencent.lake\", \"type\": \"record\"}";

            record = new DatasetJsonRecord(schemaString, abstractPath, status.getModificationTime(), status.getOwner(), status.getGroup(),
                    status.getPermission().toString(), codec, storage, "");
            LOG.info("sequencefileanalyzer parse path :{},schema is {}", path.toUri().getPath(), record.toCsvString());

        } catch (Exception e) {
            LOG.error("path : {} content " + " is not Sequence File format content  ",path.toUri().getPath());
            LOG.info(e.getStackTrace().toString());
        }

    }
    return record;
}
SequenceFileAnalyzer.java (project: wherehowsX)
@Override
public SampleDataRecord getSampleData(Path path) throws IOException {
    SampleDataRecord dataRecord = null;
    if (!fs.exists(path))
        LOG.error("sequence file : " + path.toUri().getPath() + " is not exist on hdfs");
    else {
        try {
            LOG.info("sequencefileanalyzer start parse sampledata for  file path : {}", path.toUri().getPath());
            SequenceFile.Reader reader = new SequenceFile.Reader(fs.getConf(), SequenceFile.Reader.file(path));
            List<Object> sampleValues = new ArrayList<Object>();
            Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), fs.getConf());
            Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), fs.getConf());
            int count = 0;
            String keyName = "Key";
            String valueName = "Value";
            while (reader.next(key, value) && count < 12) {
                sampleValues.add("{\"" + keyName + "\": \"" + key + "\", \"" + valueName + "\": \"" + value + "\"}");
                count++;
            }
            dataRecord = new SampleDataRecord(path.toUri().getPath(), sampleValues);
            LOG.info("sequence file path : {}, sample data is {}", path.toUri().getPath(), sampleValues);
        } catch (Exception e) {
            LOG.error("path : {} content " + " is not Sequence File format content  ",path.toUri().getPath());
            LOG.info(e.getStackTrace().toString());
        }
    }
    return dataRecord;

}

