def _decompress(self, chunk_size=32000):
    """Decompress the cluster when its compression flag is set.

    Reading starts at offset ``self.ptr + 1`` of ``self.file_buf`` and
    proceeds in pieces of at most ``chunk_size`` bytes until the LZMA decoder
    reports the end of the stream. The output is stored in
    ``self.uncomp_buf`` and also returned; its position is left at the end
    of the written data.

    Args:
        chunk_size: Maximum number of compressed bytes read per iteration.

    Returns:
        The ``BytesIO`` holding the uncompressed data, or ``None`` when
        ``self.compressed`` is false (nothing is read or stored in that case).

    Raises:
        lzma.LZMAError: If ``self.file_buf`` is exhausted before the
            end-of-stream marker is reached, or the data cannot be decoded.
    """
    if not self.compressed:
        return None
    self.file_buf.seek(self.ptr + 1)
    # Store uncompressed cluster data for use as uncompressed data
    self.uncomp_buf = BytesIO()
    decomp = lzma.LZMADecompressor()
    while not decomp.eof:
        comp_data = self.file_buf.read(chunk_size)
        if not comp_data:
            # An empty read means the source is exhausted. Without this
            # check a truncated stream would keep the loop spinning forever,
            # because the decoder never reaches its end-of-stream state.
            raise lzma.LZMAError(
                "Compressed data ended before the end-of-stream marker "
                "was reached"
            )
        self.uncomp_buf.write(decomp.decompress(comp_data))
    return self.uncomp_buf
# Example source code for Python's LZMADecompressor() class
def _decompress(self, chunk_size=32000):
    """Decompress the cluster when its compression flag is set.

    Reading starts at offset ``self.ptr + 1`` of ``self.file_buf`` and
    proceeds in pieces of at most ``chunk_size`` bytes until the LZMA decoder
    reports the end of the stream. The output is stored in
    ``self.uncomp_buf`` and also returned; its position is left at the end
    of the written data.

    Args:
        chunk_size: Maximum number of compressed bytes read per iteration.

    Returns:
        The in-memory binary buffer holding the uncompressed data, or
        ``None`` when ``self.compressed`` is false (nothing is read or
        stored in that case).

    Raises:
        lzma.LZMAError: If ``self.file_buf`` is exhausted before the
            end-of-stream marker is reached, or the data cannot be decoded.
    """
    # Imported locally so this block does not depend on the file's imports.
    from io import BytesIO

    if not self.compressed:
        return None
    self.file_buf.seek(self.ptr + 1)
    # The decompressor yields bytes, so the result must live in a binary
    # buffer; a text buffer rejects bytes on Python 3.
    self.uncomp_buf = BytesIO()
    decomp = lzma.LZMADecompressor()
    while not decomp.eof:
        comp_data = self.file_buf.read(chunk_size)
        if not comp_data:
            # An empty read means the source is exhausted. Without this
            # check a truncated stream would keep the loop spinning forever,
            # because the decoder never reaches its end-of-stream state.
            raise lzma.LZMAError(
                "Compressed data ended before the end-of-stream marker "
                "was reached"
            )
        self.uncomp_buf.write(decomp.decompress(comp_data))
    return self.uncomp_buf
def test_roundtrip(self):
    # One-shot compress()/decompress() must reproduce INPUT with the default
    # arguments and with each explicitly requested container format.
    self.assertEqual(lzma.decompress(lzma.compress(INPUT)), INPUT)

    for container in (lzma.FORMAT_XZ, lzma.FORMAT_ALONE):
        packed = lzma.compress(INPUT, container)
        self.assertEqual(lzma.decompress(packed), INPUT)

    # For the raw format the same filter chain is passed to both calls.
    packed = lzma.compress(INPUT, lzma.FORMAT_RAW, filters=FILTERS_RAW_4)
    unpacked = lzma.decompress(packed, lzma.FORMAT_RAW, filters=FILTERS_RAW_4)
    self.assertEqual(unpacked, INPUT)
