python类compressobj()的实例源码

codecs.py 文件源码 项目:pytoshop 作者: mdboom 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def compress_zip(fd,      # type: BinaryIO
                 image,   # type: np.ndarray
                 depth,   # type: int
                 version  # type: int
                 ):       # type: (...) -> None
    """
    Write a Numpy array to a zip (zlib) compressed stream.

{}
    """
    image = normalize_image(image, depth)
    if util.needs_byteswap(image):
        compressor = zlib.compressobj()
        for row in image:
            row = util.do_byteswap(row)
            fd.write(compressor.compress(row))
        fd.write(compressor.flush())
    else:
        fd.write(zlib.compress(image))
sap-thatsnotwhatisaid.py 文件源码 项目:tools 作者: pwnaccelerator 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def gzip_encode_add_padding(content):
    "* Compressing content..."
    num_chunks = len(content) / CHUNK_SIZE # let's not care about remainders
    gzip_compress = zlib.compressobj(9, zlib.DEFLATED, zlib.MAX_WBITS | 16)
    data = gzip_compress.compress(content)
    comp_cnt = 0
    replay = reorder(content[0:num_chunks*CHUNK_SIZE], arg_blocks)
    assert(len(replay) % CHUNK_SIZE == 0)
    num_chunks = len(replay) / CHUNK_SIZE # update the blocks
    print "** Duplicating content (CBC attack)..."
    data += gzip_compress.compress(replay) # duplicate cipher, should result in duplicate plaintext (prefixed by some garbage)
    while comp_cnt < WRAP_SIZE-(num_chunks*CHUNK_SIZE+10*CHUNK_SIZE):
        data += gzip_compress.compress("A"*CHUNK_SIZE)
        comp_cnt += CHUNK_SIZE
    print "** Copy original padding..."
    data += gzip_compress.compress(content[len(content) - 10*CHUNK_SIZE:len(content)]) # copy valid PKCS7 padding
    data = data + gzip_compress.flush()
    print "*** Finished"
    return data
minigzip.py 文件源码 项目:python2-tracer 作者: extremecoders-re 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def compress (filename, input, output):
    output.write('\037\213\010')        # Write the header, ...
    output.write(chr(FNAME))            # ... flag byte ...

    statval = os.stat(filename)           # ... modification time ...
    mtime = statval[8]
    write32(output, mtime)
    output.write('\002')                # ... slowest compression alg. ...
    output.write('\377')                # ... OS (=unknown) ...
    output.write(filename+'\000')       # ... original filename ...

    crcval = zlib.crc32("")
    compobj = zlib.compressobj(9, zlib.DEFLATED, -zlib.MAX_WBITS,
                             zlib.DEF_MEM_LEVEL, 0)
    while True:
        data = input.read(1024)
        if data == "":
            break
        crcval = zlib.crc32(data, crcval)
        output.write(compobj.compress(data))
    output.write(compobj.flush())
    write32(output, crcval)             # ... the CRC ...
    write32(output, statval[6])         # and the file size.
upload_ted.py 文件源码 项目:ted-editor 作者: tarnheld 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def deflate(data, compresslevel=9):
    # Compress
    compress = zlib.compressobj(compresslevel, zlib.DEFLATED, -zlib.MAX_WBITS, zlib.DEF_MEM_LEVEL, 0)
    deflated = compress.compress(data)
    deflated += compress.flush()
    # Add PDs compression magic and negated file length (8 bytes)
    length = int(-len(deflated))
    magic = bytearray(b'\xc5\xee\xf7\xff\x00\x00\x00\x00')
    magic[4] = byte(length, 0)
    magic[5] = byte(length >> 8, 0)
    magic[6] = byte(length >> 0x10, 0)
    magic[7] = byte(length >> 0x18, 0)
    deflatedwithheader = bytes(magic) + deflated
    finaldata = xor(deflatedwithheader)
    return finaldata

# Decrypts the GT6TED, removes the magic and negated file length and
# decompresses the data
compression-identifier.py 文件源码 项目:compression-identifier 作者: koutto 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def compress_gzip(data):
    compressed = None
    gzip_compress = zlib.compressobj(9, zlib.DEFLATED, zlib.MAX_WBITS | 16)
    try:
        compressed = gzip_compress.compress(data) + gzip_compress.flush()
    except Exception as e:
        print_error('Error when compressing with Gzip: {0}'.format(e))
    return compressed

