def test_binary_string(self):
    """Check that zlib-compressed byte strings survive a cache round trip.

    The same compressed payload is written through ``set``, ``add`` and
    ``set_many`` (each under its own key); every stored value must come
    back byte-identical and still decompress to the original text.
    """
    from zlib import compress, decompress

    cache = self.cache
    value = 'value_to_be_compressed'
    compressed_value = compress(value.encode())

    # One (key, writer) pair per cache API under test, in the original order.
    writers = (
        ('binary1', lambda key: cache.set(key, compressed_value)),
        ('binary1-add', lambda key: cache.add(key, compressed_value)),
        ('binary1-set_many', lambda key: cache.set_many({key: compressed_value})),
    )
    for key, store in writers:
        store(key)
        fetched = cache.get(key)
        self.assertEqual(compressed_value, fetched)
        self.assertEqual(value, decompress(fetched).decode())
python类decompress()的实例源码
def _decompressContent(response, new_content):
    """Decode a gzip- or deflate-encoded body as announced by ``response``.

    ``response`` is read and mutated like a dict of headers. When the
    ``content-encoding`` is ``gzip`` or ``deflate`` the body is decoded,
    ``content-length`` is rewritten to the decoded length and the encoding
    is moved from ``content-encoding`` to ``-content-encoding``. Any other
    (or missing) encoding returns ``new_content`` unchanged.

    Raises:
        FailedToDecompressContent: the body could not be decoded.
    """
    content = new_content
    try:
        encoding = response.get('content-encoding', None)
        if encoding in ['gzip', 'deflate']:
            if encoding == 'gzip':
                content = gzip.GzipFile(fileobj=StringIO.StringIO(new_content)).read()
            if encoding == 'deflate':
                try:
                    content = zlib.decompress(content)
                except zlib.error:
                    # Fall back to a headerless (raw) deflate stream.
                    content = zlib.decompress(content, -zlib.MAX_WBITS)
            response['content-length'] = str(len(content))
            # Record the historical presence of the encoding in a way that won't interfere.
            response['-content-encoding'] = response['content-encoding']
            del response['content-encoding']
    except (IOError, zlib.error):
        # zlib.error does not derive from IOError, so it is listed explicitly;
        # otherwise corrupt deflate data would escape as a raw zlib.error.
        content = ""
        raise FailedToDecompressContent(_("Content purported to be compressed with %s but failed to decompress.") % response.get('content-encoding'), response, content)
    return content
def _loads_v2(self, request, data):
    """Rebuild a cached entry from a zlib-compressed, UTF-8 JSON payload.

    Returns None when ``data`` cannot be decompressed, decoded as UTF-8 or
    parsed as JSON. Otherwise the base64-encoded parts of the entry (body,
    headers, reason, vary values) are decoded in place and the result of
    ``self.prepare_response(request, cached)`` is returned.
    """
    try:
        cached = json.loads(zlib.decompress(data).decode("utf8"))
    except (ValueError, zlib.error):
        # zlib.error is not a ValueError; without listing it, corrupt data
        # would raise instead of returning None like other malformed input.
        return

    response = cached["response"]
    # We need to decode the items that we've base64 encoded
    response["body"] = _b64_decode_bytes(response["body"])
    response["headers"] = {
        _b64_decode_str(k): _b64_decode_str(v)
        for k, v in response["headers"].items()
    }
    response["reason"] = _b64_decode_str(response["reason"])
    # Vary values may be None and are passed through untouched in that case.
    cached["vary"] = {
        _b64_decode_str(k): _b64_decode_str(v) if v is not None else v
        for k, v in cached["vary"].items()
    }
    return self.prepare_response(request, cached)
def load_payload(self, payload):
    """Decode a URL-safe payload and pass it to the parent ``load_payload``.

    A leading ``b'.'`` marks the payload as zlib-compressed: the marker is
    stripped, the rest is base64-decoded and then decompressed. Any failure
    in either step is re-raised as ``BadPayload`` carrying the original
    exception.
    """
    is_compressed = payload.startswith(b'.')
    if is_compressed:
        payload = payload[1:]

    try:
        raw = base64_decode(payload)
    except Exception as e:
        raise BadPayload('Could not base64 decode the payload because of '
                         'an exception', original_error=e)

    if is_compressed:
        try:
            raw = zlib.decompress(raw)
        except Exception as e:
            raise BadPayload('Could not zlib decompress the payload before '
                             'decoding the payload', original_error=e)

    return super(URLSafeSerializerMixin, self).load_payload(raw)