# Unlike LZMADecompressor, decompress() *does* handle concatenated streams.
def test_roundtrip(self):
    # One-shot compress()/decompress() must reproduce INPUT with the default
    # arguments and with each explicitly requested container format.
    self.assertEqual(lzma.decompress(lzma.compress(INPUT)), INPUT)

    for container in (lzma.FORMAT_XZ, lzma.FORMAT_ALONE):
        packed = lzma.compress(INPUT, container)
        self.assertEqual(lzma.decompress(packed), INPUT)

    # For the raw format the same filter chain is passed to both calls.
    packed = lzma.compress(INPUT, lzma.FORMAT_RAW, filters=FILTERS_RAW_4)
    unpacked = lzma.decompress(packed, lzma.FORMAT_RAW, filters=FILTERS_RAW_4)
    self.assertEqual(unpacked, INPUT)
# Unlike LZMADecompressor, decompress() *does* handle concatenated streams.
def decompress_lzma(data):
    """Decompress one or more concatenated LZMA/XZ streams in a single call.

    A fresh auto-detecting decompressor is created for every stream. Whatever
    one stream leaves unused is fed to the next decompressor until no data
    remains.

    Args:
        data: Bytes-like object holding the compressed stream(s).

    Returns:
        The uncompressed contents of all decoded streams, joined as ``bytes``.

    Raises:
        LZMAError: If the first stream cannot be decoded, or if any stream
            ends before its end-of-stream marker is reached.
    """
    results = []
    while True:
        decomp = LZMADecompressor(FORMAT_AUTO, None, None)
        try:
            res = decomp.decompress(data)
        except LZMAError:
            if results:
                break  # Leftover data is not a valid LZMA/XZ stream; ignore it.
            raise  # Error on the first iteration; bail out.
        results.append(res)
        # Check for truncation before looking at the leftover bytes. A
        # truncated stream leaves no unused data, so testing the leftover
        # first would end the loop and hide the problem.
        if not decomp.eof:
            raise LZMAError("Compressed data ended before the end-of-stream marker was reached")
        data = decomp.unused_data
        if not data:
            break
    return b"".join(results)
def test_roundtrip(self):
    # One-shot compress()/decompress() must reproduce INPUT with the default
    # arguments and with each explicitly requested container format.
    self.assertEqual(lzma.decompress(lzma.compress(INPUT)), INPUT)

    for container in (lzma.FORMAT_XZ, lzma.FORMAT_ALONE):
        packed = lzma.compress(INPUT, container)
        self.assertEqual(lzma.decompress(packed), INPUT)

    # For the raw format the same filter chain is passed to both calls.
    packed = lzma.compress(INPUT, lzma.FORMAT_RAW, filters=FILTERS_RAW_4)
    unpacked = lzma.decompress(packed, lzma.FORMAT_RAW, filters=FILTERS_RAW_4)
    self.assertEqual(unpacked, INPUT)
# Unlike LZMADecompressor, decompress() *does* handle concatenated streams.
def decompress(self, buf):
    """Return a readable buffer with the decompressed contents of ``buf``.

    When ``self.compressed`` is false, ``buf`` itself is returned untouched.
    Otherwise the data is decoded according to ``self.compression_type`` and
    wrapped in a new ``BytesIO``.

    Raises:
        NotImplementedError: For any compression type other than LZMA, LZ4
            or LZ4HC.
    """
    if not self.compressed:
        return buf

    method = self.compression_type

    if method == CompressionType.LZMA:
        # Five-byte header: one packed properties byte followed by a
        # little-endian 32-bit dictionary size.
        header = buf.read(5)
        packed_props, dict_size = struct.unpack("<BI", header)
        # The properties byte encodes (pb * 5 + lp) * 9 + lc.
        remainder, lc = divmod(packed_props, 9)
        pb, lp = divmod(remainder, 5)
        lzma1_filter = {
            "id": lzma.FILTER_LZMA1,
            "dict_size": dict_size,
            "lc": lc,
            "lp": lp,
            "pb": pb,
        }
        decoder = lzma.LZMADecompressor(format=lzma.FORMAT_RAW, filters=[lzma1_filter])
        return BytesIO(decoder.decompress(buf.read()))

    if method in (CompressionType.LZ4, CompressionType.LZ4HC):
        raw = buf.read(self.compressed_size)
        return BytesIO(lz4_decompress(raw, self.uncompressed_size))

    raise NotImplementedError("Unimplemented compression method: %r" % (method))