# def decompress_gzip(data):
#   decompressed=None
#   try:
#       decompressed = zlib.decompress(gzip_data, zlib.MAX_WBITS|16)
#   except:
#       pass
#   return decompressed
sphinx_fakeinv.py 文件源码 项目:sphinx-fakeinv 作者: dahlia 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def print_inventory(file_, package, version, objects):
    file_.write(b'# Sphinx inventory version 2\n')
    file_.write(b'# Project: ')
    file_.write(package.encode('utf-8'))
    file_.write(b'\n')
    file_.write(b'# Version: ')
    file_.write(version.encode('utf-8'))
    file_.write(b'\n')
    file_.write(b'# The remainder of this file is compressed using zlib.\n')
    codec = zlib.compressobj()
    fmt = '{0} {1} {2} . -\n'.format
    for name, kind, mysterious_number, _ in objects:
        line = fmt(name, kind, mysterious_number)
        code = codec.compress(line.encode('utf-8'))
        file_.write(code)
    file_.write(codec.flush())
    file_.flush()
sarc.py 文件源码 项目:3dstools 作者: ObsidianX 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def compress_file(self):
        # stream-compress to another file then overwrite original
        self.file = open(self.filename, 'rb')
        compressed_filename = '%s.zlib' % self.filename
        compressed_file = open(compressed_filename, 'wb')
        compressor = zlib.compressobj(self.compression_level)

        compressed_file.write(struct.pack('>I', os.stat(self.filename).st_size))

        data = self.file.read(READ_AMOUNT)
        while len(data) > 0:
            compressed_file.write(compressor.compress(data))
            data = self.file.read(READ_AMOUNT)

        compressed_file.write(compressor.flush(zlib.Z_FINISH))

        self.file.close()
        compressed_file.close()

        os.rename(compressed_filename, self.filename)
_gzip.py 文件源码 项目:mechanize 作者: python-mechanize 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def compress_readable_output(src_file, compress_level=6):
    crc = zlib.crc32(b"")
    size = 0
    zobj = zlib.compressobj(compress_level, zlib.DEFLATED, -zlib.MAX_WBITS,
                            zlib.DEF_MEM_LEVEL, zlib.Z_DEFAULT_STRATEGY)
    prefix_written = False
    while True:
        data = src_file.read(DEFAULT_BUFFER_SIZE)
        if not data:
            break
        size += len(data)
        crc = zlib.crc32(data, crc)
        data = zobj.compress(data)
        if not prefix_written:
            prefix_written = True
            data = gzip_prefix() + data
        yield data
    yield zobj.flush() + struct.pack(b"<LL", crc & 0xffffffffL, size)
transport.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _newKeys(self):
        """
        Called back by a subclass once a I{MSG_NEWKEYS} message has been
        received.  This indicates key exchange has completed and new encryption
        and compression parameters should be adopted.  Any messages which were
        queued during key exchange will also be flushed.
        """
        log.msg('NEW KEYS')
        self.currentEncryptions = self.nextEncryptions
        if self.outgoingCompressionType == b'zlib':
            self.outgoingCompression = zlib.compressobj(6)
        if self.incomingCompressionType == b'zlib':
            self.incomingCompression = zlib.decompressobj()

        self._keyExchangeState = self._KEY_EXCHANGE_NONE
        messages = self._blockedByKeyExchange
        self._blockedByKeyExchange = None
        for (messageType, payload) in messages:
            self.sendPacket(messageType, payload)
protocol.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def add_compression_filter(self, encoding='deflate', *,
                               EOF_MARKER=EOF_MARKER, EOL_MARKER=EOL_MARKER):
        """Compress incoming stream with deflate or gzip encoding."""
        zlib_mode = (16 + zlib.MAX_WBITS
                     if encoding == 'gzip' else -zlib.MAX_WBITS)
        zcomp = zlib.compressobj(wbits=zlib_mode)

        chunk = yield
        while True:
            if chunk is EOF_MARKER:
                yield zcomp.flush()
                chunk = yield EOF_MARKER

            else:
                yield zcomp.compress(chunk)
                chunk = yield EOL_MARKER
