java类org.apache.hadoop.io.compress.CodecPool的实例源码

Anonymizer.java 文件源码 项目:big-c 阅读 35 收藏 0 点赞 0 评论 0
/**
 * Creates a pretty-printing UTF-8 JSON generator that writes to {@code path},
 * compressing the output if a codec is registered for the path's extension.
 *
 * @param conf configuration used to resolve the filesystem and the codec factory
 * @param path destination file; its suffix selects the compression codec (may be none)
 * @return a JSON generator over the (possibly compressed) output stream
 * @throws IOException if the file or the compressed stream cannot be created
 */
private JsonGenerator createJsonGenerator(Configuration conf, Path path) 
throws IOException {
  FileSystem outFS = path.getFileSystem(conf);
  CompressionCodec codec =
    new CompressionCodecFactory(conf).getCodec(path);
  OutputStream output;
  Compressor compressor = null;
  try {
    if (codec != null) {
      compressor = CodecPool.getCompressor(codec);
      output = codec.createOutputStream(outFS.create(path), compressor);
    } else {
      output = outFS.create(path);
    }
  } catch (IOException e) {
    // Don't leak a pooled compressor when stream creation fails.
    // CodecPool.returnCompressor is a no-op for null.
    CodecPool.returnCompressor(compressor);
    throw e;
  }

  JsonGenerator outGen = outFactory.createJsonGenerator(output, 
                                                        JsonEncoding.UTF8);
  outGen.useDefaultPrettyPrinter();

  return outGen;
}
SequenceFile.java 文件源码 项目:big-c 阅读 25 收藏 0 点赞 0 评论 0
/**
 * Close the file.
 *
 * <p>Releases the pooled compressor and closes (or flushes, when the stream
 * is not owned) the underlying output stream. The cleanup runs in a
 * {@code finally} block so that a serializer failing to close does not leak
 * the compressor back to {@link CodecPool} or leave {@code out} open.
 */
@Override
public synchronized void close() throws IOException {
  try {
    keySerializer.close();
    uncompressedValSerializer.close();
    if (compressedValSerializer != null) {
      compressedValSerializer.close();
    }
  } finally {
    // Always return the pooled compressor; returnCompressor(null) is a no-op.
    CodecPool.returnCompressor(compressor);
    compressor = null;

    if (out != null) {
      // Close the underlying stream iff we own it...
      if (ownOutputStream) {
        out.close();
      } else {
        out.flush();
      }
      out = null;
    }
  }
}
SequenceFile.java 文件源码 项目:big-c 阅读 26 收藏 0 点赞 0 评论 0
/**
 * Close the file.
 *
 * <p>Returns all pooled decompressors, closes the key/value deserializers,
 * and finally closes the input stream. {@code in.close()} is placed in a
 * {@code finally} block so a deserializer failing to close cannot leak the
 * underlying stream.
 */
@Override
public synchronized void close() throws IOException {
  // Return the decompressors to the pool (null-safe no-ops).
  CodecPool.returnDecompressor(keyLenDecompressor);
  CodecPool.returnDecompressor(keyDecompressor);
  CodecPool.returnDecompressor(valLenDecompressor);
  CodecPool.returnDecompressor(valDecompressor);
  keyLenDecompressor = keyDecompressor = null;
  valLenDecompressor = valDecompressor = null;

  try {
    if (keyDeserializer != null) {
      keyDeserializer.close();
    }
    if (valDeserializer != null) {
      valDeserializer.close();
    }
  } finally {
    // Close the input-stream even if a deserializer failed to close.
    in.close();
  }
}
Compression.java 文件源码 项目:big-c 阅读 48 收藏 0 点赞 0 评论 0
/**
 * Obtains a compressor for this codec from the shared {@link CodecPool}.
 *
 * @return a reset, pooled {@link Compressor}, or {@code null} when this
 *         algorithm has no codec or the pool yields none
 * @throws IOException if the codec cannot be obtained
 */