def test_simple_bad_args(self):
    # Compressor constructor arguments of the wrong type.
    with self.assertRaises(TypeError):
        LZMACompressor([])
    with self.assertRaises(TypeError):
        LZMACompressor(format=3.45)
    with self.assertRaises(TypeError):
        LZMACompressor(check="")
    with self.assertRaises(TypeError):
        LZMACompressor(preset="asdf")
    with self.assertRaises(TypeError):
        LZMACompressor(filters=3)
    # FORMAT_AUTO is not accepted when compressing.
    with self.assertRaises(ValueError):
        LZMACompressor(format=lzma.FORMAT_AUTO)
    # A preset and a custom filter chain are mutually exclusive.
    self.assertRaises(
        ValueError, LZMACompressor,
        preset=7, filters=[{"id": lzma.FILTER_LZMA2}])

    # Decompressor constructor arguments of the wrong type.
    with self.assertRaises(TypeError):
        LZMADecompressor(())
    with self.assertRaises(TypeError):
        LZMADecompressor(memlimit=b"qw")
    self.assertRaises(
        TypeError, LZMADecompressor, lzma.FORMAT_RAW, filters="zzz")
    # A memory limit cannot be combined with FORMAT_RAW.
    self.assertRaises(
        ValueError, LZMADecompressor, lzma.FORMAT_RAW, memlimit=0x1000000)
    # A custom filter chain is only accepted together with FORMAT_RAW.
    with self.assertRaises(ValueError):
        LZMADecompressor(filters=FILTERS_RAW_1)
    self.assertRaises(
        ValueError, LZMADecompressor,
        format=lzma.FORMAT_XZ, filters=FILTERS_RAW_1)
    self.assertRaises(
        ValueError, LZMADecompressor,
        format=lzma.FORMAT_ALONE, filters=FILTERS_RAW_1)

    # Wrong call signatures on a compressor, then use after flush().
    encoder = LZMACompressor()
    with self.assertRaises(TypeError):
        encoder.compress()
    with self.assertRaises(TypeError):
        encoder.compress(b"foo", b"bar")
    with self.assertRaises(TypeError):
        encoder.flush(b"blah")
    flushed = encoder.flush()
    with self.assertRaises(ValueError):
        encoder.compress(b"quux")
    with self.assertRaises(ValueError):
        encoder.flush()

    # Wrong call signatures on a decompressor, then input after the stream
    # produced by the flush above has been consumed.
    decoder = LZMADecompressor()
    with self.assertRaises(TypeError):
        decoder.decompress()
    with self.assertRaises(TypeError):
        decoder.decompress(b"foo", b"bar")
    decoder.decompress(flushed)
    with self.assertRaises(EOFError):
        decoder.decompress(b"quux")
def test_decompressor_after_eof(self):
    # After COMPRESSED_XZ has been fed in, a further decompress() call is
    # expected to raise EOFError.
    decoder = LZMADecompressor()
    decoder.decompress(COMPRESSED_XZ)
    with self.assertRaises(EOFError):
        decoder.decompress(b"nyan")
def test_decompressor_memlimit(self):
    # With memlimit=1024, decoding each sample is expected to fail with
    # LZMAError for the default, FORMAT_XZ and FORMAT_ALONE decompressors.
    decoder = LZMADecompressor(memlimit=1024)
    with self.assertRaises(LZMAError):
        decoder.decompress(COMPRESSED_XZ)

    decoder = LZMADecompressor(lzma.FORMAT_XZ, memlimit=1024)
    with self.assertRaises(LZMAError):
        decoder.decompress(COMPRESSED_XZ)

    decoder = LZMADecompressor(lzma.FORMAT_ALONE, memlimit=1024)
    with self.assertRaises(LZMAError):
        decoder.decompress(COMPRESSED_ALONE)
