python类compress()的实例源码

bslurp.py 文件源码 项目:openstack-deploy 作者: yaoice 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def copy_from_host(module):
    """Read the ``src`` file and return its contents through ``exit_json``.

    Reads the ``compress`` and ``src`` module parameters. A missing or
    unreadable file is reported through ``module.fail_json``. On success
    ``module.exit_json`` receives:

    * ``content`` -- the file bytes, zlib-compressed when ``compress`` is
      truthy, then base64-encoded;
    * ``sha1`` -- hex SHA-1 digest of the raw (uncompressed) bytes;
    * ``mode`` -- ``oct()`` of the file's permission bits;
    * ``source`` -- the ``src`` path.
    """
    params = module.params
    compress = params.get('compress')
    src = params.get('src')

    if not os.path.exists(src):
        module.fail_json(msg="file not found: {}".format(src))
    if not os.access(src, os.R_OK):
        module.fail_json(msg="file is not readable: {}".format(src))

    # Only the permission bits of st_mode are reported.
    permission_bits = os.stat(src).st_mode & 0o777
    mode = oct(permission_bits)

    with open(src, 'rb') as handle:
        raw_data = handle.read()

    digest = hashlib.sha1(raw_data).hexdigest()
    if compress:
        payload = zlib.compress(raw_data)
    else:
        payload = raw_data
    encoded = base64.b64encode(payload)

    module.exit_json(content=encoded, sha1=digest, mode=mode, source=src)
bslurp.py 文件源码 项目:openstack-deploy 作者: yaoice 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def main():
    """Build the AnsibleModule and dispatch to the right copy direction.

    When the ``dest`` parameter is set, ``copy_to_host`` is called;
    otherwise ``copy_from_host``. Any ``Exception`` raised by either is
    reported through ``module.exit_json`` with ``failed=True`` and the
    ``repr`` of the formatted traceback as the message.
    """
    spec = {
        'compress': dict(default=True, type='bool'),
        'dest': dict(type='str'),
        'mode': dict(default='0644', type='str'),
        'sha1': dict(default=None, type='str'),
        'src': dict(required=True, type='str'),
    }
    module = AnsibleModule(spec)

    push_to_host = module.params.get('dest')

    try:
        transfer = copy_to_host if push_to_host else copy_from_host
        transfer(module)
    except Exception:
        module.exit_json(failed=True, changed=True,
                         msg=repr(traceback.format_exc()))


# import module snippets
console.py 文件源码 项目:os-xenapi 作者: openstack 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def get_console_log(session, arg_dict):
    """Return a domain's console log, zlib-compressed and base64-encoded.

    :param session: not used by this function.
    :param arg_dict: mapping that must contain ``dom_id``; the value is
        converted with ``int()``.
    :returns: ``base64.b64encode(zlib.compress(...))`` of whatever
        ``_last_bytes`` returns for the log file (presumably the tail of
        the file -- confirm against ``_last_bytes``).
    :raises dom0_pluginlib.PluginError: if ``dom_id`` is missing, if
        ``int()`` raises ``ValueError`` for it, or if reading the log
        raises ``IOError``.

    NOTE(review): the ``except IOError, e`` form and the nested
    try/try/finally are Python 2 syntax, presumably kept for an old dom0
    interpreter -- confirm the target version before modernizing.
    """
    try:
        raw_dom_id = arg_dict['dom_id']
    except KeyError:
        raise dom0_pluginlib.PluginError("Missing dom_id")
    try:
        dom_id = int(raw_dom_id)
    except ValueError:
        raise dom0_pluginlib.PluginError("Invalid dom_id")

    # The open() call is outside the try block, so a failure to open the
    # log file propagates as a plain IOError rather than a PluginError.
    logfile = open(CONSOLE_LOG_FILE_PATTERN % dom_id, 'rb')
    try:
        try:
            log_content = _last_bytes(logfile)
        except IOError, e:  # noqa
            msg = "Error reading console: %s" % e
            logging.debug(msg)
            raise dom0_pluginlib.PluginError(msg)
    finally:
        # Always release the file handle, whether or not the read succeeded.
        logfile.close()

    return base64.b64encode(zlib.compress(log_content))
test_amf3.py 文件源码 项目:Tinychat-Bot--Discontinued 作者: Tinychat 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_compressed(self):
        """
        ByteArrays can be compressed. Check that the C{compressed} attribute
        is false for an empty ByteArray and true for zlib-compressed input.
        """
        try:
            import zlib
        except ImportError:
            self.skipTest('zlib is missing')

        # A freshly constructed ByteArray must not report itself compressed.
        self.assertFalse(amf3.ByteArray().compressed)

        # Wrapping zlib output must set the flag for both sample payloads.
        for fill in ('b', '\x00'):
            deflated = zlib.compress(fill * 100)
            self.assertTrue(amf3.ByteArray(deflated).compressed)
