def compress_alt(buff):
    """Compress *buff* and return the bare DEFLATE stream.

    The data is compressed at level 6 into a zlib container, then the
    container framing is cut off so only the DEFLATE payload remains.

    :param buff: text (encoded as UTF-8 before compression) or an
        already-encoded ``bytes``/``bytearray`` payload, used as is.
    :returns: the compressed payload without zlib header and trailer.
    """
    # Only text needs encoding; byte strings have no usable ``encode`` on
    # Python 3 and must be passed through untouched.
    if not isinstance(buff, (bytes, bytearray)):
        buff = buff.encode('utf-8')
    compressobj = zlib.compressobj(6, zlib.DEFLATED)
    compressed = compressobj.compress(buff) + compressobj.flush()
    # Drop the 2-byte zlib header and the 4-byte Adler-32 trailer.
    return compressed[2:-4]
# Brotli
python类compressobj()的实例源码
def frame_outbound(self, proto, opcode, rsv, data, fin):
    """Compress the payload of an outgoing frame.

    Frames whose opcode is not compressible are returned untouched.
    Otherwise the payload is fed through a (lazily created) raw-DEFLATE
    compressor and, on the final frame, the stream is sync-flushed.

    :param proto: connection object; ``proto.client`` selects the client
        or server settings.
    :param opcode: opcode of the frame being sent.
    :param rsv: reserved-bit tuple of the frame.
    :param data: frame payload.
    :param fin: true when this is the last frame of the message.
    :returns: ``(rsv, data)`` with the possibly updated bits and payload.
    """
    # NOTE(review): nesting reconstructed from an unindented source; the
    # context-takeover handling is assumed to belong to the ``fin`` branch.
    if not self._compressible_opcode(opcode):
        return (rsv, data)

    is_continuation = opcode is Opcode.CONTINUATION
    if not is_continuation:
        # The first frame of a message carries the flag in the first bit.
        rsv = RsvBits(True, *rsv[1:])

    if self._compressor is None:
        # A compressor can only be missing at the start of a message.
        assert not is_continuation
        window_bits = (self.client_max_window_bits if proto.client
                       else self.server_max_window_bits)
        # Negative wbits selects a raw DEFLATE stream.
        self._compressor = zlib.compressobj(
            zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -int(window_bits))

    payload = self._compressor.compress(bytes(data))
    if not fin:
        return (rsv, payload)

    payload += self._compressor.flush(zlib.Z_SYNC_FLUSH)
    # Drop the last four bytes of the sync-flushed output (presumably the
    # empty-block marker that permessage-deflate omits -- confirm).
    payload = payload[:-4]

    if proto.client:
        discard_context = self.client_no_context_takeover
    else:
        discard_context = self.server_no_context_takeover
    if discard_context:
        # Force a fresh compressor for the next message.
        self._compressor = None

    return (rsv, payload)
def write_to_stdout(data):
    """Emit *data* plus a trailing newline on stdout and flush at once."""
    stream = sys.stdout
    stream.write(data + '\n')
    # Flush immediately so the line is not held back in the buffer.
    stream.flush()
# The standard zlib.compressobj() accepts only positional arguments.
def zlib_compressobj(level=6, method=zlib.DEFLATED, wbits=15, memlevel=8,
                     strategy=zlib.Z_DEFAULT_STRATEGY):
    """Keyword-friendly front end for ``zlib.compressobj``.

    Every argument is forwarded positionally, in the order
    ``level, method, wbits, memlevel, strategy``.

    :returns: the compression object created by ``zlib.compressobj``.
    """
    positional = (level, method, wbits, memlevel, strategy)
    return zlib.compressobj(*positional)
def gzip_app_iter(app_iter):
    """Wrap *app_iter* so that it yields its content as a gzip stream.

    Yields the module's gzip header, then the raw-DEFLATE compressed body,
    then the trailer holding the CRC-32 and the size (modulo 2**32) of the
    uncompressed data.

    :param app_iter: iterable of byte strings making up the body.
    :returns: a generator of byte strings.
    """
    size = 0
    crc = zlib.crc32(b"") & 0xffffffff
    # Negative wbits: raw DEFLATE, since header and trailer are written
    # by hand around the compressed body.
    compress = zlib.compressobj(9, zlib.DEFLATED, -zlib.MAX_WBITS,
                                zlib.DEF_MEM_LEVEL, 0)

    yield _gzip_header
    for item in app_iter:
        size += len(item)
        crc = zlib.crc32(item, crc) & 0xffffffff
        # compress() may buffer small inputs and return nothing; an empty
        # chunk must not be emitted, because a consumer doing chunked
        # transfer encoding can take it for the end of the body.
        chunk = compress.compress(item)
        if chunk:
            yield chunk
    # Same reasoning applies to whatever flush() returns.
    chunk = compress.flush()
    if chunk:
        yield chunk
    yield struct.pack("<2L", crc, size & 0xffffffff)