def _loads_v2(self, request, data):
    """Rebuild a cached entry from a zlib-compressed, UTF-8 JSON payload.

    Returns None when ``data`` cannot be decompressed, decoded as UTF-8 or
    parsed as JSON. Otherwise the base64-encoded parts of the entry (body,
    headers, reason, vary values) are decoded in place and the result of
    ``self.prepare_response(request, cached)`` is returned.
    """
    try:
        cached = json.loads(zlib.decompress(data).decode("utf8"))
    except (ValueError, zlib.error):
        # zlib.error is not a ValueError; without listing it, corrupt data
        # would raise instead of returning None like other malformed input.
        return

    response = cached["response"]
    # We need to decode the items that we've base64 encoded
    response["body"] = _b64_decode_bytes(response["body"])
    response["headers"] = {
        _b64_decode_str(k): _b64_decode_str(v)
        for k, v in response["headers"].items()
    }
    response["reason"] = _b64_decode_str(response["reason"])
    # Vary values may be None and are passed through untouched in that case.
    cached["vary"] = {
        _b64_decode_str(k): _b64_decode_str(v) if v is not None else v
        for k, v in cached["vary"].items()
    }
    return self.prepare_response(request, cached)
def load_payload(self, payload):
    """Turn a (possibly compressed) base64 payload into the parent's result.

    Payloads that start with ``b'.'`` are zlib-compressed; the dot is
    removed before base64 decoding and the decoded bytes are decompressed
    afterwards. Decoding or decompression errors surface as ``BadPayload``
    with the underlying exception attached.
    """
    compressed = payload.startswith(b'.')
    encoded = payload[1:] if compressed else payload

    try:
        decoded = base64_decode(encoded)
    except Exception as e:
        raise BadPayload('Could not base64 decode the payload because of '
                         'an exception', original_error=e)

    if compressed:
        try:
            decoded = zlib.decompress(decoded)
        except Exception as e:
            raise BadPayload('Could not zlib decompress the payload before '
                             'decoding the payload', original_error=e)

    return super(URLSafeSerializerMixin, self).load_payload(decoded)
def client_post_decrypt(self, buf):
    """Reassemble and decompress length-prefixed frames from ``buf``.

    Incoming bytes are appended to ``self.recv_buf``. Each frame starts
    with a 2-byte big-endian length that counts the whole frame (prefix
    included); the remainder is a zlib stream whose 2-byte header was
    stripped, so ``b'x\\x9c'`` is put back before decompressing. Incomplete
    frames stay buffered for the next call.

    A length of 32768 or more switches the connection to raw pass-through
    (``self.raw_trans``): None is returned if no packet was decoded yet,
    otherwise an Exception is raised. Once in raw mode ``buf`` is returned
    untouched.
    """
    if self.raw_trans:
        return buf

    self.recv_buf += buf
    pieces = []
    while len(self.recv_buf) > 2:
        (frame_len,) = struct.unpack('>H', self.recv_buf[:2])
        if frame_len >= 32768:
            self.raw_trans = True
            self.recv_buf = b''
            if self.decrypt_packet_num == 0:
                return None
            raise Exception('server_post_decrype data error')
        if frame_len > len(self.recv_buf):
            # The frame has not fully arrived yet.
            break
        pieces.append(zlib.decompress(b'x\x9c' + self.recv_buf[2:frame_len]))
        self.recv_buf = self.recv_buf[frame_len:]

    out_buf = b''.join(pieces)
    if out_buf:
        self.decrypt_packet_num += 1
    return out_buf