filebased.py 文件源码 项目:CodingDojo 作者: ComputerSocietyUNB 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def set(self, key, value, timeout=DEFAULT_TIMEOUT, version=None):
        """Store ``value`` under ``key`` in the file-based cache.

        The entry is written to a temporary file inside ``self._dir`` and
        then moved over the final file name, so the final file is never
        written in place. The file contains the pickled expiry followed by
        the zlib-compressed pickle of ``value``.

        :param key: cache key, mapped to a file name by ``_key_to_file``.
        :param value: picklable object to store.
        :param timeout: passed to ``get_backend_timeout`` to compute expiry.
        :param version: optional key version passed to ``_key_to_file``.
        """
        self._createdir()  # Cache dir can be deleted at any time.
        fname = self._key_to_file(key, version)
        self._cull()  # make some room if necessary
        fd, tmp_path = tempfile.mkstemp(dir=self._dir)
        renamed = False
        try:
            with io.open(fd, 'wb') as f:
                expiry = self.get_backend_timeout(timeout)
                f.write(pickle.dumps(expiry, -1))
                # The -1 is the pickle protocol (highest available). It was
                # previously passed to zlib.compress as a compression level
                # because of a misplaced parenthesis, so the value was
                # pickled with the default protocol instead.
                f.write(zlib.compress(pickle.dumps(value, -1)))
            file_move_safe(tmp_path, fname, allow_overwrite=True)
            renamed = True
        finally:
            # Do not leave the temporary file behind if anything failed.
            if not renamed:
                os.remove(tmp_path)
filebased.py 文件源码 项目:NarshaTech 作者: KimJangHyeon 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def set(self, key, value, timeout=DEFAULT_TIMEOUT, version=None):
        """Write ``value`` to the cache file for ``key``.

        The data goes to a temporary file in ``self._dir`` first: the
        pickled expiry from ``get_backend_timeout(timeout)``, followed by
        the zlib-compressed pickle of ``value``. The temporary file is then
        moved over the final name; if anything fails before the move
        completes, the temporary file is removed.
        """
        self._createdir()  # Cache dir can be deleted at any time.
        target = self._key_to_file(key, version)
        self._cull()  # make some room if necessary
        handle, scratch_path = tempfile.mkstemp(dir=self._dir)
        moved = False
        try:
            with io.open(handle, 'wb') as out:
                expiry_blob = pickle.dumps(
                    self.get_backend_timeout(timeout),
                    pickle.HIGHEST_PROTOCOL)
                out.write(expiry_blob)
                value_blob = pickle.dumps(value, pickle.HIGHEST_PROTOCOL)
                out.write(zlib.compress(value_blob))
            file_move_safe(scratch_path, target, allow_overwrite=True)
            moved = True
        finally:
            if not moved:
                os.remove(scratch_path)
bslurp.py 文件源码 项目:kolla-kubernetes-personal 作者: rthallisey 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def copy_from_host(module):
    """Read the ``src`` file and return its contents through ``exit_json``.

    Reads the ``compress`` and ``src`` module parameters. A missing or
    unreadable file is reported through ``module.fail_json``. On success
    ``module.exit_json`` receives:

    * ``content`` -- the file bytes, zlib-compressed when ``compress`` is
      truthy, then base64-encoded;
    * ``sha1`` -- hex SHA-1 digest of the raw (uncompressed) bytes;
    * ``mode`` -- ``oct()`` of the file's permission bits;
    * ``source`` -- the ``src`` path.
    """
    params = module.params
    compress = params.get('compress')
    src = params.get('src')

    if not os.path.exists(src):
        module.fail_json(msg="file not found: {}".format(src))
    if not os.access(src, os.R_OK):
        module.fail_json(msg="file is not readable: {}".format(src))

    # Only the permission bits of st_mode are reported.
    permission_bits = os.stat(src).st_mode & 0o777
    mode = oct(permission_bits)

    with open(src, 'rb') as handle:
        raw_data = handle.read()

    digest = hashlib.sha1(raw_data).hexdigest()
    if compress:
        payload = zlib.compress(raw_data)
    else:
        payload = raw_data
    encoded = base64.b64encode(payload)

    module.exit_json(content=encoded, sha1=digest, mode=mode, source=src)