# Test LZMADecompressor on known-good input data.
def test_decompressor_auto(self):
    # A default-constructed decompressor is run against the XZ sample and
    # then, with a fresh instance, against the ALONE sample.
    xz_decoder = LZMADecompressor()
    self._test_decompressor(xz_decoder, COMPRESSED_XZ, lzma.CHECK_CRC64)

    alone_decoder = LZMADecompressor()
    self._test_decompressor(alone_decoder, COMPRESSED_ALONE, lzma.CHECK_NONE)
def test_decompressor_xz(self):
    # Explicit FORMAT_XZ decompressor against the XZ sample.
    self._test_decompressor(
        LZMADecompressor(lzma.FORMAT_XZ), COMPRESSED_XZ, lzma.CHECK_CRC64)
def test_decompressor_raw_1(self):
    # Raw-format decompressor built from FILTERS_RAW_1 against its sample.
    raw_decoder = LZMADecompressor(lzma.FORMAT_RAW, filters=FILTERS_RAW_1)
    self._test_decompressor(raw_decoder, COMPRESSED_RAW_1, lzma.CHECK_NONE)
def test_decompressor_raw_2(self):
    # Raw-format decompressor built from FILTERS_RAW_2 against its sample.
    raw_decoder = LZMADecompressor(lzma.FORMAT_RAW, filters=FILTERS_RAW_2)
    self._test_decompressor(raw_decoder, COMPRESSED_RAW_2, lzma.CHECK_NONE)
def test_decompressor_raw_3(self):
    # Raw-format decompressor built from FILTERS_RAW_3 against its sample.
    raw_decoder = LZMADecompressor(lzma.FORMAT_RAW, filters=FILTERS_RAW_3)
    self._test_decompressor(raw_decoder, COMPRESSED_RAW_3, lzma.CHECK_NONE)
def test_decompressor_raw_4(self):
    # Raw-format decompressor built from FILTERS_RAW_4 against its sample.
    raw_decoder = LZMADecompressor(lzma.FORMAT_RAW, filters=FILTERS_RAW_4)
    self._test_decompressor(raw_decoder, COMPRESSED_RAW_4, lzma.CHECK_NONE)
def test_decompressor_chunks(self):
    # Feed the XZ sample ten bytes at a time; eof must stay false until the
    # last slice has been handed over.
    step = 10
    decoder = LZMADecompressor()
    pieces = []
    for start in range(0, len(COMPRESSED_XZ), step):
        self.assertFalse(decoder.eof)
        piece = decoder.decompress(COMPRESSED_XZ[start:start + step])
        pieces.append(piece)
    result = b"".join(pieces)

    self.assertEqual(result, INPUT)
    self.assertEqual(decoder.check, lzma.CHECK_CRC64)
    self.assertTrue(decoder.eof)
    self.assertEqual(decoder.unused_data, b"")
def test_decompressor_bad_input(self):
    # Each decompressor is given a sample in a format it was not configured
    # for and is expected to raise LZMAError.
    decoder = LZMADecompressor()
    with self.assertRaises(LZMAError):
        decoder.decompress(COMPRESSED_RAW_1)

    decoder = LZMADecompressor(lzma.FORMAT_XZ)
    with self.assertRaises(LZMAError):
        decoder.decompress(COMPRESSED_ALONE)

    decoder = LZMADecompressor(lzma.FORMAT_ALONE)
    with self.assertRaises(LZMAError):
        decoder.decompress(COMPRESSED_XZ)

    decoder = LZMADecompressor(lzma.FORMAT_RAW, filters=FILTERS_RAW_1)
    with self.assertRaises(LZMAError):
        decoder.decompress(COMPRESSED_XZ)
