def set_internal(self, key, value):
# type: (str, bytes) -> None
value = zlib.compress(value, self.ZLIB_COMPRESSION_LEVEL)
self.kvstore.put(key, value)
python类compress()的实例源码
def encode_npz(subvol):
"""
This file format is unrelated to np.savez
We are just saving as .npy and the compressing
using zlib.
The .npy format contains metadata indicating
shape and dtype, instead of np.tobytes which doesn't
contain any metadata.
"""
fileobj = io.BytesIO()
if len(subvol.shape) == 3:
subvol = np.expand_dims(subvol, 0)
np.save(fileobj, subvol)
cdz = zlib.compress(fileobj.getvalue())
return cdz
def add_itxt(self, key, value, lang="", tkey="", zip=False):
"""Appends an iTXt chunk.
:param key: latin-1 encodable text key name
:param value: value for this key
:param lang: language code
:param tkey: UTF-8 version of the key name
:param zip: compression flag
"""
if not isinstance(key, bytes):
key = key.encode("latin-1", "strict")
if not isinstance(value, bytes):
value = value.encode("utf-8", "strict")
if not isinstance(lang, bytes):
lang = lang.encode("utf-8", "strict")
if not isinstance(tkey, bytes):
tkey = tkey.encode("utf-8", "strict")
if zip:
self.add(b"iTXt", key + b"\0\x01\0" + lang + b"\0" + tkey + b"\0" +
zlib.compress(value))
else:
self.add(b"iTXt", key + b"\0\0\0" + lang + b"\0" + tkey + b"\0" +
value)
def add_text(self, key, value, zip=0):
"""Appends a text chunk.
:param key: latin-1 encodable text key name
:param value: value for this key, text or an
:py:class:`PIL.PngImagePlugin.iTXt` instance
:param zip: compression flag
"""
if isinstance(value, iTXt):
return self.add_itxt(key, value, value.lang, value.tkey, bool(zip))
# The tEXt chunk stores latin-1 text
if not isinstance(value, bytes):
try:
value = value.encode('latin-1', 'strict')
except UnicodeError:
return self.add_itxt(key, value, zip=bool(zip))
if not isinstance(key, bytes):
key = key.encode('latin-1', 'strict')
if zip:
self.add(b"zTXt", key + b"\0\0" + zlib.compress(value))
else:
self.add(b"tEXt", key + b"\0" + value)
# --------------------------------------------------------------------
# PNG image stream (IHDR/IEND)
def compress(data, level=ZLIB_COMPRESSION_LEVEL):
""" Compress 'data' to Zlib format. If 'USE_ZOPFLI' variable is True,
zopfli is used instead of the zlib module.
The compression 'level' must be between 0 and 9. 1 gives best speed,
9 gives best compression (0 gives no compression at all).
The default value is a compromise between speed and compression (6).
"""
if not (0 <= level <= 9):
raise ValueError('Bad compression level: %s' % level)
if not USE_ZOPFLI or level == 0:
from zlib import compress
return compress(data, level)
else:
from zopfli.zlib import compress
return compress(data, numiterations=ZOPFLI_LEVELS[level])
def encodeData(self, data):
self.origLength = len(data)
if not self.uncompressed:
compressedData = compress(data, self.zlibCompressionLevel)
if self.uncompressed or len(compressedData) >= self.origLength:
# Encode uncompressed
rawData = data
self.length = self.origLength
else:
rawData = compressedData
self.length = len(rawData)
return rawData
def picklechops(chops):
"""Pickles and base64encodes it's argument chops"""
value = zlib.compress(dumps(chops))
encoded = base64.encodestring(value)
return encoded.strip()
def surfaceData(srf, compress=zlib.compress):
"Convert surface to bytes data with optional compression"
if not isinstance(srf, pygame.Surface): srf = srf.image
w, h = srf.get_size()
a = hasAlpha(srf)
mode = (1 if a else 0) + (2 if compress else 0)
mode = struct.pack("!3I", mode, w, h)
data = pygame.image.tostring(srf, "RGBA" if a else "RGB")
return (compress(data) if compress else data), mode
def add_itxt(self, key, value, lang="", tkey="", zip=False):
"""Appends an iTXt chunk.
:param key: latin-1 encodable text key name
:param value: value for this key
:param lang: language code
:param tkey: UTF-8 version of the key name
:param zip: compression flag
"""
if not isinstance(key, bytes):
key = key.encode("latin-1", "strict")
if not isinstance(value, bytes):
value = value.encode("utf-8", "strict")
if not isinstance(lang, bytes):
lang = lang.encode("utf-8", "strict")
if not isinstance(tkey, bytes):
tkey = tkey.encode("utf-8", "strict")
if zip:
self.add(b"iTXt", key + b"\0\x01\0" + lang + b"\0" + tkey + b"\0" +
zlib.compress(value))
else:
self.add(b"iTXt", key + b"\0\0\0" + lang + b"\0" + tkey + b"\0" +
value)
def add_text(self, key, value, zip=0):
"""Appends a text chunk.
:param key: latin-1 encodable text key name
:param value: value for this key, text or an
:py:class:`PIL.PngImagePlugin.iTXt` instance
:param zip: compression flag
"""
if isinstance(value, iTXt):
return self.add_itxt(key, value, value.lang, value.tkey, bool(zip))
# The tEXt chunk stores latin-1 text
if not isinstance(value, bytes):
try:
value = value.encode('latin-1', 'strict')
except UnicodeError:
return self.add_itxt(key, value, zip=bool(zip))
if not isinstance(key, bytes):
key = key.encode('latin-1', 'strict')
if zip:
self.add(b"zTXt", key + b"\0\0" + zlib.compress(value))
else:
self.add(b"tEXt", key + b"\0" + value)
# --------------------------------------------------------------------
# PNG image stream (IHDR/IEND)
def testExtraPixels(self):
"""Test file that contains too many pixels."""
def eachchunk(chunk):
if chunk[0] != 'IDAT':
return chunk
data = zlib.decompress(chunk[1])
data += strtobytes('\x00garbage')
data = zlib.compress(data)
chunk = (chunk[0], data)
return chunk
self.assertRaises(FormatError, self.helperFormat, eachchunk)
def testNotEnoughPixels(self):
def eachchunk(chunk):
if chunk[0] != 'IDAT':
return chunk
# Remove last byte.
data = zlib.decompress(chunk[1])
data = data[:-1]
data = zlib.compress(data)
return (chunk[0], data)
self.assertRaises(FormatError, self.helperFormat, eachchunk)
def testBadFilter(self):
def eachchunk(chunk):
if chunk[0] != 'IDAT':
return chunk
data = zlib.decompress(chunk[1])
# Corrupt the first filter byte
data = strtobytes('\x99') + data[1:]
data = zlib.compress(data)
return (chunk[0], data)
self.assertRaises(FormatError, self.helperFormat, eachchunk)
def dump_payload(self, obj):
json = super(URLSafeSerializerMixin, self).dump_payload(obj)
is_compressed = False
compressed = zlib.compress(json)
if len(compressed) < (len(json) - 1):
json = compressed
is_compressed = True
base64d = base64_encode(json)
if is_compressed:
base64d = b'.' + base64d
return base64d
def _save(path, obj):
"Save an object to the specified path."
data = zlib.compress(pickletools.optimize(pickle.dumps(obj)), 9)
with open(path, 'wb') as file:
file.write(data)
def quit_game(event=None):
# Save HST and quit program.
file(HST_FILE, 'wb').write(zlib.compress(repr(HS_database), 9))
root.quit()
################################################################################
# HST PREPARATION FUNCTIONS
def asciiCompress(data, level=9):
""" compress data to printable ascii-code """
code = zlib.compress(data,level)
csum = zlib.crc32(code)
code = base64.encodestring(code)
return code, csum
def bz2_pack(source):
"Returns 'source' as a bzip2-compressed, self-extracting python script."
import bz2, base64
out = ""
compressed_source = bz2.compress(source)
out += 'import bz2, base64\n'
out += "exec bz2.decompress(base64.b64decode('"
out += base64.b64encode((compressed_source))
out += "'))\n"
return out
def gz_pack(source):
"Returns 'source' as a gzip-compressed, self-extracting python script."
import zlib, base64
out = ""
compressed_source = zlib.compress(source)
out += 'import zlib, base64\n'
out += "exec zlib.decompress(base64.b64decode('"
out += base64.b64encode((compressed_source))
out += "'))\n"
return out
# The test.+() functions below are for testing pyminifer...
def load(cls, path):
# Loads programs and handles optimized files.
ws = path + '.ws'
cp = path + '.wso'
compiled = False
if os.path.isfile(cp):
compiled = True
if os.path.isfile(ws):
if os.path.getmtime(ws) > os.path.getmtime(cp):
compiled = False
final = cls._final()
cls._check(final)
if compiled:
try:
with open(cp, 'rb') as file:
code = file.read(len(final))
cls._check(code)
data = file.read()
return cls(pickle.loads(zlib.decompress(data)))
except:
pass
data = load(ws)
code = trinary(data)
program = parse(code)
serialized = pickle.dumps(program, pickle.HIGHEST_PROTOCOL)
optimized = zlib.compress(serialized, 9)
with open(cp, 'wb') as file:
file.write(final + optimized)
return cls(program)