bslurp.py 文件源码 项目:kolla-kubernetes-personal 作者: rthallisey 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def main():
    """Build the AnsibleModule and dispatch to the right copy direction.

    When the ``dest`` parameter is set, ``copy_to_host`` is called;
    otherwise ``copy_from_host``. Any ``Exception`` raised by either is
    reported through ``module.exit_json`` with ``failed=True`` and the
    ``repr`` of the exception as the message.
    """
    spec = {
        'compress': dict(default=True, type='bool'),
        'dest': dict(type='str'),
        'mode': dict(default='0644', type='str'),
        'sha1': dict(default=None, type='str'),
        'src': dict(required=True, type='str'),
    }
    module = AnsibleModule(spec)

    push_to_host = module.params.get('dest')

    try:
        transfer = copy_to_host if push_to_host else copy_from_host
        transfer(module)
    except Exception as e:
        module.exit_json(failed=True, changed=True, msg=repr(e))


# import module snippets
tests.py 文件源码 项目:django-lrucache-backend 作者: kogan 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_binary_string(self):
        """Binary (zlib-compressed) values survive set, add and set_many."""
        from zlib import compress, decompress
        cache = self.cache
        value = 'value_to_be_compressed'
        compressed_value = compress(value.encode())

        def assert_round_trip(key):
            # The stored bytes must come back unchanged and still
            # decompress to the original text.
            compressed_result = cache.get(key)
            self.assertEqual(compressed_value, compressed_result)
            self.assertEqual(value, decompress(compressed_result).decode())

        # Test set
        cache.set('binary1', compressed_value)
        assert_round_trip('binary1')

        # Test add
        cache.add('binary1-add', compressed_value)
        assert_round_trip('binary1-add')

        # Test set_many
        cache.set_many({'binary1-set_many': compressed_value})
        assert_round_trip('binary1-set_many')
utilities.py 文件源码 项目:CoBL-public 作者: lingdb 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def compressedField(field):
    """Build ``property()`` keyword arguments for a zlib-compressed attribute.

    ``field`` is the name of the attribute holding the compressed bytes.
    The returned dict has the keys ``doc``, ``fget``, ``fset`` and ``fdel``:

    * ``fget`` returns ``None`` when the stored value is ``None``, otherwise
      the zlib-decompressed bytes;
    * ``fset`` encodes the given string with ``.encode()`` and stores the
      zlib-compressed result;
    * ``fdel`` deletes the underlying attribute.
    """
    def _read(self):
        stored = getattr(self, field)
        return None if stored is None else zlib.decompress(stored)

    def _write(self, value):
        setattr(self, field, zlib.compress(value.encode()))

    def _remove(self):
        delattr(self, field)

    return dict(
        doc="The compression property for %s." % field,
        fget=_read,
        fset=_write,
        fdel=_remove,
    )
filebased.py 文件源码 项目:Scrum 作者: prakharchoudhary 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def set(self, key, value, timeout=DEFAULT_TIMEOUT, version=None):
        """Write ``value`` to the cache file for ``key``.

        The data goes to a temporary file in ``self._dir`` first: the
        pickled expiry from ``get_backend_timeout(timeout)``, followed by
        the zlib-compressed pickle of ``value``. The temporary file is then
        moved over the final name; if anything fails before the move
        completes, the temporary file is removed.
        """
        self._createdir()  # Cache dir can be deleted at any time.
        target = self._key_to_file(key, version)
        self._cull()  # make some room if necessary
        handle, scratch_path = tempfile.mkstemp(dir=self._dir)
        moved = False
        try:
            with io.open(handle, 'wb') as out:
                expiry_blob = pickle.dumps(
                    self.get_backend_timeout(timeout),
                    pickle.HIGHEST_PROTOCOL)
                out.write(expiry_blob)
                value_blob = pickle.dumps(value, pickle.HIGHEST_PROTOCOL)
                out.write(zlib.compress(value_blob))
            file_move_safe(scratch_path, target, allow_overwrite=True)
            moved = True
        finally:
            if not moved:
                os.remove(scratch_path)
swf.py 文件源码 项目:radar 作者: amoose136 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def _open(self, fps=12, loop=True, html=False, compress=False):
            """Initialise writer state from the given options.

            Calls ``load_lib()`` when ``_swf`` is falsy, stores the options
            as ``int``/``bool`` values on the instance, fetches the output
            file from ``self.request`` and resets the frame counter and
            frame size.
            """
            if not _swf:
                load_lib()

            # Coerce the options to plain int/bool values.
            self._arg_fps = int(fps)
            self._arg_loop = bool(loop)
            self._arg_html = bool(html)
            self._arg_compress = bool(compress)

            self._fp = self.request.get_file()
            self._framecounter = 0
            self._framesize = (100, 100)

            # When compressing, write to an in-memory buffer and keep the
            # real file object aside in ``_fp_real``.
            if self._arg_compress:
                self._fp_real, self._fp = self._fp, BytesIO()