test_recorder.py 文件源码 项目:pytest-vts 作者: bhodorog 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_recording_gzipped_responses_as_text(vts_rec_on, httpserver):
    data = "Hello!"
    # http://stackoverflow.com/a/22310760
    gzip_compressor = zlib.compressobj(9, zlib.DEFLATED, zlib.MAX_WBITS | 16)
    gzipped = gzip_compressor.compress(data.encode()) + gzip_compressor.flush()
    httpserver.serve_content(
        gzipped, 200,
        headers={"Content-Encoding": "gzip"})
    url = "{}/".format(httpserver.url)
    resp = requests.get(url)
    assert resp.status_code == 200
    assert resp.text == data
    assert len(vts_rec_on.cassette) == 1
    track = vts_rec_on.cassette[0]
    assert track['request']['url'] == url
    assert "Content-Encoding" in track['response']['headers']
    assert track['response']['body'] == data


# enable pytester fixture which allows running pytests within tests
test_web_functional.py 文件源码 项目:aiohttp-tokio 作者: fafhrd91 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_response_with_precompressed_body_gzip(loop, test_client):

    @asyncio.coroutine
    def handler(request):
        headers = {'Content-Encoding': 'gzip'}
        zcomp = zlib.compressobj(wbits=16 + zlib.MAX_WBITS)
        data = zcomp.compress(b'mydata') + zcomp.flush()
        return web.Response(body=data, headers=headers)

    app = web.Application()
    app.router.add_get('/', handler)
    client = yield from test_client(app)

    resp = yield from client.get('/')
    assert 200 == resp.status
    data = yield from resp.read()
    assert b'mydata' == data
    assert resp.headers.get('Content-Encoding') == 'gzip'
test_web_functional.py 文件源码 项目:aiohttp-tokio 作者: fafhrd91 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_response_with_precompressed_body_deflate(loop, test_client):

    @asyncio.coroutine
    def handler(request):
        headers = {'Content-Encoding': 'deflate'}
        zcomp = zlib.compressobj(wbits=-zlib.MAX_WBITS)
        data = zcomp.compress(b'mydata') + zcomp.flush()
        return web.Response(body=data, headers=headers)

    app = web.Application()
    app.router.add_get('/', handler)
    client = yield from test_client(app)

    resp = yield from client.get('/')
    assert 200 == resp.status
    data = yield from resp.read()
    assert b'mydata' == data
    assert resp.headers.get('Content-Encoding') == 'deflate'
filetools.py 文件源码 项目:AwesomenautsFileDumper 作者: Nodja 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def deflate(data):
    c = zlib.compressobj()
    out = c.compress(data)
    out += c.flush(zlib.Z_SYNC_FLUSH)
    return out
imaplib2.py 文件源码 项目:sndlatr 作者: Schibum 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def start_compressing(self):
        """start_compressing()
        Enable deflate compression on the socket (RFC 4978)."""

        # rfc 1951 - pure DEFLATE, so use -15 for both windows
        self.decompressor = zlib.decompressobj(-15)
        self.compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -15)
websocket.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _create_compressor(self):
        return zlib.compressobj(tornado.web.GZipContentEncoding.GZIP_LEVEL,
                                zlib.DEFLATED, -self._max_wbits)
websocket.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _create_compressor(self):
        return zlib.compressobj(tornado.web.GZipContentEncoding.GZIP_LEVEL,
                                zlib.DEFLATED, -self._max_wbits)
websocket.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def _create_compressor(self):
        return zlib.compressobj(tornado.web.GZipContentEncoding.GZIP_LEVEL,
                                zlib.DEFLATED, -self._max_wbits)
webhandle.py 文件源码 项目:histwords 作者: williamleif 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def write_response(handler, code, headers, data=""):
    handler.send_response(200)
    for header in headers:
        i = header.index(":")
        s,e = header[:i], header[i+1:]
        handler.send_header(s,e)

    if data:
        zlib_encode = zlib.compressobj(9, zlib.DEFLATED, zlib.MAX_WBITS | 16)
        content = zlib_encode.compress(data) + zlib_encode.flush()

        if len(content) < len(data):
            handler.send_header('Content-Encoding', 'gzip')
            handler.send_header('Content-Length', len(content))
        else:
            content = data

        handler.end_headers()

        handler.wfile.write(content)
    else:
        handler.wfile.write(data)
transport.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def ssh_NEWKEYS(self, packet):
        if packet != '':
            self.sendDisconnect(DISCONNECT_PROTOCOL_ERROR, "NEWKEYS takes no data")
        self.currentEncryptions = self.nextEncryptions
        if self.outgoingCompressionType == 'zlib':
            self.outgoingCompression = zlib.compressobj(6)
            #self.outgoingCompression.compress = lambda x: self.outgoingCompression.compress(x) + self.outgoingCompression.flush(zlib.Z_SYNC_FLUSH)
        if self.incomingCompressionType == 'zlib':
            self.incomingCompression = zlib.decompressobj()