public Compressor getCompressor() throws IOException {
  CompressionCodec codec = getCodec();
  if (codec == null) {
    return null;
  }
  Compressor compressor = CodecPool.getCompressor(codec);
  if (compressor != null) {
    if (compressor.finished()) {
      // A finished() compressor means someone returned it to the pool
      // while still holding a reference to it.
      LOG.warn("Compressor obtained from CodecPool already finished()");
    } else if (LOG.isDebugEnabled()) {
      LOG.debug("Got a compressor: " + compressor.hashCode());
    }
    // Reset unconditionally to work around 0.18-era bugs where a
    // compressor was used after being returned to the codec pool.
    compressor.reset();
  }
  return compressor;
}
Compression.java 文件源码 项目:big-c 阅读 26 收藏 0 点赞 0 评论 0
/**
 * Obtains a decompressor for this codec from the shared {@link CodecPool}.
 *
 * @return a reset, pooled {@link Decompressor}, or {@code null} when this
 *         algorithm has no codec or the pool yields none
 * @throws IOException if the codec cannot be obtained
 */
public Decompressor getDecompressor() throws IOException {
  CompressionCodec codec = getCodec();
  if (codec != null) {
    Decompressor decompressor = CodecPool.getDecompressor(codec);
    if (decompressor != null) {
      if (decompressor.finished()) {
        // Somebody returns the decompressor to CodecPool but is still using
        // it.
        // Fixed typo in the log message ("Deompressor" -> "Decompressor").
        LOG.warn("Decompressor obtained from CodecPool already finished()");
      } else {
        if(LOG.isDebugEnabled()) {
          LOG.debug("Got a decompressor: " + decompressor.hashCode());
        }
      }
      /**
       * Following statement is necessary to get around bugs in 0.18 where a
       * decompressor is referenced after returned back to the codec pool.
       */
      decompressor.reset();
    }
    return decompressor;
  }

  return null;
}
MapReduceExcelOutputIntegrationTest.java 文件源码 项目:hadoopoffice 阅读 23 收藏 0 点赞 0 评论 0
/**
 * Opens the given HDFS path, transparently decompressing it when its
 * suffix maps to a registered compression codec.
 *
 * @param path file to open on the mini-cluster's filesystem
 * @return the raw stream for uncompressed files, otherwise a decompressing
 *         stream (a {@link SplitCompressionInputStream} for splittable codecs)
 * @throws IOException if the file cannot be opened
 */
private InputStream openFile(Path path) throws IOException {
  CompressionCodec codec =
      new CompressionCodecFactory(miniCluster.getConfig()).getCodec(path);
  FSDataInputStream fileIn = dfsCluster.getFileSystem().open(path);
  if (codec == null) {
    // Uncompressed: hand back the raw stream.
    return fileIn;
  }
  Decompressor decompressor = CodecPool.getDecompressor(codec);
  // Track the decompressor so close() can return it to the pool later.
  this.openDecompressors.add(decompressor);
  if (codec instanceof SplittableCompressionCodec) {
    long end = dfsCluster.getFileSystem().getFileStatus(path).getLen();
    return ((SplittableCompressionCodec) codec).createInputStream(
        fileIn, decompressor, 0, end,
        SplittableCompressionCodec.READ_MODE.CONTINUOUS);
  }
  return codec.createInputStream(fileIn, decompressor);
}
MapReduceExcelInputIntegrationTest.java 文件源码 项目:hadoopoffice 阅读 29 收藏 0 点赞 0 评论 0
/**
 * Opens the given HDFS path, transparently decompressing it when its
 * suffix maps to a registered compression codec.
 *
 * @param path file to open on the mini-cluster's filesystem
 * @return the raw stream for uncompressed files, otherwise a decompressing
 *         stream (a {@link SplitCompressionInputStream} for splittable codecs)
 * @throws IOException if the file cannot be opened
 */
