def _create_compressor(self):
    """Return a new zlib compression object for raw deflate output.

    The compression level is -1 (zlib's default) and the window size is
    the negation of ``self._max_wbits``; a negative ``wbits`` makes zlib
    emit a raw deflate stream without a zlib/gzip header or trailer.
    """
    default_level = -1
    raw_wbits = -self._max_wbits
    return zlib.compressobj(default_level, zlib.DEFLATED, raw_wbits)
python类DEFLATED的实例源码
def __init__(self, im):
    """Build a DefineBitsLossless2 tag from an image array.

    The image is expanded to a 4-channel uint8 array in ARGB order and
    compressed with zlib. The compressed bytes are stored in
    ``self._data`` and the original shape in ``self.imshape``.

    Args:
        im: NumPy array, either 2-D (grayscale) or 3-D with 3 (RGB) or
            4 (RGBA) channels on the last axis. Values are assumed to
            fit in uint8 -- TODO confirm against callers.

    Raises:
        ValueError: If ``im`` is neither 2-D nor 3-D with 3 or 4 channels.
    """
    DefinitionTag.__init__(self)
    self.tagtype = 36  # DefineBitsLossless2

    # Convert the image (note that the format is ARGB).
    # Even a grayscale image is stored in ARGB; nevertheless, the
    # deflate compression ensures that not much more data is required
    # for storing it (25% or so, and less than 10% when storing RGB as
    # ARGB).
    ndim = len(im.shape)
    if ndim == 3:
        if im.shape[2] not in (3, 4):  # pragma: no cover
            raise ValueError("Invalid shape to be an image.")
        # Alpha defaults to fully opaque (255).
        tmp = np.full((im.shape[0], im.shape[1], 4), 255, dtype=np.uint8)
        tmp[:, :, 1:4] = im[:, :, :3]
        if im.shape[2] == 4:
            tmp[:, :, 0] = im[:, :, 3]  # move alpha to the first channel
    elif ndim == 2:
        tmp = np.full((im.shape[0], im.shape[1], 4), 255, dtype=np.uint8)
        # Replicate the single gray channel into R, G and B.
        tmp[:, :, 1:4] = im[:, :, np.newaxis]
    else:  # pragma: no cover
        raise ValueError("Invalid shape to be an image.")

    # The image is now uint8 with 4 channels; compress it.
    # tobytes() replaces the deprecated tostring() alias, which newer
    # NumPy releases no longer provide.
    # NOTE: the second argument of zlib.compress() is the compression
    # level, so zlib.DEFLATED (8) acts as level 8 here. Kept unchanged
    # so the produced bytes stay identical.
    self._data = zlib.compress(tmp.tobytes(), zlib.DEFLATED)
    self.imshape = im.shape
def zlib_compressobj(level=6, method=zlib.DEFLATED, wbits=15, memlevel=8,
                     strategy=zlib.Z_DEFAULT_STRATEGY):
    """Create a zlib compression object with keyword-friendly defaults.

    All five settings are forwarded positionally to ``zlib.compressobj``
    in the order level, method, wbits, memlevel, strategy.
    """
    settings = (level, method, wbits, memlevel, strategy)
    return zlib.compressobj(*settings)
def gzip_app_iter(app_iter):
    """Wrap an iterable of byte chunks, yielding gzip-framed output.

    Yields, in order: ``_gzip_header`` (defined elsewhere in the module;
    presumably the fixed gzip header -- verify), one raw-deflate chunk
    per input item, the compressor's final flush, and an 8-byte
    little-endian trailer holding the CRC32 and the total input length,
    each masked to 32 bits.
    """
    mask32 = 0xffffffff
    total_len = 0
    checksum = zlib.crc32(b"") & mask32
    # Negative wbits: raw deflate, since header and trailer are emitted
    # by hand around the compressed data.
    deflater = zlib.compressobj(9, zlib.DEFLATED, -zlib.MAX_WBITS,
                                zlib.DEF_MEM_LEVEL, 0)

    yield _gzip_header
    for chunk in app_iter:
        total_len = total_len + len(chunk)
        checksum = zlib.crc32(chunk, checksum) & mask32
        yield deflater.compress(chunk)
    yield deflater.flush()

    trailer = struct.pack("<2L", checksum, total_len & mask32)
    yield trailer