gen_library_compressed_string.py 文件源码 项目:OSPTF 作者: xSploited 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_encoded_library_string(arch):
    """Return the compressed, marshalled contents of the bundled library zip.

    Opens ``resources/libraryx86.zip`` or ``resources/libraryx64.zip``
    (relative to the current working directory) depending on ``arch``,
    collects every member whose extension is ``.py``, ``.pyd``, ``.dll``,
    ``.pyc`` or ``.pyo`` into a ``{filename: bytes}`` dict, and returns
    ``zlib.compress(marshal.dumps(modules), 9)``.

    :param arch: ``"x86"`` or ``"x64"``.
    :raises Exception: if ``arch`` is neither of the supported values.
    """
    if arch == "x86":
        filepath = os.path.join("resources", "libraryx86.zip")
    elif arch == "x64":
        filepath = os.path.join("resources", "libraryx64.zip")
    else:
        raise Exception("unknown arch %s" % arch)

    wanted_extensions = (".py", ".pyd", ".dll", ".pyc", ".pyo")

    # Open the archive directly and close it deterministically. The earlier
    # version copied the file into a StringIO buffer and never closed either
    # the file handle or the ZipFile.
    with zipfile.ZipFile(filepath) as archive:
        modules = {
            info.filename: archive.read(info.filename)
            for info in archive.infolist()
            if os.path.splitext(info.filename)[1] in wanted_extensions
        }

    return zlib.compress(marshal.dumps(modules), 9)
vchat.py 文件源码 项目:lan-ichat 作者: Forec 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def run(self):
        """Connect to the server and stream compressed, pickled video frames.

        Retries ``self.sock.connect(self.ADDR)`` every 3 seconds until it
        succeeds. Then, while the capture device is open, reads a frame,
        resizes it by ``self.fx``, pickles and zlib-compresses it, and sends
        it prefixed with its length packed by ``struct.pack("L", ...)``.
        After each sent frame, ``self.interval`` frames are read and
        discarded. The loop ends when a frame cannot be read or when
        sending fails.
        """
        print("VEDIO client starts...")
        while True:
            try:
                self.sock.connect(self.ADDR)
                break
            except (OSError, IOError):
                # Socket errors derive from OSError (Python 3) / IOError
                # (Python 2); anything else is a programming error and
                # should surface instead of being retried forever.
                time.sleep(3)
        print("VEDIO client connected...")
        while self.cap.isOpened():
            ret, frame = self.cap.read()
            if not ret:
                # The grab failed and no frame was returned; stop cleanly
                # instead of handing an invalid frame to cv2.resize.
                break
            # NOTE(review): fy reuses self.fx, presumably to keep the aspect
            # ratio -- confirm that a separate fy is not intended.
            sframe = cv2.resize(frame, (0, 0), fx=self.fx, fy=self.fx)
            data = pickle.dumps(sframe)
            zdata = zlib.compress(data, zlib.Z_BEST_COMPRESSION)
            try:
                # NOTE(review): "L" uses the platform's native size, so the
                # receiver must unpack with the same format -- verify.
                self.sock.sendall(struct.pack("L", len(zdata)) + zdata)
            except (OSError, IOError):
                break
            # Skip frames between transmissions.
            for _ in range(self.interval):
                self.cap.read()
encode_bin.py 文件源码 项目:bpy_lambda 作者: bcongdon 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def _add_array_helper(self, data, array_type, prop_type):
        """Append an array property, compressing it when it exceeds 128 bytes.

        ``data`` must be an ``array.array`` whose typecode equals
        ``array_type``. The stored property is a ``'<3I'`` header (element
        count, encoding flag, byte length of the payload) followed by the
        payload. ``prop_type`` is appended to ``self.props_type`` and the
        packed bytes to ``self.props``.
        """
        assert(isinstance(data, array.array))
        assert(data.typecode == array_type)

        element_count = len(data)

        if _IS_BIG_ENDIAN:
            # Swap a copy so the caller's array is left untouched.
            data = data[:]
            data.byteswap()
        payload = data.tobytes()

        # mimic behavior of fbxconverter (also common sense)
        # we could make this configurable.
        # Encoding 0 stores the bytes as-is; encoding 1 is zlib at level 1.
        if len(payload) > 128:
            encoding = 1
            payload = zlib.compress(payload, 1)
        else:
            encoding = 0

        header = pack('<3I', element_count, encoding, len(payload))

        self.props_type.append(prop_type)
        self.props.append(header + payload)
