def compress (filename, input, output):
output.write('\037\213\010') # Write the header, ...
output.write(chr(FNAME)) # ... flag byte ...
statval = os.stat(filename) # ... modification time ...
mtime = statval[8]
write32(output, mtime)
output.write('\002') # ... slowest compression alg. ...
output.write('\377') # ... OS (=unknown) ...
output.write(filename+'\000') # ... original filename ...
crcval = zlib.crc32("")
compobj = zlib.compressobj(9, zlib.DEFLATED, -zlib.MAX_WBITS,
zlib.DEF_MEM_LEVEL, 0)
while True:
data = input.read(1024)
if data == "":
break
crcval = zlib.crc32(data, crcval)
output.write(compobj.compress(data))
output.write(compobj.flush())
write32(output, crcval) # ... the CRC ...
write32(output, statval[6]) # and the file size.
python类DEF_MEM_LEVEL的实例源码
def deflate(data, compresslevel=9):
# Compress
compress = zlib.compressobj(compresslevel, zlib.DEFLATED, -zlib.MAX_WBITS, zlib.DEF_MEM_LEVEL, 0)
deflated = compress.compress(data)
deflated += compress.flush()
# Add PDs compression magic and negated file length (8 bytes)
length = int(-len(deflated))
magic = bytearray(b'\xc5\xee\xf7\xff\x00\x00\x00\x00')
magic[4] = byte(length, 0)
magic[5] = byte(length >> 8, 0)
magic[6] = byte(length >> 0x10, 0)
magic[7] = byte(length >> 0x18, 0)
deflatedwithheader = bytes(magic) + deflated
finaldata = xor(deflatedwithheader)
return finaldata
# Decrypts the GT6TED, removes the magic and negated file length and
# decompresses the data
def compress_readable_output(src_file, compress_level=6):
crc = zlib.crc32(b"")
size = 0
zobj = zlib.compressobj(compress_level, zlib.DEFLATED, -zlib.MAX_WBITS,
zlib.DEF_MEM_LEVEL, zlib.Z_DEFAULT_STRATEGY)
prefix_written = False
while True:
data = src_file.read(DEFAULT_BUFFER_SIZE)
if not data:
break
size += len(data)
crc = zlib.crc32(data, crc)
data = zobj.compress(data)
if not prefix_written:
prefix_written = True
data = gzip_prefix() + data
yield data
yield zobj.flush() + struct.pack(b"<LL", crc & 0xffffffffL, size)
def __init__ (self, level = 5):
self.size = 0
self.crc = zlib.crc32(b"")
self.compressor = zlib.compressobj (level, zlib.DEFLATED, -zlib.MAX_WBITS, zlib.DEF_MEM_LEVEL, 0)
self.first_data = True
def gzip_app_iter(app_iter):
size = 0
crc = zlib.crc32(b"") & 0xffffffff
compress = zlib.compressobj(9, zlib.DEFLATED, -zlib.MAX_WBITS,
zlib.DEF_MEM_LEVEL, 0)
yield _gzip_header
for item in app_iter:
size += len(item)
crc = zlib.crc32(item, crc) & 0xffffffff
yield compress.compress(item)
yield compress.flush()
yield struct.pack("<2L", crc, size & 0xffffffff)
def gzip_app_iter(app_iter):
size = 0
crc = zlib.crc32(b"") & 0xffffffff
compress = zlib.compressobj(9, zlib.DEFLATED, -zlib.MAX_WBITS,
zlib.DEF_MEM_LEVEL, 0)
yield _gzip_header
for item in app_iter:
size += len(item)
crc = zlib.crc32(item, crc) & 0xffffffff
yield compress.compress(item)
yield compress.flush()
yield struct.pack("<2L", crc, size & 0xffffffff)
def __init__(self, filename, mode="rb", compresslevel=9):
# This lock must be recursive, so that BufferedIOBase's
# readline(), readlines() and writelines() don't deadlock.
self._lock = RLock()
self._fp = None
self._closefp = False
self._mode = _MODE_CLOSED
self._pos = 0
self._size = -1
if not isinstance(compresslevel, int) or not (1 <= compresslevel <= 9):
raise ValueError("'compresslevel' must be an integer "
"between 1 and 9. You provided 'compresslevel={}'"
.format(compresslevel))
if mode == "rb":
mode_code = _MODE_READ
self._decompressor = zlib.decompressobj(self.wbits)
self._buffer = b""
self._buffer_offset = 0
elif mode == "wb":
mode_code = _MODE_WRITE
self._compressor = zlib.compressobj(compresslevel,
zlib.DEFLATED,
self.wbits,
zlib.DEF_MEM_LEVEL,
0)
else:
raise ValueError("Invalid mode: %r" % (mode,))
if isinstance(filename, _basestring):
self._fp = io.open(filename, mode)
self._closefp = True
self._mode = mode_code
elif hasattr(filename, "read") or hasattr(filename, "write"):
self._fp = filename
self._mode = mode_code
else:
raise TypeError("filename must be a str or bytes object, "
"or a file")
def gzip_app_iter(app_iter):
size = 0
crc = zlib.crc32(b"") & 0xffffffff
compress = zlib.compressobj(9, zlib.DEFLATED, -zlib.MAX_WBITS,
zlib.DEF_MEM_LEVEL, 0)
yield _gzip_header
for item in app_iter:
size += len(item)
crc = zlib.crc32(item, crc) & 0xffffffff
yield compress.compress(item)
yield compress.flush()
yield struct.pack("<2L", crc, size & 0xffffffff)
def compress(body, compress_level):
"""Compress 'body' at the given compress_level."""
import zlib
# See http://www.gzip.org/zlib/rfc-gzip.html
yield ntob('\x1f\x8b') # ID1 and ID2: gzip marker
yield ntob('\x08') # CM: compression method
yield ntob('\x00') # FLG: none set
# MTIME: 4 bytes
yield struct.pack('<L', int(time.time()) & int('FFFFFFFF', 16))
yield ntob('\x02') # XFL: max compression, slowest algo
yield ntob('\xff') # OS: unknown
crc = zlib.crc32(ntob(''))
size = 0
zobj = zlib.compressobj(compress_level,
zlib.DEFLATED, -zlib.MAX_WBITS,
zlib.DEF_MEM_LEVEL, 0)
for line in body:
size += len(line)
crc = zlib.crc32(line, crc)
yield zobj.compress(line)
yield zobj.flush()
# CRC32: 4 bytes
yield struct.pack('<L', crc & int('FFFFFFFF', 16))
# ISIZE: 4 bytes
yield struct.pack('<L', size & int('FFFFFFFF', 16))
def _write_block(self, block):
# print("Saving %i bytes" % len(block))
assert len(block) <= 65536
# Giving a negative window bits means no gzip/zlib headers,
# -15 used in samtools
c = zlib.compressobj(self.compresslevel,
zlib.DEFLATED,
-15,
zlib.DEF_MEM_LEVEL,
0)
compressed = c.compress(block) + c.flush()
del c
assert len(compressed) < 65536, \
"TODO - Didn't compress enough, try less data in this block"
crc = zlib.crc32(block)
# Should cope with a mix of Python platforms...
if crc < 0:
crc = struct.pack("<i", crc)
else:
crc = struct.pack("<I", crc)
bsize = struct.pack("<H", len(compressed) + 25) # includes -1
crc = struct.pack("<I", zlib.crc32(block) & 0xffffffff)
uncompressed_length = struct.pack("<I", len(block))
# Fixed 16 bytes,
# gzip magic bytes (4) mod time (4),
# gzip flag (1), os (1), extra length which is six (2),
# sub field which is BC (2), sub field length of two (2),
# Variable data,
# 2 bytes: block length as BC sub field (2)
# X bytes: the data
# 8 bytes: crc (4), uncompressed data length (4)
data = _bgzf_header + bsize + compressed + crc + uncompressed_length
self._handle.write(data)
def gzip_app_iter(app_iter):
size = 0
crc = zlib.crc32(b"") & 0xffffffff
compress = zlib.compressobj(9, zlib.DEFLATED, -zlib.MAX_WBITS,
zlib.DEF_MEM_LEVEL, 0)
yield _gzip_header
for item in app_iter:
size += len(item)
crc = zlib.crc32(item, crc) & 0xffffffff
yield compress.compress(item)
yield compress.flush()
yield struct.pack("<2L", crc, size & 0xffffffff)
def gzip_app_iter(app_iter):
size = 0
crc = zlib.crc32(b"") & 0xffffffff
compress = zlib.compressobj(9, zlib.DEFLATED, -zlib.MAX_WBITS,
zlib.DEF_MEM_LEVEL, 0)
yield _gzip_header
for item in app_iter:
size += len(item)
crc = zlib.crc32(item, crc) & 0xffffffff
yield compress.compress(item)
yield compress.flush()
yield struct.pack("<2L", crc, size & 0xffffffff)
def compress(body, compress_level):
"""Compress 'body' at the given compress_level."""
import zlib
# See http://www.gzip.org/zlib/rfc-gzip.html
yield ntob('\x1f\x8b') # ID1 and ID2: gzip marker
yield ntob('\x08') # CM: compression method
yield ntob('\x00') # FLG: none set
# MTIME: 4 bytes
yield struct.pack("<L", int(time.time()) & int('FFFFFFFF', 16))
yield ntob('\x02') # XFL: max compression, slowest algo
yield ntob('\xff') # OS: unknown
crc = zlib.crc32(ntob(""))
size = 0
zobj = zlib.compressobj(compress_level,
zlib.DEFLATED, -zlib.MAX_WBITS,
zlib.DEF_MEM_LEVEL, 0)
for line in body:
size += len(line)
crc = zlib.crc32(line, crc)
yield zobj.compress(line)
yield zobj.flush()
# CRC32: 4 bytes
yield struct.pack("<L", crc & int('FFFFFFFF', 16))
# ISIZE: 4 bytes
yield struct.pack("<L", size & int('FFFFFFFF', 16))
def make_zerobin_payload(string_content):
compress = zlib.compressobj(
0, zlib.DEFLATED, -zlib.MAX_WBITS, zlib.DEF_MEM_LEVEL, 4)
compressed_data = compress.compress(bytes(mangle_string(string_content)))
compressed_data += compress.flush()
encoded_data = base64.b64encode(compressed_data)
key = os.urandom(32)
encoded_key = base64.urlsafe_b64encode(key)
encrypted_data = sjcl.SJCL().encrypt(encoded_data, encoded_key)
return encrypted_data, encoded_key
def filter(self, handler):
is_local_client = handler.client_address[0] in ('127.0.0.1', '::1')
pacfile = os.path.join(os.path.dirname(os.path.abspath(__file__)), common.PAC_FILE)
urlparts = urlparse.urlsplit(handler.path)
if handler.command == 'GET' and urlparts.path.lstrip('/') == common.PAC_FILE:
if urlparts.query == 'flush':
if is_local_client:
thread.start_new_thread(PacUtil.update_pacfile, (pacfile,))
else:
return 'mock', {'status': 403, 'headers': {'Content-Type': 'text/plain'}, 'body': 'client address %r not allowed' % handler.client_address[0]}
if time.time() - os.path.getmtime(pacfile) > common.PAC_EXPIRED:
# check system uptime > 30 minutes
uptime = get_uptime()
if uptime and uptime > 1800:
thread.start_new_thread(lambda: os.utime(pacfile, (time.time(), time.time())) or PacUtil.update_pacfile(pacfile), tuple())
with open(pacfile, 'rb') as fp:
content = fp.read()
if not is_local_client:
serving_addr = urlparts.hostname or ProxyUtil.get_listen_ip()
content = content.replace('127.0.0.1', serving_addr)
headers = {'Content-Type': 'text/plain'}
if 'gzip' in handler.headers.get('Accept-Encoding', ''):
headers['Content-Encoding'] = 'gzip'
compressobj = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -zlib.MAX_WBITS, zlib.DEF_MEM_LEVEL, 0)
dataio = io.BytesIO()
dataio.write('\x1f\x8b\x08\x00\x00\x00\x00\x00\x02\xff')
dataio.write(compressobj.compress(content))
dataio.write(compressobj.flush())
dataio.write(struct.pack('<LL', zlib.crc32(content) & 0xFFFFFFFFL, len(content) & 0xFFFFFFFFL))
content = dataio.getvalue()
return 'mock', {'status': 200, 'headers': headers, 'body': content}