def server_post_decrypt(self, buf):
    """Reassemble and decompress length-prefixed frames from ``buf``.

    Bytes are accumulated in ``self.recv_buf``. A frame is a 2-byte
    big-endian total length (prefix included) followed by a zlib stream
    without its 2-byte header; the header ``b'\\x78\\x9c'`` is restored
    before decompression. Partial frames remain buffered.

    A length of 32768 or more flips ``self.raw_trans`` on and clears the
    buffer: None is returned when no packet has been decoded so far,
    otherwise an Exception is raised. In raw mode ``buf`` is returned as is.
    """
    if self.raw_trans:
        return buf

    self.recv_buf += buf
    decoded = []
    while len(self.recv_buf) > 2:
        (total_len,) = struct.unpack('>H', self.recv_buf[:2])
        if total_len >= 32768:
            self.raw_trans = True
            self.recv_buf = b''
            if self.decrypt_packet_num == 0:
                return None
            raise Exception('server_post_decrype data error')
        if total_len > len(self.recv_buf):
            # Wait for the rest of this frame.
            break
        compressed = self.recv_buf[2:total_len]
        decoded.append(zlib.decompress(b'\x78\x9c' + compressed))
        self.recv_buf = self.recv_buf[total_len:]

    out_buf = b''.join(decoded)
    if out_buf:
        self.decrypt_packet_num += 1
    return out_buf
def compressedField(field):
    """Build accessors that keep attribute ``field`` zlib-compressed.

    Returns a dict with the keys ``doc``, ``fget``, ``fset`` and ``fdel``,
    which match the keyword arguments of the built-in ``property``
    (presumably used as ``property(**compressedField(name))`` -- confirm
    against callers).

    * ``fget`` returns the decompressed bytes, or None when nothing is stored.
    * ``fset`` encodes the given text and stores it compressed; None is
      stored as None so the value can be cleared, mirroring ``fget``.
    * ``fdel`` deletes the underlying attribute.
    """
    def fget(self):
        data = getattr(self, field)
        if data is None:
            return None
        return zlib.decompress(data)

    def fset(self, value):
        if value is None:
            # fget treats None as "no value"; allow writing it back as well
            # instead of failing on None.encode().
            setattr(self, field, None)
            return
        setattr(self, field, zlib.compress(value.encode()))

    def fdel(self):
        delattr(self, field)

    return {'doc': "The compression property for %s." % field,
            'fget': fget,
            'fset': fset,
            'fdel': fdel}
def _decompressContent(response, new_content):
    """Decode a gzip- or deflate-encoded body as announced by ``response``.

    ``response`` is read and mutated like a dict of headers. When the
    ``content-encoding`` is ``gzip`` or ``deflate`` the body is decoded,
    ``content-length`` is rewritten to the decoded length and the encoding
    is moved from ``content-encoding`` to ``-content-encoding``. Any other
    (or missing) encoding returns ``new_content`` unchanged.

    Raises:
        FailedToDecompressContent: the body could not be decoded.
    """
    content = new_content
    try:
        encoding = response.get('content-encoding', None)
        if encoding in ['gzip', 'deflate']:
            if encoding == 'gzip':
                content = gzip.GzipFile(fileobj=StringIO.StringIO(new_content)).read()
            if encoding == 'deflate':
                try:
                    # Headerless (raw) deflate stream.
                    content = zlib.decompress(content, -zlib.MAX_WBITS)
                except zlib.error:
                    # Fall back to a deflate stream wrapped in a zlib header.
                    content = zlib.decompress(content)
            response['content-length'] = str(len(content))
            # Record the historical presence of the encoding in a way that won't interfere.
            response['-content-encoding'] = response['content-encoding']
            del response['content-encoding']
    except (IOError, zlib.error):
        # zlib.error does not derive from IOError, so it is listed explicitly;
        # otherwise corrupt deflate data would escape as a raw zlib.error.
        content = ""
        raise FailedToDecompressContent(_("Content purported to be compressed with %s but failed to decompress.") % response.get('content-encoding'), response, content)
    return content
