python类decompress()的实例源码

tests.py 文件源码 项目:django-lrucache-backend 作者: kogan 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_binary_string(self):
        # Binary strings should be cacheable
        cache = self.cache
        from zlib import compress, decompress
        value = 'value_to_be_compressed'
        compressed_value = compress(value.encode())

        # Test set
        cache.set('binary1', compressed_value)
        compressed_result = cache.get('binary1')
        self.assertEqual(compressed_value, compressed_result)
        self.assertEqual(value, decompress(compressed_result).decode())

        # Test add
        cache.add('binary1-add', compressed_value)
        compressed_result = cache.get('binary1-add')
        self.assertEqual(compressed_value, compressed_result)
        self.assertEqual(value, decompress(compressed_result).decode())

        # Test set_many
        cache.set_many({'binary1-set_many': compressed_value})
        compressed_result = cache.get('binary1-set_many')
        self.assertEqual(compressed_value, compressed_result)
        self.assertEqual(value, decompress(compressed_result).decode())
__init__.py 文件源码 项目:GAMADV-XTD 作者: taers232c 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def _decompressContent(response, new_content):
    content = new_content
    try:
        encoding = response.get('content-encoding', None)
        if encoding in ['gzip', 'deflate']:
            if encoding == 'gzip':
                content = gzip.GzipFile(fileobj=StringIO.StringIO(new_content)).read()
            if encoding == 'deflate':
                content = zlib.decompress(content)
            response['content-length'] = str(len(content))
            # Record the historical presence of the encoding in a way the won't interfere.
            response['-content-encoding'] = response['content-encoding']
            del response['content-encoding']
    except IOError:
        content = ""
        raise FailedToDecompressContent(_("Content purported to be compressed with %s but failed to decompress.") % response.get('content-encoding'), response, content)
    return content
serialize.py 文件源码 项目:Sci-Finder 作者: snverse 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def _loads_v2(self, request, data):
        try:
            cached = json.loads(zlib.decompress(data).decode("utf8"))
        except ValueError:
            return

        # We need to decode the items that we've base64 encoded
        cached["response"]["body"] = _b64_decode_bytes(
            cached["response"]["body"]
        )
        cached["response"]["headers"] = dict(
            (_b64_decode_str(k), _b64_decode_str(v))
            for k, v in cached["response"]["headers"].items()
        )
        cached["response"]["reason"] = _b64_decode_str(
            cached["response"]["reason"],
        )
        cached["vary"] = dict(
            (_b64_decode_str(k), _b64_decode_str(v) if v is not None else v)
            for k, v in cached["vary"].items()
        )

        return self.prepare_response(request, cached)
itsdangerous.py 文件源码 项目:Sci-Finder 作者: snverse 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def load_payload(self, payload):
        decompress = False
        if payload.startswith(b'.'):
            payload = payload[1:]
            decompress = True
        try:
            json = base64_decode(payload)
        except Exception as e:
            raise BadPayload('Could not base64 decode the payload because of '
                'an exception', original_error=e)
        if decompress:
            try:
                json = zlib.decompress(json)
            except Exception as e:
                raise BadPayload('Could not zlib decompress the payload before '
                    'decoding the payload', original_error=e)
        return super(URLSafeSerializerMixin, self).load_payload(json)
serialize.py 文件源码 项目:Sci-Finder 作者: snverse 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def _loads_v2(self, request, data):
        try:
            cached = json.loads(zlib.decompress(data).decode("utf8"))
        except ValueError:
            return

        # We need to decode the items that we've base64 encoded
        cached["response"]["body"] = _b64_decode_bytes(
            cached["response"]["body"]
        )
        cached["response"]["headers"] = dict(
            (_b64_decode_str(k), _b64_decode_str(v))
            for k, v in cached["response"]["headers"].items()
        )
        cached["response"]["reason"] = _b64_decode_str(
            cached["response"]["reason"],
        )
        cached["vary"] = dict(
            (_b64_decode_str(k), _b64_decode_str(v) if v is not None else v)
            for k, v in cached["vary"].items()
        )

        return self.prepare_response(request, cached)