private InputStream openFile(Path path) throws IOException {
  CompressionCodec codec =
      new CompressionCodecFactory(miniCluster.getConfig()).getCodec(path);
  FSDataInputStream fileIn = dfsCluster.getFileSystem().open(path);
  if (codec == null) {
    // Uncompressed: hand back the raw stream.
    return fileIn;
  }
  Decompressor decompressor = CodecPool.getDecompressor(codec);
  // Track the decompressor so close() can return it to the pool later.
  this.openDecompressors.add(decompressor);
  if (codec instanceof SplittableCompressionCodec) {
    long end = dfsCluster.getFileSystem().getFileStatus(path).getLen();
    return ((SplittableCompressionCodec) codec).createInputStream(
        fileIn, decompressor, 0, end,
        SplittableCompressionCodec.READ_MODE.CONTINUOUS);
  }
  return codec.createInputStream(fileIn, decompressor);
}
AbstractSpreadSheetDocumentRecordReader.java 文件源码 项目:hadoopoffice 阅读 22 收藏 0 点赞 0 评论 0
/**
 * Closes the office reader and releases associated resources: the pooled
 * decompressor is returned to {@link CodecPool} and the reader for linked
 * workbooks is closed. The filesystem itself is deliberately left open.
 */
@Override
public synchronized void close() throws IOException {
  try {
    if (officeReader != null) {
      officeReader.close();
    }
  } finally {
    // Return the pooled decompressor for the main document.
    if (decompressor != null) {
      CodecPool.returnDecompressor(decompressor);
      decompressor = null;
    }
    // Close the reader for linked workbooks, if any.
    if (this.currentHFR != null) {
      currentHFR.close();
    }
  }
  // do not close the filesystem! will cause exceptions in Spark
}
AbstractSpreadSheetDocumentRecordReader.java 文件源码 项目:hadoopoffice 阅读 32 收藏 0 点赞 0 评论 0
/**
 * Closes the office reader and releases associated resources: the pooled
 * decompressor is returned to {@link CodecPool} and the reader for linked
 * workbooks is closed. The filesystem itself is deliberately left open.
 */
@Override
public synchronized void close() throws IOException {
  try {
    if (officeReader != null) {
      officeReader.close();
    }
  } finally {
    // Return the pooled decompressor for the main document.
    if (decompressor != null) {
      CodecPool.returnDecompressor(decompressor);
      decompressor = null;
    }
    // Close the reader for linked workbooks, if any.
    if (this.currentHFR != null) {
      currentHFR.close();
    }
  }
  // do not close the filesystem! will cause exceptions in Spark
}
HadoopFileReader.java 文件源码 项目:hadoopoffice 阅读 21 收藏 0 点赞 0 评论 0
/**
 * Opens the given path, transparently decompressing it when its suffix
 * maps to a registered compression codec.
 *
 * @param path file to open on the configured filesystem
 * @return the raw stream for uncompressed files, otherwise a decompressing
 *         stream (splittable codecs get a split-aware input stream)
 * @throws IOException if the file cannot be opened
 */
public InputStream openFile(Path path) throws IOException {
  CompressionCodec codec = compressionCodecs.getCodec(path);
  FSDataInputStream fileIn = fs.open(path);
  if (codec == null) {
    LOG.debug("Reading from an uncompressed file \""+path+"\"");
    return fileIn;
  }
  Decompressor decompressor = CodecPool.getDecompressor(codec);
  // Track the decompressor so close() can return it to the pool later.
  this.openDecompressors.add(decompressor);
  if (codec instanceof SplittableCompressionCodec) {
    LOG.debug("Reading from a compressed file \""+path+"\" with splittable compression codec");
    long end = fs.getFileStatus(path).getLen();
    return ((SplittableCompressionCodec) codec).createInputStream(
        fileIn, decompressor, 0, end,
        SplittableCompressionCodec.READ_MODE.CONTINUOUS);
  }
  LOG.debug("Reading from a compressed file \""+path+"\" with non-splittable compression codec");
  return codec.createInputStream(fileIn, decompressor);
}


问题


面经


文章

微信
公众号

扫码关注公众号