filebased.py 文件源码 项目:django 作者: alexsukhrin 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def set(self, key, value, timeout=DEFAULT_TIMEOUT, version=None):
        """Write ``value`` to the cache file for ``key``.

        The data goes to a temporary file in ``self._dir`` first: the
        pickled expiry from ``get_backend_timeout(timeout)``, followed by
        the zlib-compressed pickle of ``value``. The temporary file is then
        moved over the final name; if anything fails before the move
        completes, the temporary file is removed.
        """
        self._createdir()  # Cache dir can be deleted at any time.
        target = self._key_to_file(key, version)
        self._cull()  # make some room if necessary
        handle, scratch_path = tempfile.mkstemp(dir=self._dir)
        moved = False
        try:
            with io.open(handle, 'wb') as out:
                expiry_blob = pickle.dumps(
                    self.get_backend_timeout(timeout),
                    pickle.HIGHEST_PROTOCOL)
                out.write(expiry_blob)
                value_blob = pickle.dumps(value, pickle.HIGHEST_PROTOCOL)
                out.write(zlib.compress(value_blob))
            file_move_safe(scratch_path, target, allow_overwrite=True)
            moved = True
        finally:
            if not moved:
                os.remove(scratch_path)
stream.py 文件源码 项目:Auspex 作者: BBN-Q 项目源码 文件源码 阅读 40 收藏 0 点赞 0 评论 0
async def push(self, data):
        """Count the points in ``data`` and put a data message on the queue.

        ``self.points_taken`` is increased by ``data.size`` when that
        attribute exists, otherwise by ``len(data)``, otherwise by 1 if
        ``data`` supports ``data + 1.0``. The message placed on
        ``self.queue`` is a dict with ``type``, ``compression`` and ``data``
        keys; with ``self.compression == 'zlib'`` the data is the
        zlib-compressed pickle of ``data``, otherwise ``data`` itself.

        This must be a coroutine because it awaits ``self.queue.put``; a
        plain ``def`` containing ``await`` does not compile.

        :raises ValueError: if ``data`` has no size, no length and does not
            support float addition.
        """
        if hasattr(data, 'size'):
            self.points_taken += data.size
        else:
            try:
                self.points_taken += len(data)
            except Exception:
                try:
                    # Probe for number-like behaviour; the result is unused.
                    data + 1.0
                    self.points_taken += 1
                except Exception:
                    raise ValueError("Got data {} that is neither an array nor a float".format(data))
        if self.compression == 'zlib':
            message = {"type": "data", "compression": "zlib", "data": zlib.compress(pickle.dumps(data, -1))}
        else:
            message = {"type": "data", "compression": "none", "data": data}

        # This can be replaced with some other serialization method
        # and also should support sending via zmq.
        await self.queue.put(message)
bslurp.py 文件源码 项目:kolla-ansible 作者: openstack 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def copy_from_host(module):
    """Read the ``src`` file and return its contents through ``exit_json``.

    Reads the ``compress`` and ``src`` module parameters. A missing or
    unreadable file is reported through ``module.fail_json``. On success
    ``module.exit_json`` receives:

    * ``content`` -- the file bytes, zlib-compressed when ``compress`` is
      truthy, then base64-encoded;
    * ``sha1`` -- hex SHA-1 digest of the raw (uncompressed) bytes;
    * ``mode`` -- ``oct()`` of the file's permission bits;
    * ``source`` -- the ``src`` path.
    """
    params = module.params
    compress = params.get('compress')
    src = params.get('src')

    if not os.path.exists(src):
        module.fail_json(msg="file not found: {}".format(src))
    if not os.access(src, os.R_OK):
        module.fail_json(msg="file is not readable: {}".format(src))

    # Only the permission bits of st_mode are reported.
    permission_bits = os.stat(src).st_mode & 0o777
    mode = oct(permission_bits)

    with open(src, 'rb') as handle:
        raw_data = handle.read()

    digest = hashlib.sha1(raw_data).hexdigest()
    if compress:
        payload = zlib.compress(raw_data)
    else:
        payload = raw_data
    encoded = base64.b64encode(payload)

    module.exit_json(content=encoded, sha1=digest, mode=mode, source=src)