def compress_zlib(data):
    """Compress *data* into a zlib stream at level 9.

    Returns the compressed bytes, or ``None`` when compression raises;
    in that case the error is reported through ``print_error``.
    """
    compressor = zlib.compressobj(9, zlib.DEFLATED, zlib.MAX_WBITS)
    try:
        return compressor.compress(data) + compressor.flush()
    except Exception as exc:
        # Best effort: report the failure and fall through to None.
        print_error('Error when compressing with Zlib: {0}'.format(exc))
    return None
# def decompress_zlib(data):
# decompressed = None
# i = -15
# # The absolute value of wbits is the base two logarithm of the size of the history buffer (the "window size")
# # used when compressing data
# valid_wbits = []
# truncated_wbits = []
# decompressed = None
# for i in range(-15,16):
# try:
# decompressed = zlib.decompress(data, i)
# valid_wbits.append(i)
# except zlib.error as e:
# # Error -5 : incomplete or truncated data input implies that it's zlib compressed but the stream is not complete
# if str(e).startswith('Error -5'):
# truncated_wbits.append(i)
# return (valid_wbits, truncated_wbits, decompressed)
def compress_deflate(data):
    """Compress *data* into a raw deflate stream at level 9.

    The negative window size makes zlib omit its header and trailer.
    Returns the compressed bytes, or ``None`` when compression raises;
    in that case the error is reported through ``print_error``.
    """
    compressor = zlib.compressobj(9, zlib.DEFLATED, -zlib.MAX_WBITS)
    try:
        return compressor.compress(data) + compressor.flush()
    except Exception as exc:
        # Best effort: report the failure and fall through to None.
        print_error('Error when compressing with Deflate: {0}'.format(exc))
    return None
def __init__(self, window_bits):
    """Set up the instance logger and a raw-deflate compressor.

    Args:
        window_bits: Window size exponent. Its negation is passed to
            zlib as ``wbits``, which selects a raw deflate stream
            without a zlib/gzip header.
    """
    self._logger = get_class_logger(self)
    raw_wbits = -window_bits
    self._compress = zlib.compressobj(
        zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, raw_wbits)
def compress(body, compress_level):
    """Compress 'body' at the given compress_level.

    Generator yielding a gzip stream piece by piece: the ten header
    bytes (as six chunks), one raw-deflate chunk per item of ``body``,
    the final flush, then the CRC32 and the input size, each as a
    4-byte little-endian value masked to 32 bits.
    """
    import zlib

    mask32 = 0xFFFFFFFF

    # Header layout: see http://www.gzip.org/zlib/rfc-gzip.html
    yield ntob('\x1f\x8b')  # ID1 and ID2: gzip marker
    yield ntob('\x08')  # CM: compression method
    yield ntob('\x00')  # FLG: none set
    # MTIME: 4 bytes
    mtime = int(time.time()) & mask32
    yield struct.pack('<L', mtime)
    yield ntob('\x02')  # XFL: max compression, slowest algo
    yield ntob('\xff')  # OS: unknown

    checksum = zlib.crc32(ntob(''))
    total = 0
    # Negative wbits: raw deflate, since the gzip framing is written by
    # hand around the compressed data.
    deflater = zlib.compressobj(compress_level,
                                zlib.DEFLATED, -zlib.MAX_WBITS,
                                zlib.DEF_MEM_LEVEL, 0)
    for piece in body:
        total = total + len(piece)
        checksum = zlib.crc32(piece, checksum)
        yield deflater.compress(piece)
    yield deflater.flush()

    # CRC32: 4 bytes
    yield struct.pack('<L', checksum & mask32)
    # ISIZE: 4 bytes
    yield struct.pack('<L', total & mask32)
