def compress_zip(fd,      # type: BinaryIO
                 image,   # type: np.ndarray
                 depth,   # type: int
                 version  # type: int
                 ):       # type: (...) -> None
    """
    Write a Numpy array to a zip (zlib) compressed stream.
    {}
    """
    image = normalize_image(image, depth)
    if util.needs_byteswap(image):
        compressor = zlib.compressobj()
        for row in image:
            row = util.do_byteswap(row)
            fd.write(compressor.compress(row))
        fd.write(compressor.flush())
    else:
        fd.write(zlib.compress(image))
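# A hedged sketch of the reverse direction (not part of the original module):
# the rows above are emitted back-to-back as a single zlib stream, so reading
# it back is one zlib.decompress() plus a reshape.  Shape, dtype and byte
# order are assumed to be known by the caller.
def decompress_zip(fd, shape, dtype):
    import zlib
    import numpy as np
    raw = zlib.decompress(fd.read())
    return np.frombuffer(raw, dtype=dtype).reshape(shape)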
def gzip_encode_add_padding(content):
    print "* Compressing content..."
    num_chunks = len(content) / CHUNK_SIZE  # let's not care about remainders
    gzip_compress = zlib.compressobj(9, zlib.DEFLATED, zlib.MAX_WBITS | 16)
    data = gzip_compress.compress(content)
    comp_cnt = 0
    replay = reorder(content[0:num_chunks*CHUNK_SIZE], arg_blocks)
    assert(len(replay) % CHUNK_SIZE == 0)
    num_chunks = len(replay) / CHUNK_SIZE  # update the block count
    print "** Duplicating content (CBC attack)..."
    data += gzip_compress.compress(replay)  # duplicate ciphertext, should result in duplicate plaintext (prefixed by some garbage)
    while comp_cnt < WRAP_SIZE - (num_chunks*CHUNK_SIZE + 10*CHUNK_SIZE):
        data += gzip_compress.compress("A"*CHUNK_SIZE)
        comp_cnt += CHUNK_SIZE
    print "** Copying original padding..."
    data += gzip_compress.compress(content[len(content) - 10*CHUNK_SIZE:len(content)])  # copy valid PKCS7 padding
    data = data + gzip_compress.flush()
    print "*** Finished"
    return data
def compress(filename, input, output):
    output.write('\037\213\010')            # Write the header, ...
    output.write(chr(FNAME))                # ... flag byte ...
    statval = os.stat(filename)             # ... modification time ...
    mtime = statval[8]
    write32(output, mtime)
    output.write('\002')                    # ... slowest compression alg. ...
    output.write('\377')                    # ... OS (=unknown) ...
    output.write(filename + '\000')         # ... original filename ...
    crcval = zlib.crc32("")
    compobj = zlib.compressobj(9, zlib.DEFLATED, -zlib.MAX_WBITS,
                               zlib.DEF_MEM_LEVEL, 0)
    while True:
        data = input.read(1024)
        if data == "":
            break
        crcval = zlib.crc32(data, crcval)
        output.write(compobj.compress(data))
    output.write(compobj.flush())
    write32(output, crcval)                 # ... the CRC ...
    write32(output, statval[6])             # ... and the file size.
def deflate(data, compresslevel=9):
    # Compress
    compress = zlib.compressobj(compresslevel, zlib.DEFLATED, -zlib.MAX_WBITS,
                                zlib.DEF_MEM_LEVEL, 0)
    deflated = compress.compress(data)
    deflated += compress.flush()
    # Add PD's compression magic and negated file length (8 bytes)
    length = int(-len(deflated))
    magic = bytearray(b'\xc5\xee\xf7\xff\x00\x00\x00\x00')
    magic[4] = byte(length, 0)
    magic[5] = byte(length >> 8, 0)
    magic[6] = byte(length >> 0x10, 0)
    magic[7] = byte(length >> 0x18, 0)
    deflatedwithheader = bytes(magic) + deflated
    finaldata = xor(deflatedwithheader)
    return finaldata
# Decrypts the GT6TED, removes the magic and negated file length and
# decompresses the data
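# The decompression routine itself is not included in this excerpt.  A rough
# sketch of what it might look like, assuming the xor() helper is its own
# inverse and the 8-byte magic/negated-length header only needs stripping
# before raw-deflate decompression (hypothetical, not the tool's actual code):
def inflate(finaldata):
    import zlib
    deobfuscated = xor(finaldata)       # assumes xor() undoes itself
    deflated = deobfuscated[8:]         # drop magic + negated length field
    return zlib.decompress(deflated, -zlib.MAX_WBITS)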
def compress_gzip(data):
    compressed = None
    gzip_compress = zlib.compressobj(9, zlib.DEFLATED, zlib.MAX_WBITS | 16)
    try:
        compressed = gzip_compress.compress(data) + gzip_compress.flush()
    except Exception as e:
        print_error('Error when compressing with Gzip: {0}'.format(e))
    return compressed
# def decompress_gzip(data):
#     decompressed = None
#     try:
#         decompressed = zlib.decompress(data, zlib.MAX_WBITS | 16)
#     except:
#         pass
#     return decompressed
def print_inventory(file_, package, version, objects):
    file_.write(b'# Sphinx inventory version 2\n')
    file_.write(b'# Project: ')
    file_.write(package.encode('utf-8'))
    file_.write(b'\n')
    file_.write(b'# Version: ')
    file_.write(version.encode('utf-8'))
    file_.write(b'\n')
    file_.write(b'# The remainder of this file is compressed using zlib.\n')
    codec = zlib.compressobj()
    fmt = '{0} {1} {2} . -\n'.format
    for name, kind, mysterious_number, _ in objects:
        line = fmt(name, kind, mysterious_number)
        code = codec.compress(line.encode('utf-8'))
        file_.write(code)
    file_.write(codec.flush())
    file_.flush()
def compress_file(self):
    # stream-compress to another file, then overwrite the original
    self.file = open(self.filename, 'rb')
    compressed_filename = '%s.zlib' % self.filename
    compressed_file = open(compressed_filename, 'wb')
    compressor = zlib.compressobj(self.compression_level)
    compressed_file.write(struct.pack('>I', os.stat(self.filename).st_size))
    data = self.file.read(READ_AMOUNT)
    while len(data) > 0:
        compressed_file.write(compressor.compress(data))
        data = self.file.read(READ_AMOUNT)
    compressed_file.write(compressor.flush(zlib.Z_FINISH))
    self.file.close()
    compressed_file.close()
    os.rename(compressed_filename, self.filename)
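# compress_file() prefixes the stream with the original size as a 4-byte
# big-endian integer.  A hedged sketch of the matching reader; the method
# name and the reuse of READ_AMOUNT are assumptions, not part of the
# original class:
def decompress_file(self):
    import struct
    import zlib
    with open(self.filename, 'rb') as f:
        (original_size,) = struct.unpack('>I', f.read(4))
        decompressor = zlib.decompressobj()
        out = bytearray()
        data = f.read(READ_AMOUNT)
        while data:
            out += decompressor.decompress(data)
            data = f.read(READ_AMOUNT)
        out += decompressor.flush()
    assert len(out) == original_size
    return bytes(out)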
def compress_readable_output(src_file, compress_level=6):
    crc = zlib.crc32(b"")
    size = 0
    zobj = zlib.compressobj(compress_level, zlib.DEFLATED, -zlib.MAX_WBITS,
                            zlib.DEF_MEM_LEVEL, zlib.Z_DEFAULT_STRATEGY)
    prefix_written = False
    while True:
        data = src_file.read(DEFAULT_BUFFER_SIZE)
        if not data:
            break
        size += len(data)
        crc = zlib.crc32(data, crc)
        data = zobj.compress(data)
        if not prefix_written:
            prefix_written = True
            data = gzip_prefix() + data
        yield data
    yield zobj.flush() + struct.pack(b"<LL", crc & 0xffffffffL, size)
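# gzip_prefix() is not shown in this excerpt; presumably it returns the fixed
# 10-byte gzip header that precedes the raw-deflate body.  A minimal sketch of
# such a helper (an assumption about the original, using the standard RFC 1952
# header fields):
def gzip_prefix(mtime=0):
    import struct
    # magic, CM=deflate, no flags, mtime, XFL=0, OS=255 (unknown)
    return b"\x1f\x8b\x08\x00" + struct.pack(b"<L", mtime) + b"\x00\xff"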
def _newKeys(self):
    """
    Called back by a subclass once a I{MSG_NEWKEYS} message has been
    received.  This indicates key exchange has completed and new encryption
    and compression parameters should be adopted.  Any messages which were
    queued during key exchange will also be flushed.
    """
    log.msg('NEW KEYS')
    self.currentEncryptions = self.nextEncryptions
    if self.outgoingCompressionType == b'zlib':
        self.outgoingCompression = zlib.compressobj(6)
    if self.incomingCompressionType == b'zlib':
        self.incomingCompression = zlib.decompressobj()
    self._keyExchangeState = self._KEY_EXCHANGE_NONE
    messages = self._blockedByKeyExchange
    self._blockedByKeyExchange = None
    for (messageType, payload) in messages:
        self.sendPacket(messageType, payload)
def add_compression_filter(self, encoding='deflate', *,
                           EOF_MARKER=EOF_MARKER, EOL_MARKER=EOL_MARKER):
    """Compress incoming stream with deflate or gzip encoding."""
    zlib_mode = (16 + zlib.MAX_WBITS
                 if encoding == 'gzip' else -zlib.MAX_WBITS)
    zcomp = zlib.compressobj(wbits=zlib_mode)
    chunk = yield
    while True:
        if chunk is EOF_MARKER:
            yield zcomp.flush()
            chunk = yield EOF_MARKER
        else:
            yield zcomp.compress(chunk)
            chunk = yield EOL_MARKER
def test_recording_gzipped_responses_as_text(vts_rec_on, httpserver):
    data = "Hello!"
    # http://stackoverflow.com/a/22310760
    gzip_compressor = zlib.compressobj(9, zlib.DEFLATED, zlib.MAX_WBITS | 16)
    gzipped = gzip_compressor.compress(data.encode()) + gzip_compressor.flush()
    httpserver.serve_content(
        gzipped, 200,
        headers={"Content-Encoding": "gzip"})
    url = "{}/".format(httpserver.url)
    resp = requests.get(url)
    assert resp.status_code == 200
    assert resp.text == data
    assert len(vts_rec_on.cassette) == 1
    track = vts_rec_on.cassette[0]
    assert track['request']['url'] == url
    assert "Content-Encoding" in track['response']['headers']
    assert track['response']['body'] == data
# enable pytester fixture which allows running pytests within tests
def test_response_with_precompressed_body_gzip(loop, test_client):
    @asyncio.coroutine
    def handler(request):
        headers = {'Content-Encoding': 'gzip'}
        zcomp = zlib.compressobj(wbits=16 + zlib.MAX_WBITS)
        data = zcomp.compress(b'mydata') + zcomp.flush()
        return web.Response(body=data, headers=headers)

    app = web.Application()
    app.router.add_get('/', handler)
    client = yield from test_client(app)
    resp = yield from client.get('/')
    assert 200 == resp.status
    data = yield from resp.read()
    assert b'mydata' == data
    assert resp.headers.get('Content-Encoding') == 'gzip'
def test_response_with_precompressed_body_deflate(loop, test_client):
    @asyncio.coroutine
    def handler(request):
        headers = {'Content-Encoding': 'deflate'}
        zcomp = zlib.compressobj(wbits=-zlib.MAX_WBITS)
        data = zcomp.compress(b'mydata') + zcomp.flush()
        return web.Response(body=data, headers=headers)

    app = web.Application()
    app.router.add_get('/', handler)
    client = yield from test_client(app)
    resp = yield from client.get('/')
    assert 200 == resp.status
    data = yield from resp.read()
    assert b'mydata' == data
    assert resp.headers.get('Content-Encoding') == 'deflate'
def deflate(data):
    c = zlib.compressobj()
    out = c.compress(data)
    out += c.flush(zlib.Z_SYNC_FLUSH)
    return out
def start_compressing(self):
    """start_compressing()
    Enable deflate compression on the socket (RFC 4978)."""
    # RFC 1951 - pure DEFLATE, so use -15 for both windows
    self.decompressor = zlib.decompressobj(-15)
    self.compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION,
                                       zlib.DEFLATED, -15)
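# Once compression is enabled, each payload is typically compressed and
# flushed with Z_SYNC_FLUSH before it hits the wire, so the peer can decode
# it without waiting for end-of-stream.  A hedged sketch of such helpers; the
# self.sock attribute and the method names are assumptions, not part of the
# original class:
def send_compressed(self, payload):
    data = self.compressor.compress(payload)
    data += self.compressor.flush(zlib.Z_SYNC_FLUSH)
    self.sock.sendall(data)

def recv_compressed(self, nbytes=4096):
    return self.decompressor.decompress(self.sock.recv(nbytes))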
def _create_compressor(self):
    return zlib.compressobj(tornado.web.GZipContentEncoding.GZIP_LEVEL,
                            zlib.DEFLATED, -self._max_wbits)
def write_response(handler, code, headers, data=""):
    handler.send_response(code)
    for header in headers:
        i = header.index(":")
        s, e = header[:i], header[i+1:]
        handler.send_header(s, e)
    if data:
        zlib_encode = zlib.compressobj(9, zlib.DEFLATED, zlib.MAX_WBITS | 16)
        content = zlib_encode.compress(data) + zlib_encode.flush()
        if len(content) < len(data):
            handler.send_header('Content-Encoding', 'gzip')
            handler.send_header('Content-Length', len(content))
        else:
            content = data
        handler.end_headers()
        handler.wfile.write(content)
    else:
        handler.wfile.write(data)
def ssh_NEWKEYS(self, packet):
    if packet != '':
        self.sendDisconnect(DISCONNECT_PROTOCOL_ERROR, "NEWKEYS takes no data")
    self.currentEncryptions = self.nextEncryptions
    if self.outgoingCompressionType == 'zlib':
        self.outgoingCompression = zlib.compressobj(6)
        #self.outgoingCompression.compress = lambda x: self.outgoingCompression.compress(x) + self.outgoingCompression.flush(zlib.Z_SYNC_FLUSH)
    if self.incomingCompressionType == 'zlib':
        self.incomingCompression = zlib.decompressobj()
def ssh_NEWKEYS(self, packet):
    if packet != '':
        self.sendDisconnect(DISCONNECT_PROTOCOL_ERROR, "NEWKEYS takes no data")
    if not self.nextEncryptions.enc_block_size:
        self._gotNewKeys = 1
        return
    self.currentEncryptions = self.nextEncryptions
    if self.outgoingCompressionType == 'zlib':
        self.outgoingCompression = zlib.compressobj(6)
        #self.outgoingCompression.compress = lambda x: self.outgoingCompression.compress(x) + self.outgoingCompression.flush(zlib.Z_SYNC_FLUSH)
    if self.incomingCompressionType == 'zlib':
        self.incomingCompression = zlib.decompressobj()
    self.connectionSecure()
def handle(self):
    compressor = zlib.compressobj(1)

    # Find out what file the client wants
    filename = self.request.recv(1024).decode('utf-8')
    self.logger.debug('client asked for: %r', filename)

    # Send chunks of the file as they are compressed
    with open(filename, 'rb') as input:
        while True:
            block = input.read(BLOCK_SIZE)
            if not block:
                break
            self.logger.debug('RAW %r', block)
            compressed = compressor.compress(block)
            if compressed:
                self.logger.debug(
                    'SENDING %r',
                    binascii.hexlify(compressed))
                self.request.send(compressed)
            else:
                self.logger.debug('BUFFERING')

    # Send any data being buffered by the compressor
    remaining = compressor.flush()
    while remaining:
        to_send = remaining[:BLOCK_SIZE]
        remaining = remaining[BLOCK_SIZE:]
        self.logger.debug('FLUSHING %r',
                          binascii.hexlify(to_send))
        self.request.send(to_send)
    return
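# The matching client is not part of this excerpt; roughly, it sends the file
# name and feeds everything it receives into a zlib.decompressobj().  A
# minimal sketch under those assumptions (host, port and buffer size are
# made up):
def fetch_file(filename, host='localhost', port=9999):
    import socket
    import zlib
    decompressor = zlib.decompressobj()
    out = b''
    with socket.create_connection((host, port)) as sock:
        sock.sendall(filename.encode('utf-8'))
        while True:
            chunk = sock.recv(1024)
            if not chunk:
                break
            out += decompressor.decompress(chunk)
    return out + decompressor.flush()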
def __init__(self, map_, parent=False):
    # parent=True enables saving all data sent instead of just
    # deleting it afterwards.
    self.parent = parent
    self.generator = map_.get_generator()
    self.compressor = zlib.compressobj(COMPRESSION_LEVEL)
def __init__(self):
    self.z = zlib.compressobj(9)
def _CalculateCompressedSize(file_path):
    CHUNK_SIZE = 256 * 1024
    compressor = zlib.compressobj()
    total_size = 0
    with open(file_path, 'rb') as f:
        for chunk in iter(lambda: f.read(CHUNK_SIZE), b''):
            total_size += len(compressor.compress(chunk))
    total_size += len(compressor.flush())
    return total_size
def __init__(self, level=5):
    self.compressor = zlib.compressobj(level, zlib.DEFLATED)
def __init__(self, level=5):
    self.size = 0
    self.crc = zlib.crc32(b"")
    self.compressor = zlib.compressobj(level, zlib.DEFLATED, -zlib.MAX_WBITS,
                                       zlib.DEF_MEM_LEVEL, 0)
    self.first_data = True
def __init__(self, producer, level=6):
    self.producer = producer
    self.compressor = zlib.compressobj(level, zlib.DEFLATED)
    self.override()
def __init__(self, out):
    self.compressor = zlib.compressobj(9, zlib.DEFLATED, zlib.MAX_WBITS + 16)
    self.out = out
def compress(buff):
    buff = buff.encode('utf-8')
    compressobj = zlib.compressobj(6, zlib.DEFLATED, zlib.MAX_WBITS + 16)
    compressed = compressobj.compress(buff)
    compressed += compressobj.flush()
    return compressed
# plain "inflate"
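# The "plain inflate" counterpart is cut off in this excerpt; a minimal sketch
# of what such a raw-deflate decompressor usually looks like (name and
# signature are assumptions):
def decompress(buff):
    # negative wbits: no zlib/gzip header or trailer is expected
    decompressobj = zlib.decompressobj(-zlib.MAX_WBITS)
    decompressed = decompressobj.decompress(buff)
    decompressed += decompressobj.flush()
    return decompressed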