def write_to_stdout(data):
    """Print one line (``data`` + newline) to stdout, flushing right away."""
    out = sys.stdout
    out.write(data + '\n')
    # Push the line out now instead of leaving it in the buffer.
    out.flush()
# The standard zlib.compressobj() accepts only positional arguments.
def zlib_compressobj(level=6, method=zlib.DEFLATED, wbits=15, memlevel=8,
                     strategy=zlib.Z_DEFAULT_STRATEGY):
    """Create a zlib compression object from named settings.

    The values are handed to ``zlib.compressobj`` positionally, in the
    order ``level, method, wbits, memlevel, strategy``.

    :returns: the object returned by ``zlib.compressobj``.
    """
    settings = [level, method, wbits, memlevel, strategy]
    return zlib.compressobj(*settings)
def gzip_app_iter(app_iter):
    """Turn the byte chunks of *app_iter* into a gzip-encoded stream.

    The generator produces, in order: the module's gzip header, the body
    compressed as raw DEFLATE, and a trailer with the CRC-32 and the
    length (modulo 2**32) of the uncompressed input.

    :param app_iter: iterable of byte strings to compress.
    :returns: a generator of byte strings.
    """
    size = 0
    crc = zlib.crc32(b"") & 0xffffffff
    # Raw DEFLATE (negative wbits): the gzip framing is produced manually.
    compress = zlib.compressobj(9, zlib.DEFLATED, -zlib.MAX_WBITS,
                                zlib.DEF_MEM_LEVEL, 0)

    yield _gzip_header
    for item in app_iter:
        size += len(item)
        crc = zlib.crc32(item, crc) & 0xffffffff
        # Small inputs may be buffered by the compressor, which then
        # returns an empty string; skip it, since a zero-length chunk can
        # be read as end-of-body by chunked-transfer consumers.
        compressed = compress.compress(item)
        if compressed:
            yield compressed
    # flush() may come back empty as well.
    compressed = compress.flush()
    if compressed:
        yield compressed
    yield struct.pack("<2L", crc, size & 0xffffffff)
def compress_zip_prediction(fd,            # type: BinaryIO
                            image,         # type: np.ndarray
                            depth,         # type: int
                            version        # type: int
                            ):             # type: (...) -> None
    """
    Write a Numpy array to a zip (zlib) with prediction compressed
    stream.

    Not supported for 1- or 32-bit images.

    Raises ``ValueError`` for any depth other than 8 or 16.
    {}
    """
    if depth == 1:  # pragma: no cover
        raise ValueError(
            "zip with prediction is not supported for 1-bit images")
    elif depth == 32:  # pragma: no cover
        raise ValueError(
            "zip with prediction is not implemented for 32-bit images")
    elif depth == 8:
        encoder = packbits.encode_prediction_8bit
    elif depth == 16:
        encoder = packbits.encode_prediction_16bit
    else:
        # Without this branch ``encoder`` stays unbound and the loop below
        # fails with an unrelated UnboundLocalError.
        raise ValueError(
            "zip with prediction is not supported for %r-bit images"
            % (depth,))

    compressor = zlib.compressobj()
    for row in image:
        # NOTE(review): flatten() returns a copy, so this only affects
        # ``row`` if the encoder's result reaches it some other way --
        # confirm against packbits.
        encoder(row.flatten())
        row = util.ensure_bigendian(row)
        fd.write(compressor.compress(row))
    # Emit whatever the compressor is still holding back.
    fd.write(compressor.flush())