itsdangerous.py 文件源码 项目:Sci-Finder 作者: snverse 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def load_payload(self, payload):
        decompress = False
        if payload.startswith(b'.'):
            payload = payload[1:]
            decompress = True
        try:
            json = base64_decode(payload)
        except Exception as e:
            raise BadPayload('Could not base64 decode the payload because of '
                'an exception', original_error=e)
        if decompress:
            try:
                json = zlib.decompress(json)
            except Exception as e:
                raise BadPayload('Could not zlib decompress the payload before '
                    'decoding the payload', original_error=e)
        return super(URLSafeSerializerMixin, self).load_payload(json)
verify_simple.py 文件源码 项目:shadowsocksr 作者: shadowsocksr-backup 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def client_post_decrypt(self, buf):
        if self.raw_trans:
            return buf
        self.recv_buf += buf
        out_buf = b''
        while len(self.recv_buf) > 2:
            length = struct.unpack('>H', self.recv_buf[:2])[0]
            if length >= 32768:
                self.raw_trans = True
                self.recv_buf = b''
                if self.decrypt_packet_num == 0:
                    return None
                else:
                    raise Exception('server_post_decrype data error')
            if length > len(self.recv_buf):
                break

            out_buf += zlib.decompress(b'x\x9c' + self.recv_buf[2:length])
            self.recv_buf = self.recv_buf[length:]

        if out_buf:
            self.decrypt_packet_num += 1
        return out_buf
verify_simple.py 文件源码 项目:shadowsocksr 作者: shadowsocksr-backup 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def server_post_decrypt(self, buf):
        if self.raw_trans:
            return buf
        self.recv_buf += buf
        out_buf = b''
        while len(self.recv_buf) > 2:
            length = struct.unpack('>H', self.recv_buf[:2])[0]
            if length >= 32768:
                self.raw_trans = True
                self.recv_buf = b''
                if self.decrypt_packet_num == 0:
                    return None
                else:
                    raise Exception('server_post_decrype data error')
            if length > len(self.recv_buf):
                break

            out_buf += zlib.decompress(b'\x78\x9c' + self.recv_buf[2:length])
            self.recv_buf = self.recv_buf[length:]

        if out_buf:
            self.decrypt_packet_num += 1
        return out_buf
utilities.py 文件源码 项目:CoBL-public 作者: lingdb 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def compressedField(field):
    # Decorator for compressed fields:

    def fget(self):
        data = getattr(self, field)
        if data is None:
            return None
        return zlib.decompress(data)

    def fset(self, value):
        setattr(self, field, zlib.compress(value.encode()))

    def fdel(self):
        delattr(self, field)
    return {'doc': "The compression property for %s." % field,
            'fget': fget,
            'fset': fset,
            'fdel': fdel}
__init__.py 文件源码 项目:httplib2 作者: httplib2 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def _decompressContent(response, new_content):
    content = new_content
    try:
        encoding = response.get('content-encoding', None)
        if encoding in ['gzip', 'deflate']:
            if encoding == 'gzip':
                content = gzip.GzipFile(fileobj=StringIO.StringIO(new_content)).read()
            if encoding == 'deflate':
                content = zlib.decompress(content, -zlib.MAX_WBITS)
            response['content-length'] = str(len(content))
            # Record the historical presence of the encoding in a way the won't interfere.
            response['-content-encoding'] = response['content-encoding']
            del response['content-encoding']
    except IOError:
        content = ""
        raise FailedToDecompressContent(_("Content purported to be compressed with %s but failed to decompress.") % response.get('content-encoding'), response, content)
    return content