transport.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def ssh_NEWKEYS(self, packet):
        if packet != '':
            self.sendDisconnect(DISCONNECT_PROTOCOL_ERROR, "NEWKEYS takes no data")
        if not self.nextEncryptions.enc_block_size:
            self._gotNewKeys = 1
            return
        self.currentEncryptions = self.nextEncryptions
        if self.outgoingCompressionType == 'zlib':
            self.outgoingCompression = zlib.compressobj(6)
            #self.outgoingCompression.compress = lambda x: self.outgoingCompression.compress(x) + self.outgoingCompression.flush(zlib.Z_SYNC_FLUSH)
        if self.incomingCompressionType == 'zlib':
            self.incomingCompression = zlib.decompressobj()
        self.connectionSecure()
zlib_server.py 文件源码 项目:pymotw3 作者: reingart 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def handle(self):
        compressor = zlib.compressobj(1)

        # Find out what file the client wants
        filename = self.request.recv(1024).decode('utf-8')
        self.logger.debug('client asked for: %r', filename)

        # Send chunks of the file as they are compressed
        with open(filename, 'rb') as input:
            while True:
                block = input.read(BLOCK_SIZE)
                if not block:
                    break
                self.logger.debug('RAW %r', block)
                compressed = compressor.compress(block)
                if compressed:
                    self.logger.debug(
                        'SENDING %r',
                        binascii.hexlify(compressed))
                    self.request.send(compressed)
                else:
                    self.logger.debug('BUFFERING')

        # Send any data being buffered by the compressor
        remaining = compressor.flush()
        while remaining:
            to_send = remaining[:BLOCK_SIZE]
            remaining = remaining[BLOCK_SIZE:]
            self.logger.debug('FLUSHING %r',
                              binascii.hexlify(to_send))
            self.request.send(to_send)
        return
mapgenerator.py 文件源码 项目:piqueserver 作者: piqueserver 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def __init__(self, map_, parent=False):
        # parent=True enables saving all data sent instead of just
        # deleting it afterwards.
        self.parent = parent
        self.generator = map_.get_generator()
        self.compressor = zlib.compressobj(COMPRESSION_LEVEL)
compress.py 文件源码 项目:watchmen 作者: lycclsltt 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def __init__(self):
        self.z = zlib.compressobj(9)
resource_sizes.py 文件源码 项目:nojs 作者: chrisdickinson 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def _CalculateCompressedSize(file_path):
  CHUNK_SIZE = 256 * 1024
  compressor = zlib.compressobj()
  total_size = 0
  with open(file_path, 'rb') as f:
    for chunk in iter(lambda: f.read(CHUNK_SIZE), ''):
      total_size += len(compressor.compress(chunk))
  total_size += len(compressor.flush())
  return total_size
compressors.py 文件源码 项目:aquests 作者: hansroh 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __init__ (self, level = 5):
        self.compressor = zlib.compressobj (5, zlib.DEFLATED)
compressors.py 文件源码 项目:aquests 作者: hansroh 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __init__ (self, level = 5):
        self.size = 0
        self.crc = zlib.crc32(b"")
        self.compressor = zlib.compressobj (level, zlib.DEFLATED, -zlib.MAX_WBITS, zlib.DEF_MEM_LEVEL, 0)
        self.first_data = True
producers.py 文件源码 项目:aquests 作者: hansroh 项目源码 文件源码 阅读 43 收藏 0 点赞 0 评论 0
def __init__ (self, producer, level=6):
        self.producer = producer
        self.compressor = zlib.compressobj (level, zlib.DEFLATED)
        self.override ()
warcwriter.py 文件源码 项目:warcio 作者: webrecorder 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def __init__(self, out):
        self.compressor = zlib.compressobj(9, zlib.DEFLATED, zlib.MAX_WBITS + 16)
        self.out = out
test_bufferedreaders.py 文件源码 项目:warcio 作者: webrecorder 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def compress(buff):
    buff = buff.encode('utf-8')
    compressobj = zlib.compressobj(6, zlib.DEFLATED, zlib.MAX_WBITS + 16)
    compressed = compressobj.compress(buff)
    compressed += compressobj.flush()

    return compressed

# plain "inflate"


问题


面经


文章

微信
公众号

扫码关注公众号