bslurp.py 文件源码 项目:kolla-ansible 作者: openstack 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def main():
    """Build the AnsibleModule and dispatch to the right copy direction.

    When the ``dest`` parameter is set, ``copy_to_host`` is called;
    otherwise ``copy_from_host``. Any ``Exception`` raised by either is
    reported through ``module.exit_json`` with ``failed=True`` and the
    ``repr`` of the formatted traceback as the message.
    """
    spec = {
        'compress': dict(default=True, type='bool'),
        'dest': dict(type='str'),
        'mode': dict(default='0644', type='str'),
        'sha1': dict(default=None, type='str'),
        'src': dict(required=True, type='str'),
    }
    module = AnsibleModule(spec)

    push_to_host = module.params.get('dest')

    try:
        transfer = copy_to_host if push_to_host else copy_from_host
        transfer(module)
    except Exception:
        module.exit_json(failed=True, changed=True,
                         msg=repr(traceback.format_exc()))


# import module snippets
core.py 文件源码 项目:mitogen 作者: dw 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __init__(self, context, core_src):
        """Set up the instance state.

        Stores ``context``, creates a thread-local holder and an empty
        cache, and records the list of ``mitogen`` submodule names in
        ``self._present``. When ``core_src`` is truthy, a
        ``'mitogen.core'`` cache entry is added whose third element is the
        zlib-compressed ``core_src``.
        """
        self._context = context
        self.tls = threading.local()
        self._cache = {}

        mitogen_submodules = [
            'mitogen.ansible',
            'mitogen.compat',
            'mitogen.compat.pkgutil',
            'mitogen.fakessh',
            'mitogen.master',
            'mitogen.ssh',
            'mitogen.sudo',
            'mitogen.utils',
        ]
        self._present = {'mitogen': mitogen_submodules}

        if core_src:
            compressed_core = zlib.compress(core_src)
            self._cache['mitogen.core'] = (
                None, 'mitogen/core.py', compressed_core)
idapythonrc.py 文件源码 项目:codemap 作者: c0demap 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def SaveModuleBP():
    """Write the breakpoint offsets of all large-enough functions to a file.

    Asks the user for a module name via ``AskStr``, then iterates over
    ``Functions()``. For every function whose length is at least
    ``codemap.func_min_size``, its offset from ``get_imagebase()`` is
    appended as a decimal line. The resulting text is zlib-compressed and
    written to ``codemap.homedir + modname + '.bpo'``.

    Any exception is caught and its traceback printed to stdout.

    NOTE(review): uses the Python 2 ``print`` statement, so this block only
    runs under a Python 2 interpreter -- confirm the target IDAPython
    version before modernizing.
    """
    global codemap
    try:
        modname = AskStr('', 'module name : ')
        bpo = ''
        for e in Functions():
            func = e.startEA
            length = e.endEA - e.startEA
            # Skip functions smaller than the configured minimum size.
            if length < codemap.func_min_size:
                continue
            # Offsets are stored relative to the image base.
            offset = func - get_imagebase()
            bpo += str(offset) + '\n'
        print 'bp offset generation complete! ' + str(len(bpo))
        payload = bpo
        with open(codemap.homedir + modname + '.bpo', 'wb') as f:
            f.write(zlib.compress(payload))
    except:
        traceback.print_exc(file=sys.stdout)
bslurp.py 文件源码 项目:contrail-ansible 作者: Juniper 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def copy_from_host(module):
    """Read the ``src`` file and return its contents through ``exit_json``.

    Reads the ``compress`` and ``src`` module parameters. A missing or
    unreadable file is reported through ``module.fail_json``. On success
    ``module.exit_json`` receives:

    * ``content`` -- the file bytes, zlib-compressed when ``compress`` is
      truthy, then base64-encoded;
    * ``sha1`` -- hex SHA-1 digest of the raw (uncompressed) bytes;
    * ``mode`` -- ``oct()`` of the file's permission bits;
    * ``source`` -- the ``src`` path.
    """
    params = module.params
    compress = params.get('compress')
    src = params.get('src')

    if not os.path.exists(src):
        module.fail_json(msg="file not found: {}".format(src))
    if not os.access(src, os.R_OK):
        module.fail_json(msg="file is not readable: {}".format(src))

    # Only the permission bits of st_mode are reported.
    permission_bits = os.stat(src).st_mode & 0o777
    mode = oct(permission_bits)

    with open(src, 'rb') as handle:
        raw_data = handle.read()

    digest = hashlib.sha1(raw_data).hexdigest()
    if compress:
        payload = zlib.compress(raw_data)
    else:
        payload = raw_data
    encoded = base64.b64encode(payload)

    module.exit_json(content=encoded, sha1=digest, mode=mode, source=src)