__init__.py 文件源码 项目:httplib2 作者: httplib2 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def _decompressContent(response, new_content):
    content = new_content
    try:
        encoding = response.get('content-encoding', None)
        if encoding in ['gzip', 'deflate']:
            if encoding == 'gzip':
                content = gzip.GzipFile(fileobj=io.BytesIO(new_content)).read()
            if encoding == 'deflate':
                content = zlib.decompress(content, -zlib.MAX_WBITS)
            response['content-length'] = str(len(content))
            # Record the historical presence of the encoding in a way the won't interfere.
            response['-content-encoding'] = response['content-encoding']
            del response['content-encoding']
    except IOError:
        content = ""
        raise FailedToDecompressContent(_("Content purported to be compressed with %s but failed to decompress.") % response.get('content-encoding'), response, content)
    return content
serialize.py 文件源码 项目:ascii-art-py 作者: blinglnav 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _loads_v2(self, request, data):
        try:
            cached = json.loads(zlib.decompress(data).decode("utf8"))
        except ValueError:
            return

        # We need to decode the items that we've base64 encoded
        cached["response"]["body"] = _b64_decode_bytes(
            cached["response"]["body"]
        )
        cached["response"]["headers"] = dict(
            (_b64_decode_str(k), _b64_decode_str(v))
            for k, v in cached["response"]["headers"].items()
        )
        cached["response"]["reason"] = _b64_decode_str(
            cached["response"]["reason"],
        )
        cached["vary"] = dict(
            (_b64_decode_str(k), _b64_decode_str(v) if v is not None else v)
            for k, v in cached["vary"].items()
        )

        return self.prepare_response(request, cached)
signing.py 文件源码 项目:Scrum 作者: prakharchoudhary 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def loads(s, key=None, salt='django.core.signing', serializer=JSONSerializer, max_age=None):
    """
    Reverse of dumps(), raises BadSignature if signature fails.

    The serializer is expected to accept a bytestring.
    """
    # TimestampSigner.unsign always returns unicode but base64 and zlib
    # compression operate on bytes.
    base64d = force_bytes(TimestampSigner(key, salt=salt).unsign(s, max_age=max_age))
    decompress = False
    if base64d[:1] == b'.':
        # It's compressed; uncompress it first
        base64d = base64d[1:]
        decompress = True
    data = b64_decode(base64d)
    if decompress:
        data = zlib.decompress(data)
    return serializer().loads(data)
__init__.py 文件源码 项目:Texty 作者: sarthfrey 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def _decompressContent(response, new_content):
    content = new_content
    try:
        encoding = response.get('content-encoding', None)
        if encoding in ['gzip', 'deflate']:
            if encoding == 'gzip':
                content = gzip.GzipFile(fileobj=StringIO.StringIO(new_content)).read()
            if encoding == 'deflate':
                content = zlib.decompress(content)
            response['content-length'] = str(len(content))
            # Record the historical presence of the encoding in a way the won't interfere.
            response['-content-encoding'] = response['content-encoding']
            del response['content-encoding']
    except IOError:
        content = ""
        raise FailedToDecompressContent(_("Content purported to be compressed with %s but failed to decompress.") % response.get('content-encoding'), response, content)
    return content
itsdangerous.py 文件源码 项目:Texty 作者: sarthfrey 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def load_payload(self, payload):
        decompress = False
        if payload.startswith(b'.'):
            payload = payload[1:]
            decompress = True
        try:
            json = base64_decode(payload)
        except Exception as e:
            raise BadPayload('Could not base64 decode the payload because of '
                'an exception', original_error=e)
        if decompress:
            try:
                json = zlib.decompress(json)
            except Exception as e:
                raise BadPayload('Could not zlib decompress the payload before '
                    'decoding the payload', original_error=e)
        return super(URLSafeSerializerMixin, self).load_payload(json)
