public Decompressor getDecompressor() throws IOException {
CompressionCodec codec = getCodec();
if (codec != null) {
Decompressor decompressor = CodecPool.getDecompressor(codec);
if (decompressor != null) {
if (decompressor.finished()) {
// Somebody returns the decompressor to CodecPool but is still using
// it.
LOG.warn("Deompressor obtained from CodecPool already finished()");
} else {
if(LOG.isDebugEnabled()) {
LOG.debug("Got a decompressor: " + decompressor.hashCode());
}
}
/**
* Following statement is necessary to get around bugs in 0.18 where a
* decompressor is referenced after returned back to the codec pool.
*/
decompressor.reset();
}
return decompressor;
}
return null;
}
java类org.apache.hadoop.io.compress.Decompressor的实例源码
Compression.java 文件源码
项目:hadoop-oss
阅读 25
收藏 0
点赞 0
评论 0
TestZlibCompressorDecompressor.java 文件源码
项目:hadoop-oss
阅读 34
收藏 0
点赞 0
评论 0
@Test
public void testZlibCompressorDecompressorWithConfiguration() {
Configuration conf = new Configuration();
if (ZlibFactory.isNativeZlibLoaded(conf)) {
byte[] rawData;
int tryNumber = 5;
int BYTE_SIZE = 10 * 1024;
Compressor zlibCompressor = ZlibFactory.getZlibCompressor(conf);
Decompressor zlibDecompressor = ZlibFactory.getZlibDecompressor(conf);
rawData = generate(BYTE_SIZE);
try {
for (int i = 0; i < tryNumber; i++)
compressDecompressZlib(rawData, (ZlibCompressor) zlibCompressor,
(ZlibDecompressor) zlibDecompressor);
zlibCompressor.reinit(conf);
} catch (Exception ex) {
fail("testZlibCompressorDecompressorWithConfiguration ex error " + ex);
}
} else {
assertTrue("ZlibFactory is using native libs against request",
ZlibFactory.isNativeZlibLoaded(conf));
}
}
TestZlibCompressorDecompressor.java 文件源码
项目:hadoop-oss
阅读 28
收藏 0
点赞 0
评论 0
@Test
public void testZlibCompressorDecompressorWithCompressionLevels() {
Configuration conf = new Configuration();
conf.set("zlib.compress.level","FOUR");
if (ZlibFactory.isNativeZlibLoaded(conf)) {
byte[] rawData;
int tryNumber = 5;
int BYTE_SIZE = 10 * 1024;
Compressor zlibCompressor = ZlibFactory.getZlibCompressor(conf);
Decompressor zlibDecompressor = ZlibFactory.getZlibDecompressor(conf);
rawData = generate(BYTE_SIZE);
try {
for (int i = 0; i < tryNumber; i++)
compressDecompressZlib(rawData, (ZlibCompressor) zlibCompressor,
(ZlibDecompressor) zlibDecompressor);
zlibCompressor.reinit(conf);
} catch (Exception ex) {
fail("testZlibCompressorDecompressorWithConfiguration ex error " + ex);
}
} else {
assertTrue("ZlibFactory is using native libs against request",
ZlibFactory.isNativeZlibLoaded(conf));
}
}
TestZlibCompressorDecompressor.java 文件源码
项目:hadoop-oss
阅读 30
收藏 0
点赞 0
评论 0
@Test
public void testZlibCompressorDecompressorSetDictionary() {
Configuration conf = new Configuration();
if (ZlibFactory.isNativeZlibLoaded(conf)) {
Compressor zlibCompressor = ZlibFactory.getZlibCompressor(conf);
Decompressor zlibDecompressor = ZlibFactory.getZlibDecompressor(conf);
checkSetDictionaryNullPointerException(zlibCompressor);
checkSetDictionaryNullPointerException(zlibDecompressor);
checkSetDictionaryArrayIndexOutOfBoundsException(zlibDecompressor);
checkSetDictionaryArrayIndexOutOfBoundsException(zlibCompressor);
} else {
assertTrue("ZlibFactory is using native libs against request",
ZlibFactory.isNativeZlibLoaded(conf));
}
}
Compression.java 文件源码
项目:hadoop
阅读 29
收藏 0
点赞 0
评论 0
public Decompressor getDecompressor() throws IOException {
CompressionCodec codec = getCodec();
if (codec != null) {
Decompressor decompressor = CodecPool.getDecompressor(codec);
if (decompressor != null) {
if (decompressor.finished()) {
// Somebody returns the decompressor to CodecPool but is still using
// it.
LOG.warn("Deompressor obtained from CodecPool already finished()");
} else {
if(LOG.isDebugEnabled()) {
LOG.debug("Got a decompressor: " + decompressor.hashCode());
}
}
/**
* Following statement is necessary to get around bugs in 0.18 where a
* decompressor is referenced after returned back to the codec pool.
*/
decompressor.reset();
}
return decompressor;
}
return null;
}
TestZlibCompressorDecompressor.java 文件源码
项目:hadoop
阅读 27
收藏 0
点赞 0
评论 0
@Test
public void testZlibCompressorDecompressorWithConfiguration() {
Configuration conf = new Configuration();
conf.setBoolean(CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_KEY, true);
if (ZlibFactory.isNativeZlibLoaded(conf)) {
byte[] rawData;
int tryNumber = 5;
int BYTE_SIZE = 10 * 1024;
Compressor zlibCompressor = ZlibFactory.getZlibCompressor(conf);
Decompressor zlibDecompressor = ZlibFactory.getZlibDecompressor(conf);
rawData = generate(BYTE_SIZE);
try {
for (int i = 0; i < tryNumber; i++)
compressDecompressZlib(rawData, (ZlibCompressor) zlibCompressor,
(ZlibDecompressor) zlibDecompressor);
zlibCompressor.reinit(conf);
} catch (Exception ex) {
fail("testZlibCompressorDecompressorWithConfiguration ex error " + ex);
}
} else {
assertTrue("ZlibFactory is using native libs against request",
ZlibFactory.isNativeZlibLoaded(conf));
}
}
TestZlibCompressorDecompressor.java 文件源码
项目:hadoop
阅读 34
收藏 0
点赞 0
评论 0
@Test
public void testZlibCompressorDecompressorSetDictionary() {
Configuration conf = new Configuration();
conf.setBoolean(CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_KEY, true);
if (ZlibFactory.isNativeZlibLoaded(conf)) {
Compressor zlibCompressor = ZlibFactory.getZlibCompressor(conf);
Decompressor zlibDecompressor = ZlibFactory.getZlibDecompressor(conf);
checkSetDictionaryNullPointerException(zlibCompressor);
checkSetDictionaryNullPointerException(zlibDecompressor);
checkSetDictionaryArrayIndexOutOfBoundsException(zlibDecompressor);
checkSetDictionaryArrayIndexOutOfBoundsException(zlibCompressor);
} else {
assertTrue("ZlibFactory is using native libs against request",
ZlibFactory.isNativeZlibLoaded(conf));
}
}
Compression.java 文件源码
项目:ditb
阅读 23
收藏 0
点赞 0
评论 0
public Decompressor getDecompressor() {
CompressionCodec codec = getCodec(conf);
if (codec != null) {
Decompressor decompressor = CodecPool.getDecompressor(codec);
if (LOG.isTraceEnabled()) LOG.trace("Retrieved decompressor " + decompressor
+ " from pool.");
if (decompressor != null) {
if (decompressor.finished()) {
// Somebody returns the decompressor to CodecPool but is still using it.
LOG.warn("Deompressor obtained from CodecPool is already finished()");
}
decompressor.reset();
}
return decompressor;
}
return null;
}
Compression.java 文件源码
项目:ditb
阅读 20
收藏 0
点赞 0
评论 0
/**
* Decompresses data from the given stream using the configured compression algorithm. It will
* throw an exception if the dest buffer does not have enough space to hold the decompressed data.
* @param dest the output bytes buffer
* @param destOffset start writing position of the output buffer
* @param bufferedBoundedStream a stream to read compressed data from, bounded to the exact amount
* of compressed data
* @param compressedSize compressed data size, header not included
* @param uncompressedSize uncompressed data size, header not included
* @param compressAlgo compression algorithm used
* @throws IOException
*/
public static void decompress(byte[] dest, int destOffset, InputStream bufferedBoundedStream,
int compressedSize, int uncompressedSize, Compression.Algorithm compressAlgo)
throws IOException {
if (dest.length - destOffset < uncompressedSize) {
throw new IllegalArgumentException("Output buffer does not have enough space to hold "
+ uncompressedSize + " decompressed bytes, available: " + (dest.length - destOffset));
}
Decompressor decompressor = null;
try {
decompressor = compressAlgo.getDecompressor();
InputStream is =
compressAlgo.createDecompressionStream(bufferedBoundedStream, decompressor, 0);
IOUtils.readFully(is, dest, destOffset, uncompressedSize);
is.close();
} finally {
if (decompressor != null) {
compressAlgo.returnDecompressor(decompressor);
}
}
}
Compression.java 文件源码
项目:aliyun-oss-hadoop-fs
阅读 27
收藏 0
点赞 0
评论 0
@Override
public synchronized InputStream createDecompressionStream(
InputStream downStream, Decompressor decompressor,
int downStreamBufferSize) throws IOException {
if (!isSupported()) {
throw new IOException(
"LZO codec class not specified. Did you forget to set property "
+ CONF_LZO_CLASS + "?");
}
InputStream bis1 = null;
if (downStreamBufferSize > 0) {
bis1 = new BufferedInputStream(downStream, downStreamBufferSize);
} else {
bis1 = downStream;
}
conf.setInt("io.compression.codec.lzo.buffersize", 64 * 1024);
CompressionInputStream cis =
codec.createInputStream(bis1, decompressor);
BufferedInputStream bis2 = new BufferedInputStream(cis, DATA_IBUF_SIZE);
return bis2;
}
Compression.java 文件源码
项目:aliyun-oss-hadoop-fs
阅读 24
收藏 0
点赞 0
评论 0
public Decompressor getDecompressor() throws IOException {
CompressionCodec codec = getCodec();
if (codec != null) {
Decompressor decompressor = CodecPool.getDecompressor(codec);
if (decompressor != null) {
if (decompressor.finished()) {
// Somebody returns the decompressor to CodecPool but is still using
// it.
LOG.warn("Deompressor obtained from CodecPool already finished()");
} else {
if(LOG.isDebugEnabled()) {
LOG.debug("Got a decompressor: " + decompressor.hashCode());
}
}
/**
* Following statement is necessary to get around bugs in 0.18 where a
* decompressor is referenced after returned back to the codec pool.
*/
decompressor.reset();
}
return decompressor;
}
return null;
}
TestZlibCompressorDecompressor.java 文件源码
项目:aliyun-oss-hadoop-fs
阅读 27
收藏 0
点赞 0
评论 0
@Test
public void testZlibCompressorDecompressorWithConfiguration() {
Configuration conf = new Configuration();
if (ZlibFactory.isNativeZlibLoaded(conf)) {
byte[] rawData;
int tryNumber = 5;
int BYTE_SIZE = 10 * 1024;
Compressor zlibCompressor = ZlibFactory.getZlibCompressor(conf);
Decompressor zlibDecompressor = ZlibFactory.getZlibDecompressor(conf);
rawData = generate(BYTE_SIZE);
try {
for (int i = 0; i < tryNumber; i++)
compressDecompressZlib(rawData, (ZlibCompressor) zlibCompressor,
(ZlibDecompressor) zlibDecompressor);
zlibCompressor.reinit(conf);
} catch (Exception ex) {
fail("testZlibCompressorDecompressorWithConfiguration ex error " + ex);
}
} else {
assertTrue("ZlibFactory is using native libs against request",
ZlibFactory.isNativeZlibLoaded(conf));
}
}
TestZlibCompressorDecompressor.java 文件源码
项目:aliyun-oss-hadoop-fs
阅读 28
收藏 0
点赞 0
评论 0
@Test
public void testZlibCompressorDecompressorSetDictionary() {
Configuration conf = new Configuration();
if (ZlibFactory.isNativeZlibLoaded(conf)) {
Compressor zlibCompressor = ZlibFactory.getZlibCompressor(conf);
Decompressor zlibDecompressor = ZlibFactory.getZlibDecompressor(conf);
checkSetDictionaryNullPointerException(zlibCompressor);
checkSetDictionaryNullPointerException(zlibDecompressor);
checkSetDictionaryArrayIndexOutOfBoundsException(zlibDecompressor);
checkSetDictionaryArrayIndexOutOfBoundsException(zlibCompressor);
} else {
assertTrue("ZlibFactory is using native libs against request",
ZlibFactory.isNativeZlibLoaded(conf));
}
}
CompressionRollBackHelper.java 文件源码
项目:bigstreams
阅读 31
收藏 0
点赞 0
评论 0
/**
*
* @param source
* @param dest
* @param codec
* @param compressor
* may be null
* @param decomp
* may be null
* @param mark
* @return
* @throws IOException
*/
public static final CompressionOutputStream copy(File source, File dest,
CompressionCodec codec, Compressor compressor, Decompressor decomp,
long mark) throws IOException {
FileInputStream fileInput = new FileInputStream(source);
CompressionInputStream in = (decomp == null) ? codec
.createInputStream(fileInput) : codec.createInputStream(
fileInput, decomp);
FileOutputStream fileOut = new FileOutputStream(dest);
CompressionOutputStream out = (compressor == null) ? codec
.createOutputStream(fileOut) : codec.createOutputStream(
fileOut, compressor);
try {
copy(in, out, mark);
return out;
} finally {
IOUtils.closeQuietly(in);
IOUtils.closeQuietly(fileInput);
}
}
Compression.java 文件源码
项目:big-c
阅读 28
收藏 0
点赞 0
评论 0
@Override
public synchronized InputStream createDecompressionStream(
InputStream downStream, Decompressor decompressor,
int downStreamBufferSize) throws IOException {
if (!isSupported()) {
throw new IOException(
"LZO codec class not specified. Did you forget to set property "
+ CONF_LZO_CLASS + "?");
}
InputStream bis1 = null;
if (downStreamBufferSize > 0) {
bis1 = new BufferedInputStream(downStream, downStreamBufferSize);
} else {
bis1 = downStream;
}
conf.setInt("io.compression.codec.lzo.buffersize", 64 * 1024);
CompressionInputStream cis =
codec.createInputStream(bis1, decompressor);
BufferedInputStream bis2 = new BufferedInputStream(cis, DATA_IBUF_SIZE);
return bis2;
}
Compression.java 文件源码
项目:big-c
阅读 28
收藏 0
点赞 0
评论 0
public Decompressor getDecompressor() throws IOException {
CompressionCodec codec = getCodec();
if (codec != null) {
Decompressor decompressor = CodecPool.getDecompressor(codec);
if (decompressor != null) {
if (decompressor.finished()) {
// Somebody returns the decompressor to CodecPool but is still using
// it.
LOG.warn("Deompressor obtained from CodecPool already finished()");
} else {
if(LOG.isDebugEnabled()) {
LOG.debug("Got a decompressor: " + decompressor.hashCode());
}
}
/**
* Following statement is necessary to get around bugs in 0.18 where a
* decompressor is referenced after returned back to the codec pool.
*/
decompressor.reset();
}
return decompressor;
}
return null;
}
TestZlibCompressorDecompressor.java 文件源码
项目:big-c
阅读 30
收藏 0
点赞 0
评论 0
@Test
public void testZlibCompressorDecompressorWithConfiguration() {
Configuration conf = new Configuration();
conf.setBoolean(CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_KEY, true);
if (ZlibFactory.isNativeZlibLoaded(conf)) {
byte[] rawData;
int tryNumber = 5;
int BYTE_SIZE = 10 * 1024;
Compressor zlibCompressor = ZlibFactory.getZlibCompressor(conf);
Decompressor zlibDecompressor = ZlibFactory.getZlibDecompressor(conf);
rawData = generate(BYTE_SIZE);
try {
for (int i = 0; i < tryNumber; i++)
compressDecompressZlib(rawData, (ZlibCompressor) zlibCompressor,
(ZlibDecompressor) zlibDecompressor);
zlibCompressor.reinit(conf);
} catch (Exception ex) {
fail("testZlibCompressorDecompressorWithConfiguration ex error " + ex);
}
} else {
assertTrue("ZlibFactory is using native libs against request",
ZlibFactory.isNativeZlibLoaded(conf));
}
}
TestZlibCompressorDecompressor.java 文件源码
项目:big-c
阅读 26
收藏 0
点赞 0
评论 0
@Test
public void testZlibCompressorDecompressorSetDictionary() {
Configuration conf = new Configuration();
conf.setBoolean(CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_KEY, true);
if (ZlibFactory.isNativeZlibLoaded(conf)) {
Compressor zlibCompressor = ZlibFactory.getZlibCompressor(conf);
Decompressor zlibDecompressor = ZlibFactory.getZlibDecompressor(conf);
checkSetDictionaryNullPointerException(zlibCompressor);
checkSetDictionaryNullPointerException(zlibDecompressor);
checkSetDictionaryArrayIndexOutOfBoundsException(zlibDecompressor);
checkSetDictionaryArrayIndexOutOfBoundsException(zlibCompressor);
} else {
assertTrue("ZlibFactory is using native libs against request",
ZlibFactory.isNativeZlibLoaded(conf));
}
}
MapReduceExcelOutputIntegrationTest.java 文件源码
项目:hadoopoffice
阅读 25
收藏 0
点赞 0
评论 0
private InputStream openFile(Path path) throws IOException {
CompressionCodec codec=new CompressionCodecFactory(miniCluster.getConfig()).getCodec(path);
FSDataInputStream fileIn=dfsCluster.getFileSystem().open(path);
// check if compressed
if (codec==null) { // uncompressed
return fileIn;
} else { // compressed
Decompressor decompressor = CodecPool.getDecompressor(codec);
this.openDecompressors.add(decompressor); // to be returned later using close
if (codec instanceof SplittableCompressionCodec) {
long end = dfsCluster.getFileSystem().getFileStatus(path).getLen();
final SplitCompressionInputStream cIn =((SplittableCompressionCodec)codec).createInputStream(fileIn, decompressor, 0, end,SplittableCompressionCodec.READ_MODE.CONTINUOUS);
return cIn;
} else {
return codec.createInputStream(fileIn,decompressor);
}
}
}
MapReduceExcelInputIntegrationTest.java 文件源码
项目:hadoopoffice
阅读 25
收藏 0
点赞 0
评论 0
private InputStream openFile(Path path) throws IOException {
CompressionCodec codec=new CompressionCodecFactory(miniCluster.getConfig()).getCodec(path);
FSDataInputStream fileIn=dfsCluster.getFileSystem().open(path);
// check if compressed
if (codec==null) { // uncompressed
return fileIn;
} else { // compressed
Decompressor decompressor = CodecPool.getDecompressor(codec);
this.openDecompressors.add(decompressor); // to be returned later using close
if (codec instanceof SplittableCompressionCodec) {
long end = dfsCluster.getFileSystem().getFileStatus(path).getLen();
final SplitCompressionInputStream cIn =((SplittableCompressionCodec)codec).createInputStream(fileIn, decompressor, 0, end,SplittableCompressionCodec.READ_MODE.CONTINUOUS);
return cIn;
} else {
return codec.createInputStream(fileIn,decompressor);
}
}
}
HadoopFileReader.java 文件源码
项目:hadoopoffice
阅读 30
收藏 0
点赞 0
评论 0
public InputStream openFile(Path path) throws IOException {
CompressionCodec codec=compressionCodecs.getCodec(path);
FSDataInputStream fileIn=fs.open(path);
// check if compressed
if (codec==null) { // uncompressed
LOG.debug("Reading from an uncompressed file \""+path+"\"");
return fileIn;
} else { // compressed
Decompressor decompressor = CodecPool.getDecompressor(codec);
this.openDecompressors.add(decompressor); // to be returned later using close
if (codec instanceof SplittableCompressionCodec) {
LOG.debug("Reading from a compressed file \""+path+"\" with splittable compression codec");
long end = fs.getFileStatus(path).getLen();
return ((SplittableCompressionCodec)codec).createInputStream(fileIn, decompressor, 0, end,SplittableCompressionCodec.READ_MODE.CONTINUOUS);
} else {
LOG.debug("Reading from a compressed file \""+path+"\" with non-splittable compression codec");
return codec.createInputStream(fileIn,decompressor);
}
}
}
SparkBitcoinBlockCounterSparkMasterIntegrationTest.java 文件源码
项目:hadoopcryptoledger
阅读 22
收藏 0
点赞 0
评论 0
private InputStream openFile(Path path) throws IOException {
CompressionCodec codec=new CompressionCodecFactory(conf).getCodec(path);
FSDataInputStream fileIn=dfsCluster.getFileSystem().open(path);
// check if compressed
if (codec==null) { // uncompressed
return fileIn;
} else { // compressed
Decompressor decompressor = CodecPool.getDecompressor(codec);
this.openDecompressors.add(decompressor); // to be returned later using close
if (codec instanceof SplittableCompressionCodec) {
long end = dfsCluster.getFileSystem().getFileStatus(path).getLen();
final SplitCompressionInputStream cIn =((SplittableCompressionCodec)codec).createInputStream(fileIn, decompressor, 0, end,SplittableCompressionCodec.READ_MODE.CONTINUOUS);
return cIn;
} else {
return codec.createInputStream(fileIn,decompressor);
}
}
}
MapReduceBitcoinTransactionIntegrationTest.java 文件源码
项目:hadoopcryptoledger
阅读 28
收藏 0
点赞 0
评论 0
private InputStream openFile(Path path) throws IOException {
CompressionCodec codec=new CompressionCodecFactory(miniCluster.getConfig()).getCodec(path);
FSDataInputStream fileIn=dfsCluster.getFileSystem().open(path);
// check if compressed
if (codec==null) { // uncompressed
return fileIn;
} else { // compressed
Decompressor decompressor = CodecPool.getDecompressor(codec);
this.openDecompressors.add(decompressor); // to be returned later using close
if (codec instanceof SplittableCompressionCodec) {
long end = dfsCluster.getFileSystem().getFileStatus(path).getLen();
final SplitCompressionInputStream cIn =((SplittableCompressionCodec)codec).createInputStream(fileIn, decompressor, 0, end,SplittableCompressionCodec.READ_MODE.CONTINUOUS);
return cIn;
} else {
return codec.createInputStream(fileIn,decompressor);
}
}
}
MapReduceEthereumBlockIntegrationTest.java 文件源码
项目:hadoopcryptoledger
阅读 24
收藏 0
点赞 0
评论 0
private InputStream openFile(Path path) throws IOException {
CompressionCodec codec=new CompressionCodecFactory(miniCluster.getConfig()).getCodec(path);
FSDataInputStream fileIn=dfsCluster.getFileSystem().open(path);
// check if compressed
if (codec==null) { // uncompressed
return fileIn;
} else { // compressed
Decompressor decompressor = CodecPool.getDecompressor(codec);
this.openDecompressors.add(decompressor); // to be returned later using close
if (codec instanceof SplittableCompressionCodec) {
long end = dfsCluster.getFileSystem().getFileStatus(path).getLen();
final SplitCompressionInputStream cIn =((SplittableCompressionCodec)codec).createInputStream(fileIn, decompressor, 0, end,SplittableCompressionCodec.READ_MODE.CONTINUOUS);
return cIn;
} else {
return codec.createInputStream(fileIn,decompressor);
}
}
}
Spark2BitcoinBlockCounterSparkMasterIntegrationTest.java 文件源码
项目:hadoopcryptoledger
阅读 27
收藏 0
点赞 0
评论 0
private InputStream openFile(Path path) throws IOException {
CompressionCodec codec=new CompressionCodecFactory(conf).getCodec(path);
FSDataInputStream fileIn=dfsCluster.getFileSystem().open(path);
// check if compressed
if (codec==null) { // uncompressed
return fileIn;
} else { // compressed
Decompressor decompressor = CodecPool.getDecompressor(codec);
this.openDecompressors.add(decompressor); // to be returned later using close
if (codec instanceof SplittableCompressionCodec) {
long end = dfsCluster.getFileSystem().getFileStatus(path).getLen();
final SplitCompressionInputStream cIn =((SplittableCompressionCodec)codec).createInputStream(fileIn, decompressor, 0, end,SplittableCompressionCodec.READ_MODE.CONTINUOUS);
return cIn;
} else {
return codec.createInputStream(fileIn,decompressor);
}
}
}
MapReduceBitcoinBlockIntegrationTest.java 文件源码
项目:hadoopcryptoledger
阅读 19
收藏 0
点赞 0
评论 0
private InputStream openFile(Path path) throws IOException {
CompressionCodec codec=new CompressionCodecFactory(miniCluster.getConfig()).getCodec(path);
FSDataInputStream fileIn=dfsCluster.getFileSystem().open(path);
// check if compressed
if (codec==null) { // uncompressed
return fileIn;
} else { // compressed
Decompressor decompressor = CodecPool.getDecompressor(codec);
this.openDecompressors.add(decompressor); // to be returned later using close
if (codec instanceof SplittableCompressionCodec) {
long end = dfsCluster.getFileSystem().getFileStatus(path).getLen();
final SplitCompressionInputStream cIn =((SplittableCompressionCodec)codec).createInputStream(fileIn, decompressor, 0, end,SplittableCompressionCodec.READ_MODE.CONTINUOUS);
return cIn;
} else {
return codec.createInputStream(fileIn,decompressor);
}
}
}
HadoopUtils.java 文件源码
项目:incubator-hivemall
阅读 24
收藏 0
点赞 0
评论 0
public static BufferedReader getBufferedReader(File file, MapredContext context)
throws IOException {
URI fileuri = file.toURI();
Path path = new Path(fileuri);
Configuration conf = context.getJobConf();
CompressionCodecFactory ccf = new CompressionCodecFactory(conf);
CompressionCodec codec = ccf.getCodec(path);
if (codec == null) {
return new BufferedReader(new FileReader(file));
} else {
Decompressor decompressor = CodecPool.getDecompressor(codec);
FileInputStream fis = new FileInputStream(file);
CompressionInputStream cis = codec.createInputStream(fis, decompressor);
BufferedReader br = new BufferedReaderExt(new InputStreamReader(cis), decompressor);
return br;
}
}
Compression.java 文件源码
项目:LCIndex-HBase-0.94.16
阅读 25
收藏 0
点赞 0
评论 0
public Decompressor getDecompressor() {
CompressionCodec codec = getCodec(conf);
if (codec != null) {
Decompressor decompressor = CodecPool.getDecompressor(codec);
if (decompressor != null) {
if (decompressor.finished()) {
// Somebody returns the decompressor to CodecPool but is still using
// it.
LOG
.warn("Deompressor obtained from CodecPool is already finished()");
// throw new AssertionError(
// "Decompressor obtained from CodecPool is already finished()");
}
decompressor.reset();
}
return decompressor;
}
return null;
}
HFileBlock.java 文件源码
项目:LCIndex-HBase-0.94.16
阅读 21
收藏 0
点赞 0
评论 0
/**
* Decompresses data from the given stream using the configured compression algorithm.
* @param dest
* @param destOffset
* @param bufferedBoundedStream a stream to read compressed data from, bounded to the exact
* amount of compressed data
* @param uncompressedSize uncompressed data size, header not included
* @throws IOException
*/
protected void decompress(byte[] dest, int destOffset, InputStream bufferedBoundedStream,
int uncompressedSize) throws IOException {
Decompressor decompressor = null;
try {
decompressor = compressAlgo.getDecompressor();
InputStream is =
compressAlgo.createDecompressionStream(bufferedBoundedStream, decompressor, 0);
IOUtils.readFully(is, dest, destOffset, uncompressedSize);
is.close();
} finally {
if (decompressor != null) {
compressAlgo.returnDecompressor(decompressor);
}
}
}
Compression.java 文件源码
项目:hadoop-2.6.0-cdh5.4.3
阅读 27
收藏 0
点赞 0
评论 0
@Override
public synchronized InputStream createDecompressionStream(
InputStream downStream, Decompressor decompressor,
int downStreamBufferSize) throws IOException {
if (!isSupported()) {
throw new IOException(
"LZO codec class not specified. Did you forget to set property "
+ CONF_LZO_CLASS + "?");
}
InputStream bis1 = null;
if (downStreamBufferSize > 0) {
bis1 = new BufferedInputStream(downStream, downStreamBufferSize);
} else {
bis1 = downStream;
}
conf.setInt("io.compression.codec.lzo.buffersize", 64 * 1024);
CompressionInputStream cis =
codec.createInputStream(bis1, decompressor);
BufferedInputStream bis2 = new BufferedInputStream(cis, DATA_IBUF_SIZE);
return bis2;
}