bslurp.py 文件源码 项目:contrail-ansible 作者: Juniper 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def main():
    """Build the AnsibleModule and dispatch to the right copy direction.

    When the ``dest`` parameter is set, ``copy_to_host`` is called;
    otherwise ``copy_from_host``. Any ``Exception`` raised by either is
    reported through ``module.exit_json`` with ``failed=True`` and the
    ``repr`` of the formatted traceback as the message.
    """
    spec = {
        'compress': dict(default=True, type='bool'),
        'dest': dict(type='str'),
        'mode': dict(default='0644', type='str'),
        'sha1': dict(default=None, type='str'),
        'src': dict(required=True, type='str'),
    }
    module = AnsibleModule(spec)

    push_to_host = module.params.get('dest')

    try:
        transfer = copy_to_host if push_to_host else copy_from_host
        transfer(module)
    except Exception:
        module.exit_json(failed=True, changed=True,
                         msg=repr(traceback.format_exc()))


# import module snippets
codecs.py 文件源码 项目:pytoshop 作者: mdboom 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def compress_zip(fd,      # type: BinaryIO
                 image,   # type: np.ndarray
                 depth,   # type: int
                 version  # type: int
                 ):       # type: (...) -> None
    """
    Write a Numpy array to a zip (zlib) compressed stream.

{}
    """
    image = normalize_image(image, depth)

    if not util.needs_byteswap(image):
        # No swap required: compress the whole array in one call.
        fd.write(zlib.compress(image))
        return

    # Swap each row and feed it through a single streaming compressor, so
    # the rows form one continuous zlib stream.
    deflater = zlib.compressobj()
    for scanline in image:
        swapped = util.do_byteswap(scanline)
        fd.write(deflater.compress(swapped))
    fd.write(deflater.flush())
codecs.py 文件源码 项目:pytoshop 作者: mdboom 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def compress_constant_zip(fd,      # type: BinaryIO
                          value,   # type: int
                          width,   # type: int
                          rows,    # type: int
                          depth,   # type: int
                          version  # type: int
                          ):       # type: (...) -> None
    """
    Write a virtual image containing a constant to a zip compressed
    stream.

{}
    """
    if depth == 1:
        # One-bit images are built in full and go through compress_zip.
        onebit_image = _make_onebit_constant(value, width, rows)
        compress_zip(fd, onebit_image, depth, version)
        return

    # For other depths, build one constant row, repeat its bytes ``rows``
    # times and compress the result in a single call.
    row_bytes = _make_constant_row(value, width, depth).tobytes()
    fd.write(zlib.compress(row_bytes * rows))
base.py 文件源码 项目:Stitch 作者: nathanlopez 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def to_png(self, data, output):
        ''' Dump data to the image file. Data is bytes(RGBRGB...RGB).
            Pure python PNG implementation.
            http://inaps.org/journal/comment-fonctionne-le-png

            Writes an 8-bit RGB PNG of ``self.width`` x ``self.height``
            pixels to the path ``output``. Returns None. Errors from
            opening or writing the file propagate unchanged.
        '''

        def _chunk(tag, payload):
            # A PNG chunk is: payload length, 4-byte tag, payload, then the
            # CRC32 of tag + payload.
            checksum = crc32(tag + payload) & 0xffffffff
            return pack('>I', len(payload)) + tag + payload + pack('>I', checksum)

        # Each scanline is prefixed with filter type 0.
        line = self.width * 3
        png_filter = pack('>B', 0)
        scanlines = b''.join(
            [png_filter + data[y * line:y * line + line]
             for y in range(self.height)])

        magic = pack('>8B', 137, 80, 78, 71, 13, 10, 26, 10)

        # Header: width, height, bit depth 8, colour type 2, then three
        # zero bytes.
        ihdr = _chunk(b'IHDR',
                      pack('>2I5B', self.width, self.height, 8, 2, 0, 0, 0))
        idat = _chunk(b'IDAT', compress(scanlines))
        iend = _chunk(b'IEND', b'')

        # The code that used to follow this block (building an error
        # message and raising ScreenshotError) could never run because the
        # with-body always returned first; it has been removed.
        with open(output, 'wb') as fileh:
            fileh.write(magic)
            fileh.write(ihdr)
            fileh.write(idat)
            fileh.write(iend)
zlib_.py 文件源码 项目:ekko 作者: openstack 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def compress(data):
        """Return ``data`` compressed by ``zlib.compress`` with default settings."""
        compressed = zlib.compress(data)
        return compressed