def _write_block(self, block):
    """Compress *block* and write it to ``self._handle`` as one BGZF block.

    The written record is ``_bgzf_header``, the 2-byte block size, the
    raw-deflate data, the CRC32 of the uncompressed data and the
    uncompressed length.

    Args:
        block: Bytes to store; at most 65536 bytes.

    Raises:
        AssertionError: If ``block`` is longer than 65536 bytes, or its
            compressed form is not shorter than 65536 bytes.
    """
    # print("Saving %i bytes" % len(block))
    assert len(block) <= 65536
    # Giving a negative window bits means no gzip/zlib headers,
    # -15 used in samtools
    compressor = zlib.compressobj(self.compresslevel,
                                  zlib.DEFLATED,
                                  -15,
                                  zlib.DEF_MEM_LEVEL,
                                  0)
    compressed = compressor.compress(block) + compressor.flush()
    del compressor
    assert len(compressed) < 65536, \
        "TODO - Didn't compress enough, try less data in this block"
    # Total block size minus one: 16 fixed header bytes + 2 bytes for
    # this field + 8 trailer bytes = 26, so 25 once the -1 is applied.
    bsize = struct.pack("<H", len(compressed) + 25)  # includes -1
    # Mask to 32 bits so the CRC is always packed as unsigned, whatever
    # sign zlib.crc32 returns on a given Python platform. The earlier
    # sign-dependent packing was dead code overwritten by this line.
    crc = struct.pack("<I", zlib.crc32(block) & 0xffffffff)
    uncompressed_length = struct.pack("<I", len(block))
    # Fixed 16 bytes,
    # gzip magic bytes (4) mod time (4),
    # gzip flag (1), os (1), extra length which is six (2),
    # sub field which is BC (2), sub field length of two (2),
    # Variable data,
    # 2 bytes: block length as BC sub field (2)
    # X bytes: the data
    # 8 bytes: crc (4), uncompressed data length (4)
    data = _bgzf_header + bsize + compressed + crc + uncompressed_length
    self._handle.write(data)
def start_compress_message(self):
    """Ensure ``self._compressor`` is ready for the next message.

    A new raw-deflate compressor is created when none exists yet, or
    when the "no context takeover" flag for this side (server or
    client, chosen by ``self._is_server``) is set. Otherwise the
    existing compressor is kept.
    """
    # compressobj([level[, method[, wbits[, mem_level[, strategy]]]]])
    # http://bugs.python.org/issue19278
    # http://hg.python.org/cpython/rev/c54c8e71b79a
    if self._is_server:
        if self._compressor is not None and not self.server_no_context_takeover:
            return
        window_bits = self.server_max_window_bits
    else:
        if self._compressor is not None and not self.client_no_context_takeover:
            return
        window_bits = self.client_max_window_bits

    self._compressor = zlib.compressobj(
        zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -window_bits,
        self.mem_level)
def gzip_app_iter(app_iter):
    """Wrap an iterable of byte chunks, yielding gzip-framed output.

    Yields, in order: ``_gzip_header`` (defined elsewhere in the module;
    presumably the fixed gzip header -- verify), one raw-deflate chunk
    per input item, the compressor's final flush, and an 8-byte
    little-endian trailer holding the CRC32 and the total input length,
    each masked to 32 bits.
    """
    mask32 = 0xffffffff
    total_len = 0
    checksum = zlib.crc32(b"") & mask32
    # Negative wbits: raw deflate, since header and trailer are emitted
    # by hand around the compressed data.
    deflater = zlib.compressobj(9, zlib.DEFLATED, -zlib.MAX_WBITS,
                                zlib.DEF_MEM_LEVEL, 0)

    yield _gzip_header
    for chunk in app_iter:
        total_len = total_len + len(chunk)
        checksum = zlib.crc32(chunk, checksum) & mask32
        yield deflater.compress(chunk)
    yield deflater.flush()

    trailer = struct.pack("<2L", checksum, total_len & mask32)
    yield trailer
def _create_compressor(self):
    """Return a new zlib compression object for raw deflate output.

    The level comes from ``tornado.web.GZipContentEncoding.GZIP_LEVEL``
    and the window size is the negation of ``self._max_wbits``; a
    negative ``wbits`` makes zlib emit a raw deflate stream without a
    zlib/gzip header or trailer.
    """
    level = tornado.web.GZipContentEncoding.GZIP_LEVEL
    raw_wbits = -self._max_wbits
    return zlib.compressobj(level, zlib.DEFLATED, raw_wbits)
def _create_compressor(self):
    """Return a new zlib compression object for raw deflate output.

    The level comes from ``tornado.web.GZipContentEncoding.GZIP_LEVEL``
    and the window size is the negation of ``self._max_wbits``; a
    negative ``wbits`` makes zlib emit a raw deflate stream without a
    zlib/gzip header or trailer.
    """
    level = tornado.web.GZipContentEncoding.GZIP_LEVEL
    raw_wbits = -self._max_wbits
    return zlib.compressobj(level, zlib.DEFLATED, raw_wbits)