def _CalculateCompressedSize(file_path):
    """Return the zlib-compressed size, in bytes, of the file's contents.

    The file is streamed through a compressor in fixed-size chunks, so it
    is never held in memory as a whole.

    :param file_path: path of the file to measure.
    :returns: total length of the compressed stream.
    """
    CHUNK_SIZE = 256 * 1024
    compressor = zlib.compressobj()
    total_size = 0
    with open(file_path, 'rb') as f:
        # The sentinel has to be a byte string: a binary read returns b''
        # at EOF, which never equals '' on Python 3, so a text sentinel
        # makes the loop spin forever.
        for chunk in iter(lambda: f.read(CHUNK_SIZE), b''):
            total_size += len(compressor.compress(chunk))
    # Count the data still buffered inside the compressor.
    total_size += len(compressor.flush())
    return total_size
def gzip_encode(content):
    """Return *content* compressed as a complete gzip stream.

    :param content: byte string to compress.
    :returns: gzip-framed compressed bytes (level 9).
    """
    # Adding 16 to the window bits asks zlib for gzip framing.
    encoder = zlib.compressobj(9, zlib.DEFLATED, zlib.MAX_WBITS | 16)
    body = encoder.compress(content)
    tail = encoder.flush()
    return body + tail
def _CalculateCompressedSize(file_path):
    """Compute how many bytes the file occupies once zlib-compressed.

    The contents are read and compressed chunk by chunk, so large files
    do not need to fit in memory.

    :param file_path: path of the file to measure.
    :returns: size in bytes of the compressed stream.
    """
    CHUNK_SIZE = 256 * 1024
    compressor = zlib.compressobj()
    total_size = 0
    with open(file_path, 'rb') as f:
        # Use a bytes sentinel: in binary mode EOF is b'', and on Python 3
        # that is never equal to '', which would make this loop endless.
        for chunk in iter(lambda: f.read(CHUNK_SIZE), b''):
            total_size += len(compressor.compress(chunk))
    # Include the bytes the compressor was still holding back.
    total_size += len(compressor.flush())
    return total_size
def __init__(self):
    """Set up ``self.z``, a zlib compressor running at level 9."""
    level = 9
    self.z = zlib.compressobj(level)
def main():
    """Demonstrate one-shot and incremental zlib compression of a file.

    The file named by the first command-line argument (or this script
    itself when no argument is given) is compressed and decompressed
    twice: once in a single call at level 1, and once in 256-byte steps
    at level 9.  The sizes are printed for both runs.
    """
    filename = sys.argv[1] if len(sys.argv) > 1 else sys.argv[0]
    # Single pre-formatted strings keep print() output identical on
    # Python 2 and Python 3.
    print('Reading %s' % (filename,))
    with open(filename, 'rb') as f:  # Get the data to compress
        s = f.read()

    # First, we'll compress the string in one step
    comptext = zlib.compress(s, 1)
    decomp = zlib.decompress(comptext)

    print('1-step compression: (level 1)')
    print(' Original: %d Compressed: %d Uncompressed: %d'
          % (len(s), len(comptext), len(decomp)))

    # Now, let's compress the string in stages; set chunk to work in
    # smaller steps
    chunk = 256
    compressor = zlib.compressobj(9)
    decompressor = zlib.decompressobj()

    # Collect the pieces and join once: the data is bytes, and repeated
    # concatenation would copy the buffer on every step.
    packed = [compressor.compress(s[i:i + chunk])
              for i in range(0, len(s), chunk)]
    # Don't forget to call flush()!!
    packed.append(compressor.flush())
    comptext = b''.join(packed)

    unpacked = [decompressor.decompress(comptext[i:i + chunk])
                for i in range(0, len(comptext), chunk)]
    unpacked.append(decompressor.flush())
    decomp = b''.join(unpacked)

    print('Progressive compression (level 9):')
    print(' Original: %d Compressed: %d Uncompressed: %d'
          % (len(s), len(comptext), len(decomp)))
def _init_zlib(self):
    '''
    Create the zlib objects used by this instance: a compressor
    configured with ``self.compresslevel`` for writing, and a
    decompressor for reading.
    '''
    level = self.compresslevel
    self._zcomp_write = zlib.compressobj(level)
    self._zcomp_read = zlib.decompressobj()