fetch_ceph_keys.py 文件源码 项目:openstack-deploy 作者: yaoice 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def read_file(filename):
    """Read a file below ``/etc/ceph`` and return it encoded with its checksum.

    The path is ``os.path.join('/etc/ceph', filename)``. A missing or
    unreadable file is reported through ``json_exit(..., failed=True)``.

    Returns a dict with ``content`` (the zlib-compressed, base64-encoded
    bytes), ``sha1`` (hex digest of the raw bytes) and ``filename`` (the
    argument as given).
    """
    filename_path = os.path.join('/etc/ceph', filename)

    if not os.path.exists(filename_path):
        json_exit("file not found: {}".format(filename_path), failed=True)
    if not os.access(filename_path, os.R_OK):
        json_exit("file not readable: {}".format(filename_path), failed=True)

    with open(filename_path, 'rb') as handle:
        raw_data = handle.read()

    encoded = base64.b64encode(zlib.compress(raw_data))
    digest = hashlib.sha1(raw_data).hexdigest()
    return dict(content=encoded, sha1=digest, filename=filename)
bslurp.py 文件源码 项目:openstack-deploy 作者: yaoice 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def copy_to_host(module):
    """Decode the ``src`` payload and write it to the ``dest`` file.

    Reads the ``compress``, ``dest``, ``mode``, ``sha1`` and ``src`` module
    parameters. ``src`` is base64-decoded and, when ``compress`` is truthy,
    zlib-decompressed.

    When ``sha1`` is given:

    * if ``dest`` exists, is readable and already has that digest, the
      module exits with ``changed=False``;
    * if ``dest`` exists but is not readable, the module exits as failed;
    * if the decoded data does not match the digest, the module exits as
      failed.

    Otherwise the data is written to ``dest`` (created with ``mode`` if it
    does not exist) and the module exits with ``changed=True``.
    """
    compress = module.params.get('compress')
    dest = module.params.get('dest')
    mode_param = module.params.get('mode')
    sha1 = module.params.get('sha1')
    src = module.params.get('src')

    try:
        mode = int(mode_param, 0)
    except ValueError:
        # Python 3 rejects a bare leading zero such as the default '0644'
        # under base 0; treat such values as octal, as Python 2 did.
        mode = int(mode_param, 8)

    data = base64.b64decode(src)
    raw_data = zlib.decompress(data) if compress else data

    if sha1:
        if os.path.exists(dest):
            if os.access(dest, os.R_OK):
                with open(dest, 'rb') as f:
                    if hashlib.sha1(f.read()).hexdigest() == sha1:
                        module.exit_json(changed=False)
            else:
                module.exit_json(failed=True, changed=False,
                                 msg='file is not accessible: {}'.format(dest))

        if sha1 != hashlib.sha1(raw_data).hexdigest():
            module.exit_json(failed=True, changed=False,
                             msg='sha1 sum does not match data')

    # O_TRUNC is required: without it, overwriting a longer existing file
    # leaves its old trailing bytes after the new content.
    flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
    with os.fdopen(os.open(dest, flags, mode), 'wb') as f:
        f.write(raw_data)

    module.exit_json(changed=True)
serialize.py 文件源码 项目:cbapi-python 作者: carbonblack 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def dumps(self, response, body=None):
        """Serialize ``response`` (and its body) to compressed JSON bytes.

        When ``body`` is None it is read from ``response`` without content
        decoding, and ``response._fp`` is replaced by a ``BytesIO`` holding
        the same bytes. The body, headers and reason are base64-encoded by
        the ``_b64_encode*`` helpers. The result is the zlib-compressed
        UTF-8 encoding of a compact JSON document with sorted keys.
        """
        if body is None:
            body = response.read(decode_content=False)

            # NOTE: 99% sure this is dead code. I'm only leaving it
            #       here b/c I don't have a test yet to prove
            #       it. Basically, before using
            #       `cachecontrol.filewrapper.CallbackFileWrapper`,
            #       this made an effort to reset the file handle. The
            #       `CallbackFileWrapper` short circuits this code by
            #       setting the body as the content is consumed, the
            #       result being a `body` argument is *always* passed
            #       into cache_response, and in turn,
            #       `Serializer.dump`.
            response._fp = io.BytesIO(body)

        # Fill the fields one at a time, in the same order as before.
        fields = {}
        fields["body"] = _b64_encode_bytes(body)
        fields["headers"] = dict(
            (_b64_encode(k), _b64_encode(v))
            for k, v in response.headers.items()
        )
        fields["status"] = response.status
        fields["version"] = response.version
        fields["reason"] = _b64_encode_str(response.reason)
        fields["strict"] = response.strict
        fields["decode_content"] = response.decode_content

        document = json.dumps(
            {"response": fields}, separators=(",", ":"), sort_keys=True,
        )
        return zlib.compress(document.encode("utf8"))


问题


面经


文章

微信
公众号

扫码关注公众号