def _decompressContent(response, new_content):
    """Decode a gzip- or deflate-encoded body as announced by ``response``.

    ``response`` is read and mutated like a dict of headers. When the
    ``content-encoding`` is ``gzip`` or ``deflate`` the body is decoded,
    ``content-length`` is rewritten to the decoded length and the encoding
    is moved from ``content-encoding`` to ``-content-encoding``. Any other
    (or missing) encoding returns ``new_content`` unchanged.

    Raises:
        FailedToDecompressContent: the body could not be decoded.
    """
    content = new_content
    try:
        encoding = response.get('content-encoding', None)
        if encoding in ['gzip', 'deflate']:
            if encoding == 'gzip':
                content = gzip.GzipFile(fileobj=io.BytesIO(new_content)).read()
            if encoding == 'deflate':
                try:
                    # Headerless (raw) deflate stream.
                    content = zlib.decompress(content, -zlib.MAX_WBITS)
                except zlib.error:
                    # Fall back to a deflate stream wrapped in a zlib header.
                    content = zlib.decompress(content)
            response['content-length'] = str(len(content))
            # Record the historical presence of the encoding in a way that won't interfere.
            response['-content-encoding'] = response['content-encoding']
            del response['content-encoding']
    except (IOError, zlib.error):
        # zlib.error does not derive from IOError, so it is listed explicitly;
        # otherwise corrupt deflate data would escape as a raw zlib.error.
        content = ""
        raise FailedToDecompressContent(_("Content purported to be compressed with %s but failed to decompress.") % response.get('content-encoding'), response, content)
    return content
def _loads_v2(self, request, data):
    """Rebuild a cached entry from a zlib-compressed, UTF-8 JSON payload.

    Returns None when ``data`` cannot be decompressed, decoded as UTF-8 or
    parsed as JSON. Otherwise the base64-encoded parts of the entry (body,
    headers, reason, vary values) are decoded in place and the result of
    ``self.prepare_response(request, cached)`` is returned.
    """
    try:
        cached = json.loads(zlib.decompress(data).decode("utf8"))
    except (ValueError, zlib.error):
        # zlib.error is not a ValueError; without listing it, corrupt data
        # would raise instead of returning None like other malformed input.
        return

    response = cached["response"]
    # We need to decode the items that we've base64 encoded
    response["body"] = _b64_decode_bytes(response["body"])
    response["headers"] = {
        _b64_decode_str(k): _b64_decode_str(v)
        for k, v in response["headers"].items()
    }
    response["reason"] = _b64_decode_str(response["reason"])
    # Vary values may be None and are passed through untouched in that case.
    cached["vary"] = {
        _b64_decode_str(k): _b64_decode_str(v) if v is not None else v
        for k, v in cached["vary"].items()
    }
    return self.prepare_response(request, cached)
def loads(s, key=None, salt='django.core.signing', serializer=JSONSerializer, max_age=None):
    """
    Reverse of dumps(): verify the signature on ``s`` and deserialize it.

    Raises BadSignature if the signature fails. A payload prefixed with
    ``b'.'`` is treated as zlib-compressed and is inflated after base64
    decoding. The serializer is expected to accept a bytestring.
    """
    signer = TimestampSigner(key, salt=salt)
    # TimestampSigner.unsign always returns unicode but base64 and zlib
    # compression operate on bytes.
    payload = force_bytes(signer.unsign(s, max_age=max_age))

    is_compressed = payload[:1] == b'.'
    if is_compressed:
        # Drop the compression marker before decoding.
        payload = payload[1:]

    data = b64_decode(payload)
    if is_compressed:
        data = zlib.decompress(data)
    return serializer().loads(data)
def _decompressContent(response, new_content):
    """Decode a gzip- or deflate-encoded body as announced by ``response``.

    ``response`` is read and mutated like a dict of headers. When the
    ``content-encoding`` is ``gzip`` or ``deflate`` the body is decoded,
    ``content-length`` is rewritten to the decoded length and the encoding
    is moved from ``content-encoding`` to ``-content-encoding``. Any other
    (or missing) encoding returns ``new_content`` unchanged.

    Raises:
        FailedToDecompressContent: the body could not be decoded.
    """
    content = new_content
    try:
        encoding = response.get('content-encoding', None)
        if encoding in ['gzip', 'deflate']:
            if encoding == 'gzip':
                content = gzip.GzipFile(fileobj=StringIO.StringIO(new_content)).read()
            if encoding == 'deflate':
                try:
                    content = zlib.decompress(content)
                except zlib.error:
                    # Fall back to a headerless (raw) deflate stream.
                    content = zlib.decompress(content, -zlib.MAX_WBITS)
            response['content-length'] = str(len(content))
            # Record the historical presence of the encoding in a way that won't interfere.
            response['-content-encoding'] = response['content-encoding']
            del response['content-encoding']
    except (IOError, zlib.error):
        # zlib.error does not derive from IOError, so it is listed explicitly;
        # otherwise corrupt deflate data would escape as a raw zlib.error.
        content = ""
        raise FailedToDecompressContent(_("Content purported to be compressed with %s but failed to decompress.") % response.get('content-encoding'), response, content)
    return content