__init__.py 文件源码 项目:office-interoperability-tools 作者: milossramek 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def _decompressContent(response, new_content):
    content = new_content
    try:
        encoding = response.get('content-encoding', None)
        if encoding in ['gzip', 'deflate']:
            if encoding == 'gzip':
                content = gzip.GzipFile(fileobj=StringIO.StringIO(new_content)).read()
            if encoding == 'deflate':
                content = zlib.decompress(content)
            response['content-length'] = str(len(content))
            # Record the historical presence of the encoding in a way the won't interfere.
            response['-content-encoding'] = response['content-encoding']
            del response['content-encoding']
    except IOError:
        content = ""
        raise FailedToDecompressContent(_("Content purported to be compressed with %s but failed to decompress.") % response.get('content-encoding'), response, content)
    return content
vchat.py 文件源码 项目:lan-ichat 作者: Forec 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def run(self):
        print("VEDIO server starts...")
        self.sock.bind(self.ADDR)
        self.sock.listen(1)
        conn, addr = self.sock.accept()
        print("remote VEDIO client success connected...")
        data = "".encode("utf-8")
        payload_size = struct.calcsize("L")
        cv2.namedWindow('Remote', cv2.WINDOW_NORMAL)
        while True:
            while len(data) < payload_size:
                data += conn.recv(81920)
            packed_size = data[:payload_size]
            data = data[payload_size:]
            msg_size = struct.unpack("L", packed_size)[0]
            while len(data) < msg_size:
                data += conn.recv(81920)
            zframe_data = data[:msg_size]
            data = data[msg_size:]
            frame_data = zlib.decompress(zframe_data)
            frame = pickle.loads(frame_data)
            cv2.imshow('Remote', frame)
            if cv2.waitKey(1) & 0xFF == 27:
                break
utils.py 文件源码 项目:touch-pay-client 作者: HackPucBemobi 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def secure_loads(data, encryption_key, hash_key=None, compression_level=None):
    components = data.count(b':')
    if components == 1:
        return secure_loads_deprecated(data, encryption_key, hash_key, compression_level)
    if components != 2:
        return None
    version, signature, encrypted_data = data.split(b':', 2)
    if version != b'hmac256':
        return None
    encryption_key = to_bytes(encryption_key)
    if not hash_key:
        hash_key = hashlib.sha256(encryption_key).digest()
    actual_signature = hmac.new(to_bytes(hash_key), encrypted_data, hashlib.sha256).hexdigest()
    if not compare(to_native(signature), actual_signature):
        return None
    encrypted_data = base64.urlsafe_b64decode(encrypted_data)
    IV, encrypted_data = encrypted_data[:16], encrypted_data[16:]
    cipher, _ = AES_new(pad(encryption_key)[:32], IV=IV)
    try:
        data = unpad(AES_dec(cipher, encrypted_data))
        if compression_level:
            data = zlib.decompress(data)
        return pickle.loads(data)
    except Exception as e:
        return None
itsdangerous.py 文件源码 项目:arithmancer 作者: google 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def load_payload(self, payload):
        decompress = False
        if payload.startswith(b'.'):
            payload = payload[1:]
            decompress = True
        try:
            json = base64_decode(payload)
        except Exception as e:
            raise BadPayload('Could not base64 decode the payload because of '
                'an exception', original_error=e)
        if decompress:
            try:
                json = zlib.decompress(json)
            except Exception as e:
                raise BadPayload('Could not zlib decompress the payload before '
                    'decoding the payload', original_error=e)
        return super(URLSafeSerializerMixin, self).load_payload(json)
itsdangerous.py 文件源码 项目:tesismometro 作者: joapaspe 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def load_payload(self, payload):
        decompress = False
        if payload.startswith(b'.'):
            payload = payload[1:]
            decompress = True
        try:
            json = base64_decode(payload)
        except Exception as e:
            raise BadPayload('Could not base64 decode the payload because of '
                'an exception', original_error=e)
        if decompress:
            try:
                json = zlib.decompress(json)
            except Exception as e:
                raise BadPayload('Could not zlib decompress the payload before '
                    'decoding the payload', original_error=e)
        return super(URLSafeSerializerMixin, self).load_payload(json)


问题


面经


文章

微信
公众号

扫码关注公众号