def _create_compressor(self):
    """Return a new zlib compression object for raw deflate output.

    The level comes from ``tornado.web.GZipContentEncoding.GZIP_LEVEL``
    and the window size is the negation of ``self._max_wbits``; a
    negative ``wbits`` makes zlib emit a raw deflate stream without a
    zlib/gzip header or trailer.
    """
    level = tornado.web.GZipContentEncoding.GZIP_LEVEL
    raw_wbits = -self._max_wbits
    return zlib.compressobj(level, zlib.DEFLATED, raw_wbits)
def __init__(self, compressLevel, request):
    """Create the gzip compressor and remember the request.

    Args:
        compressLevel: zlib compression level passed to
            ``zlib.compressobj``.
        request: Object stored unchanged as ``self._request``.
    """
    # Adding 16 to the window size tells zlib to produce gzip framing
    # (header and trailer) instead of the zlib wrapper.
    gzip_wbits = 16 + zlib.MAX_WBITS
    self._zlibCompressor = zlib.compressobj(
        compressLevel, zlib.DEFLATED, gzip_wbits)
    self._request = request
def _create_compressor(self):
    """Return a new zlib compression object for raw deflate output.

    The level comes from ``tornado.web.GZipContentEncoding.GZIP_LEVEL``
    and the window size is the negation of ``self._max_wbits``; a
    negative ``wbits`` makes zlib emit a raw deflate stream without a
    zlib/gzip header or trailer.
    """
    level = tornado.web.GZipContentEncoding.GZIP_LEVEL
    raw_wbits = -self._max_wbits
    return zlib.compressobj(level, zlib.DEFLATED, raw_wbits)
def zlib_compressobj(level=6, method=zlib.DEFLATED, wbits=15, memlevel=8,
                     strategy=zlib.Z_DEFAULT_STRATEGY):
    """Create a zlib compression object with keyword-friendly defaults.

    All five settings are forwarded positionally to ``zlib.compressobj``
    in the order level, method, wbits, memlevel, strategy.
    """
    settings = (level, method, wbits, memlevel, strategy)
    return zlib.compressobj(*settings)
def gzip_app_iter(app_iter):
    """Wrap an iterable of byte chunks, yielding gzip-framed output.

    Yields, in order: ``_gzip_header`` (defined elsewhere in the module;
    presumably the fixed gzip header -- verify), one raw-deflate chunk
    per input item, the compressor's final flush, and an 8-byte
    little-endian trailer holding the CRC32 and the total input length,
    each masked to 32 bits.
    """
    mask32 = 0xffffffff
    total_len = 0
    checksum = zlib.crc32(b"") & mask32
    # Negative wbits: raw deflate, since header and trailer are emitted
    # by hand around the compressed data.
    deflater = zlib.compressobj(9, zlib.DEFLATED, -zlib.MAX_WBITS,
                                zlib.DEF_MEM_LEVEL, 0)

    yield _gzip_header
    for chunk in app_iter:
        total_len = total_len + len(chunk)
        checksum = zlib.crc32(chunk, checksum) & mask32
        yield deflater.compress(chunk)
    yield deflater.flush()

    trailer = struct.pack("<2L", checksum, total_len & mask32)
    yield trailer
def _create_compressor(self):
    """Return a new zlib compression object for raw deflate output.

    The level comes from ``tornado.web.GZipContentEncoding.GZIP_LEVEL``
    and the window size is the negation of ``self._max_wbits``; a
    negative ``wbits`` makes zlib emit a raw deflate stream without a
    zlib/gzip header or trailer.
    """
    level = tornado.web.GZipContentEncoding.GZIP_LEVEL
    raw_wbits = -self._max_wbits
    return zlib.compressobj(level, zlib.DEFLATED, raw_wbits)
def _create_compressor(self):
    """Return a new zlib compression object for raw deflate output.

    The level comes from ``tornado.web.GZipContentEncoding.GZIP_LEVEL``
    and the window size is the negation of ``self._max_wbits``; a
    negative ``wbits`` makes zlib emit a raw deflate stream without a
    zlib/gzip header or trailer.
    """
    level = tornado.web.GZipContentEncoding.GZIP_LEVEL
    raw_wbits = -self._max_wbits
    return zlib.compressobj(level, zlib.DEFLATED, raw_wbits)