def load_payload(self, payload):
    """Decode a URL-safe payload and pass it to the parent ``load_payload``.

    A leading ``b'.'`` marks the payload as zlib-compressed: the marker is
    stripped, the rest is base64-decoded and then decompressed. Any failure
    in either step is re-raised as ``BadPayload`` carrying the original
    exception.
    """
    is_compressed = payload.startswith(b'.')
    if is_compressed:
        payload = payload[1:]

    try:
        raw = base64_decode(payload)
    except Exception as e:
        raise BadPayload('Could not base64 decode the payload because of '
                         'an exception', original_error=e)

    if is_compressed:
        try:
            raw = zlib.decompress(raw)
        except Exception as e:
            raise BadPayload('Could not zlib decompress the payload before '
                             'decoding the payload', original_error=e)

    return super(URLSafeSerializerMixin, self).load_payload(raw)
def _decompressContent(response, new_content):
    """Decode a gzip- or deflate-encoded body as announced by ``response``.

    ``response`` is read and mutated like a dict of headers. When the
    ``content-encoding`` is ``gzip`` or ``deflate`` the body is decoded,
    ``content-length`` is rewritten to the decoded length and the encoding
    is moved from ``content-encoding`` to ``-content-encoding``. Any other
    (or missing) encoding returns ``new_content`` unchanged.

    Raises:
        FailedToDecompressContent: the body could not be decoded.
    """
    content = new_content
    try:
        encoding = response.get('content-encoding', None)
        if encoding in ['gzip', 'deflate']:
            if encoding == 'gzip':
                content = gzip.GzipFile(fileobj=StringIO.StringIO(new_content)).read()
            if encoding == 'deflate':
                try:
                    content = zlib.decompress(content)
                except zlib.error:
                    # Fall back to a headerless (raw) deflate stream.
                    content = zlib.decompress(content, -zlib.MAX_WBITS)
            response['content-length'] = str(len(content))
            # Record the historical presence of the encoding in a way that won't interfere.
            response['-content-encoding'] = response['content-encoding']
            del response['content-encoding']
    except (IOError, zlib.error):
        # zlib.error does not derive from IOError, so it is listed explicitly;
        # otherwise corrupt deflate data would escape as a raw zlib.error.
        content = ""
        raise FailedToDecompressContent(_("Content purported to be compressed with %s but failed to decompress.") % response.get('content-encoding'), response, content)
    return content
def run(self):
    """Accept one client and display the frames it streams.

    Binds and listens on ``self.sock`` / ``self.ADDR``, accepts a single
    connection and then repeatedly reads messages made of a native
    ``struct`` "L" size prefix followed by that many bytes of
    zlib-compressed, pickled frame data. Each frame is shown in the
    'Remote' OpenCV window. The loop ends when ESC (key code 27) is pressed
    or when the peer closes the connection; the accepted connection is
    closed on exit.
    """
    print("VEDIO server starts...")
    self.sock.bind(self.ADDR)
    self.sock.listen(1)
    conn, _ = self.sock.accept()
    print("remote VEDIO client success connected...")
    payload_size = struct.calcsize("L")
    cv2.namedWindow('Remote', cv2.WINDOW_NORMAL)

    def read_exact(buffered, wanted):
        # Return (chunk, remainder) once `wanted` bytes are available, or
        # None when the peer closed the connection first. recv() returning
        # b'' means EOF; without this check the loop would spin forever.
        while len(buffered) < wanted:
            received = conn.recv(81920)
            if not received:
                return None
            buffered += received
        return buffered[:wanted], buffered[wanted:]

    data = b""
    try:
        while True:
            header = read_exact(data, payload_size)
            if header is None:
                break
            packed_size, data = header
            msg_size = struct.unpack("L", packed_size)[0]

            message = read_exact(data, msg_size)
            if message is None:
                break
            zframe_data, data = message

            frame_data = zlib.decompress(zframe_data)
            # SECURITY: pickle.loads on bytes received from the network can
            # execute arbitrary code; only use this with a trusted peer.
            frame = pickle.loads(frame_data)
            cv2.imshow('Remote', frame)
            if cv2.waitKey(1) & 0xFF == 27:
                break
    finally:
        conn.close()