# Test that LZMACompressor->LZMADecompressor preserves the input data.
def test_roundtrip_xz(self):
    # Compress INPUT with a default compressor, then verify it with a
    # default decompressor.
    encoder = LZMACompressor()
    body = encoder.compress(INPUT)
    packed = body + encoder.flush()
    self._test_decompressor(LZMADecompressor(), packed, lzma.CHECK_CRC64)
def test_roundtrip_alone(self):
    # Compress INPUT as FORMAT_ALONE, then verify it with a default
    # decompressor.
    encoder = LZMACompressor(lzma.FORMAT_ALONE)
    body = encoder.compress(INPUT)
    packed = body + encoder.flush()
    self._test_decompressor(LZMADecompressor(), packed, lzma.CHECK_NONE)
def test_roundtrip_raw(self):
    # Compress and decompress INPUT in raw format; both sides are built
    # from FILTERS_RAW_4.
    encoder = LZMACompressor(lzma.FORMAT_RAW, filters=FILTERS_RAW_4)
    body = encoder.compress(INPUT)
    packed = body + encoder.flush()
    decoder = LZMADecompressor(lzma.FORMAT_RAW, filters=FILTERS_RAW_4)
    self._test_decompressor(decoder, packed, lzma.CHECK_NONE)
def test_roundtrip_chunks(self):
    # Compress INPUT ten bytes at a time, append the flush output, and
    # verify the joined result with a default decompressor.
    step = 10
    encoder = LZMACompressor()
    pieces = [
        encoder.compress(INPUT[start:start + step])
        for start in range(0, len(INPUT), step)
    ]
    pieces.append(encoder.flush())
    packed = b"".join(pieces)
    self._test_decompressor(LZMADecompressor(), packed, lzma.CHECK_CRC64)
# LZMADecompressor intentionally does not handle concatenated streams.
def test_decompressor_bigmem(self, size):
    # Round-trip a payload of more than `size` bytes, built by repeating one
    # random 10 MiB block, through lzma.compress() and a single
    # LZMADecompressor.decompress() call.
    decoder = LZMADecompressor()
    block_len = 10 * 1024 * 1024
    random_block = random.getrandbits(block_len * 8).to_bytes(block_len, "little")
    try:
        repeats = size // block_len + 1
        payload = random_block * repeats
        packed = lzma.compress(payload)
        unpacked = decoder.decompress(packed)
        self.assertEqual(unpacked, payload)
    finally:
        # Drop the references to the large buffers even when the test fails.
        payload = packed = unpacked = None
def test_decompress_memlimit(self):
    # One-shot lzma.decompress() with memlimit=1024 is expected to raise
    # LZMAError for auto-detected, FORMAT_XZ and FORMAT_ALONE input.
    self.assertRaises(
        LZMAError, lzma.decompress, COMPRESSED_XZ, memlimit=1024)
    self.assertRaises(
        LZMAError, lzma.decompress,
        COMPRESSED_XZ, format=lzma.FORMAT_XZ, memlimit=1024)
    self.assertRaises(
        LZMAError, lzma.decompress,
        COMPRESSED_ALONE, format=lzma.FORMAT_ALONE, memlimit=1024)
