def _decompress_bz2(filename, blocksize=900*1024):
"""
Decompress .tar.bz2 to .tar on disk (for faster access)
Use TemporaryFile to guarentee write access.
"""
if not filename.endswith('.tar.bz2'):
return filename
fd, path = mkstemp()
with os.fdopen(fd, 'wb') as fo:
with open(filename, 'rb') as fi:
z = bz2.BZ2Decompressor()
for block in iter(lambda: fi.read(blocksize), b''):
fo.write(z.decompress(block))
return path
python类BZ2Decompressor()的实例源码
def init(self):
import lzma
self.pos = 0
if self.mode == "r":
self.cmpobj = lzma.BZ2Decompressor()
self.fileobj.seek(0)
self.buf = ""
else:
self.cmpobj = lzma.BZ2Compressor()
# class _XZProxy
#------------------------
# Extraction file object
#------------------------
def test_stream_padding(self):
# Test for bug #1543303.
tar = tarfile.open(tmpname, self.mode)
tar.close()
if self.mode.endswith("gz"):
with gzip.GzipFile(tmpname) as fobj:
data = fobj.read()
elif self.mode.endswith("bz2"):
dec = bz2.BZ2Decompressor()
with open(tmpname, "rb") as fobj:
data = fobj.read()
data = dec.decompress(data)
self.assertTrue(len(dec.unused_data) == 0,
"found trailing data")
else:
with open(tmpname, "rb") as fobj:
data = fobj.read()
self.assertTrue(data.count(b"\0") == tarfile.RECORDSIZE,
"incorrect zero padding")
def test_stream_padding(self):
# Test for bug #1543303.
tar = tarfile.open(tmpname, self.mode)
tar.close()
if self.mode.endswith("gz"):
with gzip.GzipFile(tmpname) as fobj:
data = fobj.read()
elif self.mode.endswith("bz2"):
dec = bz2.BZ2Decompressor()
with open(tmpname, "rb") as fobj:
data = fobj.read()
data = dec.decompress(data)
self.assertTrue(len(dec.unused_data) == 0,
"found trailing data")
else:
with open(tmpname, "rb") as fobj:
data = fobj.read()
self.assertTrue(data.count("\0") == tarfile.RECORDSIZE,
"incorrect zero padding")
def test_stream_padding(self):
# Test for bug #1543303.
tar = tarfile.open(tmpname, self.mode)
tar.close()
if self.mode.endswith("gz"):
with gzip.GzipFile(tmpname) as fobj:
data = fobj.read()
elif self.mode.endswith("bz2"):
dec = bz2.BZ2Decompressor()
with open(tmpname, "rb") as fobj:
data = fobj.read()
data = dec.decompress(data)
self.assertTrue(len(dec.unused_data) == 0,
"found trailing data")
else:
with open(tmpname, "rb") as fobj:
data = fobj.read()
self.assertTrue(data.count("\0") == tarfile.RECORDSIZE,
"incorrect zero padding")
def bz2_compress(self,file,type=True): # Compress/Decompress files into/from the bz2 format. compress if type else decompess
if not os.path.exists(file) or os.path.isdir(file): return False
try: filesize = os.path.getsize(file)
except: return False
if not type and not file.endswith(".bz2"): return False
blocksize = 102400
if type: compressor = bz2.BZ2Compressor()
else: decompressor = bz2.BZ2Decompressor()
handle1 = open(file,"rb")
handle2 = open(file+".bz2","wb") if type else open(file[:-4],"wb")
for i in range(int(math.ceil(float(filesize)/blocksize))):
if type: handle2.write(compressor.compress(handle1.read(blocksize)))
else: handle2.write(decompressor.decompress(handle1.read(blocksize)))
if type: handle2.write(compressor.flush())
handle1.close(); handle2.close()
self.debug("Successfully "+("" if type else "de")+"compressed file : "+file)
return True
################################################## Client Behaviour Functions ##################################################
def test_stream_padding(self):
# Test for bug #1543303.
tar = tarfile.open(tmpname, self.mode)
tar.close()
if self.mode.endswith("gz"):
fobj = gzip.GzipFile(tmpname)
data = fobj.read()
fobj.close()
elif self.mode.endswith("bz2"):
dec = bz2.BZ2Decompressor()
with open(tmpname, "rb") as fobj:
data = fobj.read()
data = dec.decompress(data)
self.assertTrue(len(dec.unused_data) == 0,
"found trailing data")
else:
fobj = open(tmpname, "rb")
data = fobj.read()
fobj.close()
self.assertTrue(data.count("\0") == tarfile.RECORDSIZE,
"incorrect zero padding")
def test_stream_padding(self):
# Test for bug #1543303.
tar = tarfile.open(tmpname, self.mode)
tar.close()
if self.mode.endswith("gz"):
fobj = gzip.GzipFile(tmpname)
data = fobj.read()
fobj.close()
elif self.mode.endswith("bz2"):
dec = bz2.BZ2Decompressor()
data = open(tmpname, "rb").read()
data = dec.decompress(data)
self.assertTrue(len(dec.unused_data) == 0,
"found trailing data")
else:
fobj = open(tmpname, "rb")
data = fobj.read()
fobj.close()
self.assertTrue(data.count("\0") == tarfile.RECORDSIZE,
"incorrect zero padding")
def init(self):
import bz2
self.pos = 0
if self.mode == "r":
self.bz2obj = bz2.BZ2Decompressor()
self.fileobj.seek(0)
self.buf = b""
else:
self.bz2obj = bz2.BZ2Compressor()
def init(self):
import bz2
self.pos = 0
if self.mode == "r":
self.bz2obj = bz2.BZ2Decompressor()
self.fileobj.seek(0)
self.buf = b""
else:
self.bz2obj = bz2.BZ2Compressor()
def init(self):
import bz2
self.pos = 0
if self.mode == "r":
self.bz2obj = bz2.BZ2Decompressor()
self.fileobj.seek(0)
self.buf = ""
else:
self.bz2obj = bz2.BZ2Compressor()
def init(self):
import bz2
self.pos = 0
if self.mode == "r":
self.bz2obj = bz2.BZ2Decompressor()
self.fileobj.seek(0)
self.buf = b""
else:
self.bz2obj = bz2.BZ2Compressor()
def init(self):
import bz2
self.pos = 0
if self.mode == "r":
self.bz2obj = bz2.BZ2Decompressor()
self.fileobj.seek(0)
self.buf = b""
else:
self.bz2obj = bz2.BZ2Compressor()
def decompress(chunks, compression):
"""Decompress
:param __generator[bytes] chunks: compressed body chunks.
:param str compression: compression constant.
:rtype: __generator[bytes]
:return: decompressed chunks.
:raise: TypeError, DecompressError
"""
if compression not in SUPPORTED_COMPRESSIONS:
raise TypeError('Unsupported compression type: %s' % (compression,))
try:
de_compressor = DECOMPRESSOR_FACTORIES[compression]()
for chunk in chunks:
try:
yield de_compressor.decompress(chunk)
except OSError as err:
# BZ2Decompressor: invalid data stream
raise DecompressError(err) from None
# BZ2Decompressor does not support flush() interface.
if hasattr(de_compressor, 'flush'):
yield de_compressor.flush()
except zlib.error as err:
raise DecompressError(err) from None
def init(self):
import bz2
self.pos = 0
if self.mode == "r":
self.bz2obj = bz2.BZ2Decompressor()
self.fileobj.seek(0)
self.buf = b""
else:
self.bz2obj = bz2.BZ2Compressor()
def init(self):
import bz2
self.pos = 0
if self.mode == "r":
self.bz2obj = bz2.BZ2Decompressor()
self.fileobj.seek(0)
self.buf = b""
else:
self.bz2obj = bz2.BZ2Compressor()
def init(self):
import bz2
self.pos = 0
if self.mode == "r":
self.bz2obj = bz2.BZ2Decompressor()
self.fileobj.seek(0)
self.buf = b""
else:
self.bz2obj = bz2.BZ2Compressor()
def init(self):
import bz2
self.pos = 0
if self.mode == "r":
self.bz2obj = bz2.BZ2Decompressor()
self.fileobj.seek(0)
self.buf = b""
else:
self.bz2obj = bz2.BZ2Compressor()
def init(self):
import bz2
self.pos = 0
if self.mode == "r":
self.bz2obj = bz2.BZ2Decompressor()
self.fileobj.seek(0)
self.buf = b""
else:
self.bz2obj = bz2.BZ2Compressor()
def __init__(self, fileobj):
self._fileobj = fileobj
self._bz2 = bz2.BZ2Decompressor()
self._line_buffer = collections.deque([""])
self._current_line = ""
self._current_offset = 0
def init(self):
import bz2
self.pos = 0
if self.mode == "r":
self.bz2obj = bz2.BZ2Decompressor()
self.fileobj.seek(0)
self.buf = b""
else:
self.bz2obj = bz2.BZ2Compressor()
def init(self):
import bz2
self.pos = 0
if self.mode == "r":
self.bz2obj = bz2.BZ2Decompressor()
self.fileobj.seek(0)
self.buf = b""
else:
self.bz2obj = bz2.BZ2Compressor()
def init(self):
import bz2
self.pos = 0
if self.mode == "r":
self.bz2obj = bz2.BZ2Decompressor()
self.fileobj.seek(0)
self.buf = b""
else:
self.bz2obj = bz2.BZ2Compressor()
def init(self):
import bz2
self.pos = 0
if self.mode == "r":
self.bz2obj = bz2.BZ2Decompressor()
self.fileobj.seek(0)
self.buf = b""
else:
self.bz2obj = bz2.BZ2Compressor()
def init(self):
import bz2
self.pos = 0
if self.mode == "r":
self.bz2obj = bz2.BZ2Decompressor()
self.fileobj.seek(0)
self.buf = b""
else:
self.bz2obj = bz2.BZ2Compressor()
def init(self):
import bz2
self.pos = 0
if self.mode == "r":
self.bz2obj = bz2.BZ2Decompressor()
self.fileobj.seek(0)
self.buf = b""
else:
self.bz2obj = bz2.BZ2Compressor()
def init(self):
import bz2
self.pos = 0
if self.mode == "r":
self.bz2obj = bz2.BZ2Decompressor()
self.fileobj.seek(0)
self.buf = b""
else:
self.bz2obj = bz2.BZ2Compressor()
def init(self):
import bz2
self.pos = 0
if self.mode == "r":
self.bz2obj = bz2.BZ2Decompressor()
self.fileobj.seek(0)
self.buf = b""
else:
self.bz2obj = bz2.BZ2Compressor()
def init(self):
import bz2
self.pos = 0
if self.mode == "r":
self.cmpobj = bz2.BZ2Decompressor()
self.fileobj.seek(0)
self.buf = ""
else:
self.cmpobj = bz2.BZ2Compressor()
# class _BZ2Proxy
def init(self):
import bz2
self.pos = 0
if self.mode == "r":
self.bz2obj = bz2.BZ2Decompressor()
self.fileobj.seek(0)
self.buf = b""
else:
self.bz2obj = bz2.BZ2Compressor()