def secure_loads(data, encryption_key, hash_key=None, compression_level=None):
    """Verify, decrypt and unpickle a ``version:signature:payload`` token.

    Tokens with exactly one ``b':'`` are delegated to
    ``secure_loads_deprecated``. Otherwise the token must have exactly two
    separators and the version ``b'hmac256'``. The HMAC-SHA256 of the
    payload (keyed with ``hash_key``, or the SHA-256 digest of the
    encryption key when no hash key is given) must match the signature
    before anything is decrypted. The payload is URL-safe base64; its first
    16 bytes are the IV. When ``compression_level`` is truthy the decrypted
    data is zlib-decompressed before unpickling.

    Returns the unpickled object, or None for any malformed, unverifiable
    or undecodable token.
    """
    separator_count = data.count(b':')
    if separator_count == 1:
        return secure_loads_deprecated(data, encryption_key, hash_key, compression_level)
    if separator_count != 2:
        return None

    version, signature, encoded = data.split(b':', 2)
    if version != b'hmac256':
        return None

    encryption_key = to_bytes(encryption_key)
    if not hash_key:
        hash_key = hashlib.sha256(encryption_key).digest()
    expected_signature = hmac.new(to_bytes(hash_key), encoded, hashlib.sha256).hexdigest()
    if not compare(to_native(signature), expected_signature):
        return None

    blob = base64.urlsafe_b64decode(encoded)
    IV = blob[:16]
    ciphertext = blob[16:]
    cipher, _ = AES_new(pad(encryption_key)[:32], IV=IV)
    try:
        plaintext = unpad(AES_dec(cipher, ciphertext))
        if compression_level:
            plaintext = zlib.decompress(plaintext)
        return pickle.loads(plaintext)
    except Exception:
        # Any decryption, decompression or unpickling failure yields None.
        return None
def load_payload(self, payload):
    """Turn a (possibly compressed) base64 payload into the parent's result.

    Payloads that start with ``b'.'`` are zlib-compressed; the dot is
    removed before base64 decoding and the decoded bytes are decompressed
    afterwards. Decoding or decompression errors surface as ``BadPayload``
    with the underlying exception attached.
    """
    compressed = payload.startswith(b'.')
    encoded = payload[1:] if compressed else payload

    try:
        decoded = base64_decode(encoded)
    except Exception as e:
        raise BadPayload('Could not base64 decode the payload because of '
                         'an exception', original_error=e)

    if compressed:
        try:
            decoded = zlib.decompress(decoded)
        except Exception as e:
            raise BadPayload('Could not zlib decompress the payload before '
                             'decoding the payload', original_error=e)

    return super(URLSafeSerializerMixin, self).load_payload(decoded)
def load_payload(self, payload):
    """Decode a URL-safe payload and pass it to the parent ``load_payload``.

    A leading ``b'.'`` marks the payload as zlib-compressed: the marker is
    stripped, the rest is base64-decoded and then decompressed. Any failure
    in either step is re-raised as ``BadPayload`` carrying the original
    exception.
    """
    is_compressed = payload.startswith(b'.')
    if is_compressed:
        payload = payload[1:]

    try:
        raw = base64_decode(payload)
    except Exception as e:
        raise BadPayload('Could not base64 decode the payload because of '
                         'an exception', original_error=e)

    if is_compressed:
        try:
            raw = zlib.decompress(raw)
        except Exception as e:
            raise BadPayload('Could not zlib decompress the payload before '
                             'decoding the payload', original_error=e)

    return super(URLSafeSerializerMixin, self).load_payload(raw)