# Test LZMADecompressor on known-good input data.
def test_simple_bad_args(self):
    # Compressor constructor arguments of the wrong type.
    with self.assertRaises(TypeError):
        LZMACompressor([])
    with self.assertRaises(TypeError):
        LZMACompressor(format=3.45)
    with self.assertRaises(TypeError):
        LZMACompressor(check="")
    with self.assertRaises(TypeError):
        LZMACompressor(preset="asdf")
    with self.assertRaises(TypeError):
        LZMACompressor(filters=3)
    # FORMAT_AUTO is not accepted when compressing.
    with self.assertRaises(ValueError):
        LZMACompressor(format=lzma.FORMAT_AUTO)
    # A preset and a custom filter chain are mutually exclusive.
    self.assertRaises(
        ValueError, LZMACompressor,
        preset=7, filters=[{"id": lzma.FILTER_LZMA2}])

    # Decompressor constructor arguments of the wrong type.
    with self.assertRaises(TypeError):
        LZMADecompressor(())
    with self.assertRaises(TypeError):
        LZMADecompressor(memlimit=b"qw")
    self.assertRaises(
        TypeError, LZMADecompressor, lzma.FORMAT_RAW, filters="zzz")
    # A memory limit cannot be combined with FORMAT_RAW.
    self.assertRaises(
        ValueError, LZMADecompressor, lzma.FORMAT_RAW, memlimit=0x1000000)
    # A custom filter chain is only accepted together with FORMAT_RAW.
    with self.assertRaises(ValueError):
        LZMADecompressor(filters=FILTERS_RAW_1)
    self.assertRaises(
        ValueError, LZMADecompressor,
        format=lzma.FORMAT_XZ, filters=FILTERS_RAW_1)
    self.assertRaises(
        ValueError, LZMADecompressor,
        format=lzma.FORMAT_ALONE, filters=FILTERS_RAW_1)

    # Wrong call signatures on a compressor, then use after flush().
    encoder = LZMACompressor()
    with self.assertRaises(TypeError):
        encoder.compress()
    with self.assertRaises(TypeError):
        encoder.compress(b"foo", b"bar")
    with self.assertRaises(TypeError):
        encoder.flush(b"blah")
    flushed = encoder.flush()
    with self.assertRaises(ValueError):
        encoder.compress(b"quux")
    with self.assertRaises(ValueError):
        encoder.flush()

    # Wrong call signatures on a decompressor, then input after the stream
    # produced by the flush above has been consumed.
    decoder = LZMADecompressor()
    with self.assertRaises(TypeError):
        decoder.decompress()
    with self.assertRaises(TypeError):
        decoder.decompress(b"foo", b"bar")
    decoder.decompress(flushed)
    with self.assertRaises(EOFError):
        decoder.decompress(b"quux")
def test_decompressor_after_eof(self):
    # After COMPRESSED_XZ has been fed in, a further decompress() call is
    # expected to raise EOFError.
    decoder = LZMADecompressor()
    decoder.decompress(COMPRESSED_XZ)
    with self.assertRaises(EOFError):
        decoder.decompress(b"nyan")
def test_decompressor_memlimit(self):
    # With memlimit=1024, decoding each sample is expected to fail with
    # LZMAError for the default, FORMAT_XZ and FORMAT_ALONE decompressors.
    decoder = LZMADecompressor(memlimit=1024)
    with self.assertRaises(LZMAError):
        decoder.decompress(COMPRESSED_XZ)

    decoder = LZMADecompressor(lzma.FORMAT_XZ, memlimit=1024)
    with self.assertRaises(LZMAError):
        decoder.decompress(COMPRESSED_XZ)

    decoder = LZMADecompressor(lzma.FORMAT_ALONE, memlimit=1024)
    with self.assertRaises(LZMAError):
        decoder.decompress(COMPRESSED_ALONE)
# Test LZMADecompressor on known-good input data.
def test_decompressor_auto(self):
    # A default-constructed decompressor is run against the XZ sample and
    # then, with a fresh instance, against the ALONE sample.
    xz_decoder = LZMADecompressor()
    self._test_decompressor(xz_decoder, COMPRESSED_XZ, lzma.CHECK_CRC64)

    alone_decoder = LZMADecompressor()
    self._test_decompressor(alone_decoder, COMPRESSED_ALONE, lzma.CHECK_NONE)
def test_decompressor_xz(self):
    # Explicit FORMAT_XZ decompressor against the XZ sample.
    self._test_decompressor(
        LZMADecompressor(lzma.FORMAT_XZ), COMPRESSED_XZ, lzma.CHECK_CRC64)
def test_decompressor_raw_1(self):
    # Raw-format decompressor built from FILTERS_RAW_1 against its sample.
    raw_decoder = LZMADecompressor(lzma.FORMAT_RAW, filters=FILTERS_RAW_1)
    self._test_decompressor(raw_decoder, COMPRESSED_RAW_1, lzma.CHECK_NONE)