def dump_inventory(self):
    """Write the object inventory file into the output directory.

    The file starts with a plain-text header naming the project and its
    version; one line per object of every domain follows, compressed
    with zlib.
    """
    self.info(bold('dumping object inventory... '), nonl=True)
    with open(path.join(self.outdir, INVENTORY_FILENAME), 'wb') as f:
        header = (u'# Sphinx inventory version 2\n'
                  u'# Project: %s\n'
                  u'# Version: %s\n'
                  u'# The remainder of this file is compressed using zlib.\n'
                  % (self.config.project, self.config.version))
        f.write(header.encode('utf-8'))

        compressor = zlib.compressobj(9)
        for domainname, domain in iteritems(self.env.domains):
            entries = sorted(domain.get_objects())
            for name, dispname, objtype, docname, anchor, prio in entries:
                if anchor.endswith(name):
                    # this can shorten the inventory by as much as 25%
                    anchor = anchor[:-len(name)] + '$'
                uri = self.get_target_uri(docname) + '#' + anchor
                if dispname == name:
                    # '-' marks a display name equal to the object name
                    dispname = u'-'
                entry = u'%s %s:%s %s %s %s\n' % (name, domainname, objtype,
                                                  prio, uri, dispname)
                f.write(compressor.compress(entry.encode('utf-8')))
        # Write out the data still buffered in the compressor.
        f.write(compressor.flush())
    self.info('done')
def __init__(self):
    """Set up ``self.compressor``, a level-6 raw-DEFLATE compressor."""
    # Negative window bits: raw stream, no zlib header or checksum.
    raw_window = -zlib.MAX_WBITS
    self.compressor = zlib.compressobj(6, zlib.DEFLATED, raw_window, 8)
def ssh_NEWKEYS(self, packet):
    """Handle a NEWKEYS message: switch to the negotiated settings.

    The message must carry no payload.  When it does, the connection is
    disconnected with a protocol error and nothing else is changed.
    Otherwise the pending encryptions become current and zlib
    (de)compression objects are created for every direction whose
    negotiated compression type is ``'zlib'``.

    :param packet: payload of the message; expected to be empty.
    """
    # Truthiness covers both text and byte strings; comparing a bytes
    # payload against '' is always unequal on Python 3.
    if packet:
        self.sendDisconnect(DISCONNECT_PROTOCOL_ERROR, "NEWKEYS takes no data")
        # Do not install new keys on a connection that was just rejected.
        return
    self.currentEncryptions = self.nextEncryptions
    if self.outgoingCompressionType == 'zlib':
        self.outgoingCompression = zlib.compressobj(6)
    if self.incomingCompressionType == 'zlib':
        self.incomingCompression = zlib.decompressobj()
def ssh_NEWKEYS(self, packet):
    """Handle a NEWKEYS message: activate the negotiated settings.

    The message must carry no payload; if it does, the connection is
    disconnected with a protocol error and processing stops.  When the
    pending encryptions have no block size yet, only ``_gotNewKeys`` is
    set and the switch is left for later.  Otherwise the pending
    encryptions become current, zlib (de)compression objects are created
    for each direction negotiated as ``'zlib'``, and
    ``connectionSecure()`` is called.

    :param packet: payload of the message; expected to be empty.
    """
    # Truthiness covers both text and byte strings; comparing a bytes
    # payload against '' is always unequal on Python 3.
    if packet:
        self.sendDisconnect(DISCONNECT_PROTOCOL_ERROR, "NEWKEYS takes no data")
        # A rejected peer must not be reported as a secured connection.
        return
    if not self.nextEncryptions.enc_block_size:
        # Pending encryptions are not usable yet: just record that
        # NEWKEYS has arrived.
        self._gotNewKeys = 1
        return
    self.currentEncryptions = self.nextEncryptions
    if self.outgoingCompressionType == 'zlib':
        self.outgoingCompression = zlib.compressobj(6)
    if self.incomingCompressionType == 'zlib':
        self.incomingCompression = zlib.decompressobj()
    self.connectionSecure()
websocket.py 文件源码
项目:My-Web-Server-Framework-With-Python2.7
作者: syjsu
项目源码
文件源码
阅读 17
收藏 0
点赞 0
评论 0
def _create_compressor(self):
    """Build a new raw-DEFLATE compressor for this instance.

    The compression level is tornado's ``GZIP_LEVEL``; the window size is
    ``self._max_wbits``, negated so that zlib emits a raw stream.

    :returns: a new zlib compression object.
    """
    level = tornado.web.GZipContentEncoding.GZIP_LEVEL
    method = zlib.DEFLATED
    window = -self._max_wbits
    return zlib.compressobj(level, method, window)