/** Close the file. */
@Override
public synchronized void close() throws IOException {
  keySerializer.close();
  uncompressedValSerializer.close();
  if (compressedValSerializer != null) {
    compressedValSerializer.close();
  }
  CodecPool.returnCompressor(compressor);
  compressor = null;
  if (out != null) {
    // Close the underlying stream iff we own it...
    if (ownOutputStream) {
      out.close();
    } else {
      out.flush();
    }
    out = null;
  }
}
Java examples of class org.apache.hadoop.io.compress.CodecPool

(The writer close() above is from SequenceFile.java, project: hadoop-oss.)
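Every snippet on this page follows the same discipline: borrow from the pool, use, return exactly once. A minimal, self-contained sketch of that pattern (assuming DefaultCodec and a local file; this is not taken from the projects below):

import java.io.FileOutputStream;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.util.ReflectionUtils;

public class CodecPoolPattern {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    CompressionCodec codec = ReflectionUtils.newInstance(DefaultCodec.class, conf);
    Compressor compressor = CodecPool.getCompressor(codec); // borrow from the pool
    try (OutputStream out =
        codec.createOutputStream(new FileOutputStream("data.deflate"), compressor)) {
      out.write("hello".getBytes("UTF-8"));
    } finally {
      CodecPool.returnCompressor(compressor); // return exactly once, even on error
    }
  }
}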
SequenceFile.java (project: hadoop-oss)
/** Close the file. */
@Override
public synchronized void close() throws IOException {
  // Return the decompressors to the pool
  CodecPool.returnDecompressor(keyLenDecompressor);
  CodecPool.returnDecompressor(keyDecompressor);
  CodecPool.returnDecompressor(valLenDecompressor);
  CodecPool.returnDecompressor(valDecompressor);
  keyLenDecompressor = keyDecompressor = null;
  valLenDecompressor = valDecompressor = null;
  if (keyDeserializer != null) {
    keyDeserializer.close();
  }
  if (valDeserializer != null) {
    valDeserializer.close();
  }
  // Close the input-stream
  in.close();
}
Compression.java (project: hadoop-oss)
public Compressor getCompressor() throws IOException {
  CompressionCodec codec = getCodec();
  if (codec != null) {
    Compressor compressor = CodecPool.getCompressor(codec);
    if (compressor != null) {
      if (compressor.finished()) {
        // Somebody returned the compressor to the CodecPool but is still
        // using it.
        LOG.warn("Compressor obtained from CodecPool already finished()");
      } else if (LOG.isDebugEnabled()) {
        LOG.debug("Got a compressor: " + compressor.hashCode());
      }
      /*
       * The following statement is necessary to work around bugs in 0.18
       * where a compressor is referenced after being returned to the codec
       * pool.
       */
      compressor.reset();
    }
    return compressor;
  }
  return null;
}
Compression.java (project: hadoop-oss)
public Decompressor getDecompressor() throws IOException {
  CompressionCodec codec = getCodec();
  if (codec != null) {
    Decompressor decompressor = CodecPool.getDecompressor(codec);
    if (decompressor != null) {
      if (decompressor.finished()) {
        // Somebody returned the decompressor to the CodecPool but is still
        // using it.
        LOG.warn("Decompressor obtained from CodecPool already finished()");
      } else if (LOG.isDebugEnabled()) {
        LOG.debug("Got a decompressor: " + decompressor.hashCode());
      }
      /*
       * The following statement is necessary to work around bugs in 0.18
       * where a decompressor is referenced after being returned to the codec
       * pool.
       */
      decompressor.reset();
    }
    return decompressor;
  }
  return null;
}
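The finished() test and the defensive reset() above guard against a caller that returned a pooled object while still holding a reference to it. A hypothetical demonstration of that aliasing hazard (assuming DefaultCodec; this is not Hadoop source):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.util.ReflectionUtils;

public class PoolAliasingDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    CompressionCodec codec = ReflectionUtils.newInstance(DefaultCodec.class, conf);
    Compressor c1 = CodecPool.getCompressor(codec);
    CodecPool.returnCompressor(c1);                 // returned too early
    Compressor c2 = CodecPool.getCompressor(codec); // the pool may hand c1 back
    // If c1 == c2, further writes through the stale c1 reference would corrupt
    // c2's stream; that is exactly what the reset() above defends against.
    System.out.println("aliased: " + (c1 == c2));   // typically true
    CodecPool.returnCompressor(c2);
  }
}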
HDFSCompressedDataStream.java (project: flume-release-1.7.0)
@Override
public void close() throws IOException {
  serializer.flush();
  serializer.beforeClose();
  if (!isFinished) {
    cmpOut.finish();
    isFinished = true;
  }
  fsOut.flush();
  hflushOrSync(fsOut);
  cmpOut.close();
  if (compressor != null) {
    CodecPool.returnCompressor(compressor);
    compressor = null;
  }
  unregisterCurrentStream();
}
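Note the ordering above: finish() writes the compressed trailer while the stream stays open for hflushOrSync(), and only afterwards are the stream closed and the compressor returned. A minimal sketch of those semantics against an in-memory sink (assuming DefaultCodec; this is not Flume code):

import java.io.ByteArrayOutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.util.ReflectionUtils;

public class FinishVsClose {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    CompressionCodec codec = ReflectionUtils.newInstance(DefaultCodec.class, conf);
    Compressor compressor = CodecPool.getCompressor(codec);
    ByteArrayOutputStream sink = new ByteArrayOutputStream();
    CompressionOutputStream out = codec.createOutputStream(sink, compressor);
    out.write("payload".getBytes("UTF-8"));
    out.finish(); // flushes the compressed trailer; the sink stays open
    // a real FSDataOutputStream could be hflush()ed/hsync()ed here
    out.close();  // closes the sink; close() also finishes if not done yet
    CodecPool.returnCompressor(compressor); // hand the instance back
    System.out.println("compressed bytes: " + sink.size());
  }
}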
IFile.java (project: hadoop)
/**
 * Construct an IFile Reader.
 *
 * @param conf Configuration
 * @param in The input stream
 * @param length Length of the data in the stream, including the checksum
 *               bytes.
 * @param codec codec
 * @param readsCounter Counter for records read from disk
 * @throws IOException
 */
public Reader(Configuration conf, FSDataInputStream in, long length,
              CompressionCodec codec,
              Counters.Counter readsCounter) throws IOException {
  readRecordsCounter = readsCounter;
  checksumIn = new IFileInputStream(in, length, conf);
  if (codec != null) {
    decompressor = CodecPool.getDecompressor(codec);
    if (decompressor != null) {
      this.in = codec.createInputStream(checksumIn, decompressor);
    } else {
      LOG.warn("Could not obtain decompressor from CodecPool");
      this.in = checksumIn;
    }
  } else {
    this.in = checksumIn;
  }
  this.dataIn = new DataInputStream(this.in);
  this.fileLength = length;
  if (conf != null) {
    bufferSize = conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE);
  }
}
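When a Configuration is supplied, the constructor honors the standard io.file.buffer.size key, so callers can tune the read buffer through the job configuration. A trivial, hedged illustration:

import org.apache.hadoop.conf.Configuration;

public class ReaderBufferConfig {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Overrides the DEFAULT_BUFFER_SIZE fallback used by the Reader above
    conf.setInt("io.file.buffer.size", 128 * 1024);
    System.out.println(conf.getInt("io.file.buffer.size", -1)); // 131072
  }
}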
IFile.java (project: hadoop)
public void close() throws IOException {
  // Close the underlying stream
  in.close();
  // Release the buffer
  dataIn = null;
  buffer = null;
  if (readRecordsCounter != null) {
    readRecordsCounter.increment(numRecordsRead);
  }
  // Return the decompressor
  if (decompressor != null) {
    decompressor.reset();
    CodecPool.returnDecompressor(decompressor);
    decompressor = null;
  }
}
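One caveat in the close() above: in.close() runs before the decompressor is returned, so an exception while closing the stream would leak the pooled instance. A hedged, leak-safe variant (same fields; this is not the project's code):

public void close() throws IOException {
  try {
    in.close(); // may throw; the pooled decompressor must still be returned
  } finally {
    dataIn = null;
    buffer = null;
    if (readRecordsCounter != null) {
      readRecordsCounter.increment(numRecordsRead);
    }
    if (decompressor != null) {
      decompressor.reset();
      CodecPool.returnDecompressor(decompressor);
      decompressor = null;
    }
  }
}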
InMemoryMapOutput.java (project: hadoop)
public InMemoryMapOutput(Configuration conf, TaskAttemptID mapId,
                         MergeManagerImpl<K, V> merger,
                         int size, CompressionCodec codec,
                         boolean primaryMapOutput) {
  super(mapId, (long) size, primaryMapOutput);
  this.conf = conf;
  this.merger = merger;
  this.codec = codec;
  byteStream = new BoundedByteArrayOutputStream(size);
  memory = byteStream.getBuffer();
  if (codec != null) {
    decompressor = CodecPool.getDecompressor(codec);
  } else {
    decompressor = null;
  }
}
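The decompressor borrowed in this constructor is used later, when the fetched map output is actually read. A hedged sketch of that use-site as a hypothetical helper (wrapIfCompressed is not a real method of the class; field names follow the constructor above):

// Hypothetical helper, not part of InMemoryMapOutput's real API:
private InputStream wrapIfCompressed(InputStream input) throws IOException {
  if (codec != null) {
    decompressor.reset(); // a pooled instance may carry state from prior use
    return codec.createInputStream(input, decompressor);
  }
  return input;
}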
PossiblyDecompressedInputStream.java (project: hadoop)
public PossiblyDecompressedInputStream(Path inputPath, Configuration conf)
    throws IOException {
  CompressionCodecFactory codecs = new CompressionCodecFactory(conf);
  CompressionCodec inputCodec = codecs.getCodec(inputPath);
  FileSystem ifs = inputPath.getFileSystem(conf);
  FSDataInputStream fileIn = ifs.open(inputPath);
  if (inputCodec == null) {
    decompressor = null;
    coreInputStream = fileIn;
  } else {
    decompressor = CodecPool.getDecompressor(inputCodec);
    coreInputStream = inputCodec.createInputStream(fileIn, decompressor);
  }
}
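For symmetry, the decompressor borrowed here has to go back to the pool when the stream is closed. The matching close() is not shown on this page; a minimal sketch under that assumption, reusing the fields above:

@Override
public void close() throws IOException {
  if (decompressor != null) {
    CodecPool.returnDecompressor(decompressor);
    decompressor = null;
  }
  coreInputStream.close();
}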
Anonymizer.java (project: hadoop)
private JsonGenerator createJsonGenerator(Configuration conf, Path path)
    throws IOException {
  FileSystem outFS = path.getFileSystem(conf);
  CompressionCodec codec =
      new CompressionCodecFactory(conf).getCodec(path);
  OutputStream output;
  Compressor compressor = null;
  if (codec != null) {
    compressor = CodecPool.getCompressor(codec);
    output = codec.createOutputStream(outFS.create(path), compressor);
  } else {
    output = outFS.create(path);
  }
  JsonGenerator outGen = outFactory.createJsonGenerator(output,
      JsonEncoding.UTF8);
  outGen.useDefaultPrettyPrinter();
  return outGen;
}
The following entries from project hadoop are byte-for-byte identical to the hadoop-oss snippets above and are not repeated:

SequenceFile.java (project: hadoop) - writer close()
SequenceFile.java (project: hadoop) - reader close()
Compression.java (project: hadoop) - getCompressor()
Compression.java (project: hadoop) - getDecompressor()
Compression.java (project: ditb)
public Compressor getCompressor() {
  CompressionCodec codec = getCodec(conf);
  if (codec != null) {
    Compressor compressor = CodecPool.getCompressor(codec);
    if (LOG.isTraceEnabled()) {
      LOG.trace("Retrieved compressor " + compressor + " from pool.");
    }
    if (compressor != null) {
      if (compressor.finished()) {
        // Somebody returned the compressor to the CodecPool but is still using it.
        LOG.warn("Compressor obtained from CodecPool is already finished()");
      }
      compressor.reset();
    }
    return compressor;
  }
  return null;
}
Compression.java (project: ditb)
public Decompressor getDecompressor() {
  CompressionCodec codec = getCodec(conf);
  if (codec != null) {
    Decompressor decompressor = CodecPool.getDecompressor(codec);
    if (LOG.isTraceEnabled()) {
      LOG.trace("Retrieved decompressor " + decompressor + " from pool.");
    }
    if (decompressor != null) {
      if (decompressor.finished()) {
        // Somebody returned the decompressor to the CodecPool but is still using it.
        LOG.warn("Decompressor obtained from CodecPool is already finished()");
      }
      decompressor.reset();
    }
    return decompressor;
  }
  return null;
}
The following entries from project aliyun-oss-hadoop-fs duplicate snippets shown above verbatim and are not repeated:

IFile.java (project: aliyun-oss-hadoop-fs) - Reader constructor
IFile.java (project: aliyun-oss-hadoop-fs) - close()
PossiblyDecompressedInputStream.java (project: aliyun-oss-hadoop-fs) - constructor
Anonymizer.java (project: aliyun-oss-hadoop-fs) - createJsonGenerator()
SequenceFile.java (project: aliyun-oss-hadoop-fs) - writer close()
SequenceFile.java (project: aliyun-oss-hadoop-fs) - reader close()
Compression.java (project: aliyun-oss-hadoop-fs) - getCompressor()
Compression.java (project: aliyun-oss-hadoop-fs) - getDecompressor()
The following entries from project gemfirexd-oss duplicate the hadoop-oss SequenceFile snippets above verbatim:

SequenceFile.java (project: gemfirexd-oss) - writer close()
SequenceFile.java (project: gemfirexd-oss) - reader close()
The following entries from project big-c duplicate the hadoop snippets above verbatim:

IFile.java (project: big-c) - Reader constructor
IFile.java (project: big-c) - close()
InMemoryMapOutput.java (project: big-c) - constructor
PossiblyDecompressedInputStream.java (project: big-c) - constructor