def gzip_encode_add_padding(content):
"* Compressing content..."
num_chunks = len(content) / CHUNK_SIZE # let's not care about remainders
gzip_compress = zlib.compressobj(9, zlib.DEFLATED, zlib.MAX_WBITS | 16)
data = gzip_compress.compress(content)
comp_cnt = 0
replay = reorder(content[0:num_chunks*CHUNK_SIZE], arg_blocks)
assert(len(replay) % CHUNK_SIZE == 0)
num_chunks = len(replay) / CHUNK_SIZE # update the blocks
print "** Duplicating content (CBC attack)..."
data += gzip_compress.compress(replay) # duplicate cipher, should result in duplicate plaintext (prefixed by some garbage)
while comp_cnt < WRAP_SIZE-(num_chunks*CHUNK_SIZE+10*CHUNK_SIZE):
data += gzip_compress.compress("A"*CHUNK_SIZE)
comp_cnt += CHUNK_SIZE
print "** Copy original padding..."
data += gzip_compress.compress(content[len(content) - 10*CHUNK_SIZE:len(content)]) # copy valid PKCS7 padding
data = data + gzip_compress.flush()
print "*** Finished"
return data
python类DEFLATED的实例源码
def compress (filename, input, output):
output.write('\037\213\010') # Write the header, ...
output.write(chr(FNAME)) # ... flag byte ...
statval = os.stat(filename) # ... modification time ...
mtime = statval[8]
write32(output, mtime)
output.write('\002') # ... slowest compression alg. ...
output.write('\377') # ... OS (=unknown) ...
output.write(filename+'\000') # ... original filename ...
crcval = zlib.crc32("")
compobj = zlib.compressobj(9, zlib.DEFLATED, -zlib.MAX_WBITS,
zlib.DEF_MEM_LEVEL, 0)
while True:
data = input.read(1024)
if data == "":
break
crcval = zlib.crc32(data, crcval)
output.write(compobj.compress(data))
output.write(compobj.flush())
write32(output, crcval) # ... the CRC ...
write32(output, statval[6]) # and the file size.
def deflate(data, compresslevel=9):
# Compress
compress = zlib.compressobj(compresslevel, zlib.DEFLATED, -zlib.MAX_WBITS, zlib.DEF_MEM_LEVEL, 0)
deflated = compress.compress(data)
deflated += compress.flush()
# Add PDs compression magic and negated file length (8 bytes)
length = int(-len(deflated))
magic = bytearray(b'\xc5\xee\xf7\xff\x00\x00\x00\x00')
magic[4] = byte(length, 0)
magic[5] = byte(length >> 8, 0)
magic[6] = byte(length >> 0x10, 0)
magic[7] = byte(length >> 0x18, 0)
deflatedwithheader = bytes(magic) + deflated
finaldata = xor(deflatedwithheader)
return finaldata
# Decrypts the GT6TED, removes the magic and negated file length and
# decompresses the data
def compress_gzip(data):
compressed = None
gzip_compress = zlib.compressobj(9, zlib.DEFLATED, zlib.MAX_WBITS | 16)
try:
compressed = gzip_compress.compress(data) + gzip_compress.flush()
except Exception as e:
print_error('Error when compressing with Gzip: {0}'.format(e))
return compressed
# def decompress_gzip(data):
# decompressed=None
# try:
# decompressed = zlib.decompress(gzip_data, zlib.MAX_WBITS|16)
# except:
# pass
# return decompressed
def compress_readable_output(src_file, compress_level=6):
crc = zlib.crc32(b"")
size = 0
zobj = zlib.compressobj(compress_level, zlib.DEFLATED, -zlib.MAX_WBITS,
zlib.DEF_MEM_LEVEL, zlib.Z_DEFAULT_STRATEGY)
prefix_written = False
while True:
data = src_file.read(DEFAULT_BUFFER_SIZE)
if not data:
break
size += len(data)
crc = zlib.crc32(data, crc)
data = zobj.compress(data)
if not prefix_written:
prefix_written = True
data = gzip_prefix() + data
yield data
yield zobj.flush() + struct.pack(b"<LL", crc & 0xffffffffL, size)
def test_recording_gzipped_responses_as_text(vts_rec_on, httpserver):
data = "Hello!"
# http://stackoverflow.com/a/22310760
gzip_compressor = zlib.compressobj(9, zlib.DEFLATED, zlib.MAX_WBITS | 16)
gzipped = gzip_compressor.compress(data.encode()) + gzip_compressor.flush()
httpserver.serve_content(
gzipped, 200,
headers={"Content-Encoding": "gzip"})
url = "{}/".format(httpserver.url)
resp = requests.get(url)
assert resp.status_code == 200
assert resp.text == data
assert len(vts_rec_on.cassette) == 1
track = vts_rec_on.cassette[0]
assert track['request']['url'] == url
assert "Content-Encoding" in track['response']['headers']
assert track['response']['body'] == data
# enable pytester fixture which allows running pytests within tests
def start_compressing(self):
"""start_compressing()
Enable deflate compression on the socket (RFC 4978)."""
# rfc 1951 - pure DEFLATE, so use -15 for both windows
self.decompressor = zlib.decompressobj(-15)
self.compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -15)
def _create_compressor(self):
return zlib.compressobj(tornado.web.GZipContentEncoding.GZIP_LEVEL,
zlib.DEFLATED, -self._max_wbits)
def _create_compressor(self):
return zlib.compressobj(tornado.web.GZipContentEncoding.GZIP_LEVEL,
zlib.DEFLATED, -self._max_wbits)
def _create_compressor(self):
return zlib.compressobj(tornado.web.GZipContentEncoding.GZIP_LEVEL,
zlib.DEFLATED, -self._max_wbits)
def write_response(handler, code, headers, data=""):
handler.send_response(200)
for header in headers:
i = header.index(":")
s,e = header[:i], header[i+1:]
handler.send_header(s,e)
if data:
zlib_encode = zlib.compressobj(9, zlib.DEFLATED, zlib.MAX_WBITS | 16)
content = zlib_encode.compress(data) + zlib_encode.flush()
if len(content) < len(data):
handler.send_header('Content-Encoding', 'gzip')
handler.send_header('Content-Length', len(content))
else:
content = data
handler.end_headers()
handler.wfile.write(content)
else:
handler.wfile.write(data)
def __init__(self, im):
DefinitionTag.__init__(self)
self.tagtype = 36 # DefineBitsLossless2
# convert image (note that format is ARGB)
# even a grayscale image is stored in ARGB, nevertheless,
# the fabilous deflate compression will make it that not much
# more data is required for storing (25% or so, and less than 10%
# when storing RGB as ARGB).
if len(im.shape) == 3:
if im.shape[2] in [3, 4]:
tmp = np.ones((im.shape[0], im.shape[1], 4),
dtype=np.uint8) * 255
for i in range(3):
tmp[:, :, i + 1] = im[:, :, i]
if im.shape[2] == 4:
tmp[:, :, 0] = im[:, :, 3] # swap channel where alpha is
else: # pragma: no cover
raise ValueError("Invalid shape to be an image.")
elif len(im.shape) == 2:
tmp = np.ones((im.shape[0], im.shape[1], 4), dtype=np.uint8)*255
for i in range(3):
tmp[:, :, i + 1] = im[:, :]
else: # pragma: no cover
raise ValueError("Invalid shape to be an image.")
# we changed the image to uint8 4 channels.
# now compress!
self._data = zlib.compress(tmp.tostring(), zlib.DEFLATED)
self.imshape = im.shape
def __init__ (self, level = 5):
self.compressor = zlib.compressobj (5, zlib.DEFLATED)
def __init__ (self, level = 5):
self.size = 0
self.crc = zlib.crc32(b"")
self.compressor = zlib.compressobj (level, zlib.DEFLATED, -zlib.MAX_WBITS, zlib.DEF_MEM_LEVEL, 0)
self.first_data = True
def __init__ (self, producer, level=6):
self.producer = producer
self.compressor = zlib.compressobj (level, zlib.DEFLATED)
self.override ()
def __init__(self, out):
self.compressor = zlib.compressobj(9, zlib.DEFLATED, zlib.MAX_WBITS + 16)
self.out = out
def compress(buff):
buff = buff.encode('utf-8')
compressobj = zlib.compressobj(6, zlib.DEFLATED, zlib.MAX_WBITS + 16)
compressed = compressobj.compress(buff)
compressed += compressobj.flush()
return compressed
# plain "inflate"
def compress_alt(buff):
buff = buff.encode('utf-8')
compressobj = zlib.compressobj(6, zlib.DEFLATED)
compressed = compressobj.compress(buff)
compressed += compressobj.flush()
# drop gzip headers/tail
compressed = compressed[2:-4]
return compressed
# Brotli
def frame_outbound(self, proto, opcode, rsv, data, fin):
if not self._compressible_opcode(opcode):
return (rsv, data)
if opcode is not Opcode.CONTINUATION:
rsv = RsvBits(True, *rsv[1:])
if self._compressor is None:
assert opcode is not Opcode.CONTINUATION
if proto.client:
bits = self.client_max_window_bits
else:
bits = self.server_max_window_bits
self._compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION,
zlib.DEFLATED, -int(bits))
data = self._compressor.compress(bytes(data))
if fin:
data += self._compressor.flush(zlib.Z_SYNC_FLUSH)
data = data[:-4]
if proto.client:
no_context_takeover = self.client_no_context_takeover
else:
no_context_takeover = self.server_no_context_takeover
if no_context_takeover:
self._compressor = None
return (rsv, data)
def zlib_compressobj(level=6, method=zlib.DEFLATED, wbits=15, memlevel=8,
strategy=zlib.Z_DEFAULT_STRATEGY):
return zlib.compressobj(level, method, wbits, memlevel, strategy)
def gzip_app_iter(app_iter):
size = 0
crc = zlib.crc32(b"") & 0xffffffff
compress = zlib.compressobj(9, zlib.DEFLATED, -zlib.MAX_WBITS,
zlib.DEF_MEM_LEVEL, 0)
yield _gzip_header
for item in app_iter:
size += len(item)
crc = zlib.crc32(item, crc) & 0xffffffff
yield compress.compress(item)
yield compress.flush()
yield struct.pack("<2L", crc, size & 0xffffffff)
def zlib_compressobj(level=6, method=zlib.DEFLATED, wbits=15, memlevel=8,
strategy=zlib.Z_DEFAULT_STRATEGY):
return zlib.compressobj(level, method, wbits, memlevel, strategy)
def gzip_app_iter(app_iter):
size = 0
crc = zlib.crc32(b"") & 0xffffffff
compress = zlib.compressobj(9, zlib.DEFLATED, -zlib.MAX_WBITS,
zlib.DEF_MEM_LEVEL, 0)
yield _gzip_header
for item in app_iter:
size += len(item)
crc = zlib.crc32(item, crc) & 0xffffffff
yield compress.compress(item)
yield compress.flush()
yield struct.pack("<2L", crc, size & 0xffffffff)
def gzip_encode(content):
gzip_compress = zlib.compressobj(9, zlib.DEFLATED, zlib.MAX_WBITS | 16)
data = gzip_compress.compress(content) + gzip_compress.flush()
return data
def __init__(self):
self.compressor = zlib.compressobj(6, zlib.DEFLATED, -zlib.MAX_WBITS, 8)
websocket.py 文件源码
项目:My-Web-Server-Framework-With-Python2.7
作者: syjsu
项目源码
文件源码
阅读 25
收藏 0
点赞 0
评论 0
def _create_compressor(self):
return zlib.compressobj(tornado.web.GZipContentEncoding.GZIP_LEVEL,
zlib.DEFLATED, -self._max_wbits)
def gzipstream(data):
import zlib
gzip_compress = zlib.compressobj(9, zlib.DEFLATED, zlib.MAX_WBITS + 16) # compatible to Windows GZipStream
gzip_data = gzip_compress.compress(data) + gzip_compress.flush()
return gzip_data
def start_compressing(self):
"""start_compressing()
Enable deflate compression on the socket (RFC 4978)."""
# rfc 1951 - pure DEFLATE, so use -15 for both windows
self.decompressor = zlib.decompressobj(-15)
self.compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -15)
def __init__(self, filename, mode="rb", compresslevel=9):
# This lock must be recursive, so that BufferedIOBase's
# readline(), readlines() and writelines() don't deadlock.
self._lock = RLock()
self._fp = None
self._closefp = False
self._mode = _MODE_CLOSED
self._pos = 0
self._size = -1
if not isinstance(compresslevel, int) or not (1 <= compresslevel <= 9):
raise ValueError("'compresslevel' must be an integer "
"between 1 and 9. You provided 'compresslevel={}'"
.format(compresslevel))
if mode == "rb":
mode_code = _MODE_READ
self._decompressor = zlib.decompressobj(self.wbits)
self._buffer = b""
self._buffer_offset = 0
elif mode == "wb":
mode_code = _MODE_WRITE
self._compressor = zlib.compressobj(compresslevel,
zlib.DEFLATED,
self.wbits,
zlib.DEF_MEM_LEVEL,
0)
else:
raise ValueError("Invalid mode: %r" % (mode,))
if isinstance(filename, _basestring):
self._fp = io.open(filename, mode)
self._closefp = True
self._mode = mode_code
elif hasattr(filename, "read") or hasattr(filename, "write"):
self._fp = filename
self._mode = mode_code
else:
raise TypeError("filename must be a str or bytes object, "
"or a file")
def _create_compressor(self):
return zlib.compressobj(-1, zlib.DEFLATED, -self._max_wbits)