python类BZ2Decompressor()的实例源码

cache.py 文件源码 项目:conda-tools 作者: groutr 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _decompress_bz2(filename, blocksize=900*1024):
    """
    Decompress .tar.bz2 to .tar on disk (for faster access)

    Use TemporaryFile to guarentee write access.
    """
    if not filename.endswith('.tar.bz2'):
        return filename

    fd, path = mkstemp()
    with os.fdopen(fd, 'wb') as fo:
        with open(filename, 'rb') as fi:
            z = bz2.BZ2Decompressor()

            for block in iter(lambda: fi.read(blocksize), b''):
                fo.write(z.decompress(block))
    return path
rpm.py 文件源码 项目:crypto-detector 作者: Wind-River 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def init(self):
        import lzma
        self.pos = 0
        if self.mode == "r":
            self.cmpobj = lzma.BZ2Decompressor()
            self.fileobj.seek(0)
            self.buf = ""
        else:
            self.cmpobj = lzma.BZ2Compressor()

# class _XZProxy


#------------------------
# Extraction file object
#------------------------
test_tarfile.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_stream_padding(self):
        # Test for bug #1543303.
        tar = tarfile.open(tmpname, self.mode)
        tar.close()

        if self.mode.endswith("gz"):
            with gzip.GzipFile(tmpname) as fobj:
                data = fobj.read()
        elif self.mode.endswith("bz2"):
            dec = bz2.BZ2Decompressor()
            with open(tmpname, "rb") as fobj:
                data = fobj.read()
            data = dec.decompress(data)
            self.assertTrue(len(dec.unused_data) == 0,
                    "found trailing data")
        else:
            with open(tmpname, "rb") as fobj:
                data = fobj.read()

        self.assertTrue(data.count(b"\0") == tarfile.RECORDSIZE,
                         "incorrect zero padding")
test_tarfile.py 文件源码 项目:oil 作者: oilshell 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_stream_padding(self):
        # Test for bug #1543303.
        tar = tarfile.open(tmpname, self.mode)
        tar.close()

        if self.mode.endswith("gz"):
            with gzip.GzipFile(tmpname) as fobj:
                data = fobj.read()
        elif self.mode.endswith("bz2"):
            dec = bz2.BZ2Decompressor()
            with open(tmpname, "rb") as fobj:
                data = fobj.read()
            data = dec.decompress(data)
            self.assertTrue(len(dec.unused_data) == 0,
                    "found trailing data")
        else:
            with open(tmpname, "rb") as fobj:
                data = fobj.read()

        self.assertTrue(data.count("\0") == tarfile.RECORDSIZE,
                         "incorrect zero padding")
test_tarfile.py 文件源码 项目:python2-tracer 作者: extremecoders-re 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_stream_padding(self):
        # Test for bug #1543303.
        tar = tarfile.open(tmpname, self.mode)
        tar.close()

        if self.mode.endswith("gz"):
            with gzip.GzipFile(tmpname) as fobj:
                data = fobj.read()
        elif self.mode.endswith("bz2"):
            dec = bz2.BZ2Decompressor()
            with open(tmpname, "rb") as fobj:
                data = fobj.read()
            data = dec.decompress(data)
            self.assertTrue(len(dec.unused_data) == 0,
                    "found trailing data")
        else:
            with open(tmpname, "rb") as fobj:
                data = fobj.read()

        self.assertTrue(data.count("\0") == tarfile.RECORDSIZE,
                         "incorrect zero padding")
pydc_client.py 文件源码 项目:recobot 作者: h4ck3rk3y 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def bz2_compress(self,file,type=True): # Compress/Decompress files into/from the bz2 format. compress if type else decompess
        if not os.path.exists(file) or os.path.isdir(file): return False
        try: filesize = os.path.getsize(file)
        except: return False
        if not type and not file.endswith(".bz2"): return False
        blocksize = 102400
        if type: compressor = bz2.BZ2Compressor()
        else: decompressor = bz2.BZ2Decompressor()
        handle1 = open(file,"rb")
        handle2 = open(file+".bz2","wb") if type else open(file[:-4],"wb")
        for i in range(int(math.ceil(float(filesize)/blocksize))):
            if type: handle2.write(compressor.compress(handle1.read(blocksize)))
            else: handle2.write(decompressor.decompress(handle1.read(blocksize)))
        if type: handle2.write(compressor.flush())
        handle1.close(); handle2.close()
        self.debug("Successfully "+("" if type else "de")+"compressed file : "+file)
        return True

    ################################################## Client Behaviour Functions ##################################################
test_tarfile.py 文件源码 项目:pefile.pypy 作者: cloudtracer 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_stream_padding(self):
        # Test for bug #1543303.
        tar = tarfile.open(tmpname, self.mode)
        tar.close()

        if self.mode.endswith("gz"):
            fobj = gzip.GzipFile(tmpname)
            data = fobj.read()
            fobj.close()
        elif self.mode.endswith("bz2"):
            dec = bz2.BZ2Decompressor()
            with open(tmpname, "rb") as fobj:
                data = fobj.read()
            data = dec.decompress(data)
            self.assertTrue(len(dec.unused_data) == 0,
                    "found trailing data")
        else:
            fobj = open(tmpname, "rb")
            data = fobj.read()
            fobj.close()

        self.assertTrue(data.count("\0") == tarfile.RECORDSIZE,
                         "incorrect zero padding")
test_tarfile.py 文件源码 项目:ndk-python 作者: gittor 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_stream_padding(self):
        # Test for bug #1543303.
        tar = tarfile.open(tmpname, self.mode)
        tar.close()

        if self.mode.endswith("gz"):
            fobj = gzip.GzipFile(tmpname)
            data = fobj.read()
            fobj.close()
        elif self.mode.endswith("bz2"):
            dec = bz2.BZ2Decompressor()
            data = open(tmpname, "rb").read()
            data = dec.decompress(data)
            self.assertTrue(len(dec.unused_data) == 0,
                    "found trailing data")
        else:
            fobj = open(tmpname, "rb")
            data = fobj.read()
            fobj.close()

        self.assertTrue(data.count("\0") == tarfile.RECORDSIZE,
                         "incorrect zero padding")
tarfile.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def init(self):
        import bz2
        self.pos = 0
        if self.mode == "r":
            self.bz2obj = bz2.BZ2Decompressor()
            self.fileobj.seek(0)
            self.buf = b""
        else:
            self.bz2obj = bz2.BZ2Compressor()
tarfile.py 文件源码 项目:my-first-blog 作者: AnkurBegining 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def init(self):
        import bz2
        self.pos = 0
        if self.mode == "r":
            self.bz2obj = bz2.BZ2Decompressor()
            self.fileobj.seek(0)
            self.buf = b""
        else:
            self.bz2obj = bz2.BZ2Compressor()
tarfile.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def init(self):
        import bz2
        self.pos = 0
        if self.mode == "r":
            self.bz2obj = bz2.BZ2Decompressor()
            self.fileobj.seek(0)
            self.buf = ""
        else:
            self.bz2obj = bz2.BZ2Compressor()
tarfile.py 文件源码 项目:Flask_Blog 作者: sugarguo 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def init(self):
        import bz2
        self.pos = 0
        if self.mode == "r":
            self.bz2obj = bz2.BZ2Decompressor()
            self.fileobj.seek(0)
            self.buf = b""
        else:
            self.bz2obj = bz2.BZ2Compressor()
tarfile.py 文件源码 项目:pip-update-requirements 作者: alanhamlett 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def init(self):
        import bz2
        self.pos = 0
        if self.mode == "r":
            self.bz2obj = bz2.BZ2Decompressor()
            self.fileobj.seek(0)
            self.buf = b""
        else:
            self.bz2obj = bz2.BZ2Compressor()
recipe-579075.py 文件源码 项目:code 作者: ActiveState 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def decompress(chunks, compression):
    """Decompress
    :param __generator[bytes] chunks: compressed body chunks.
    :param str compression: compression constant.
    :rtype: __generator[bytes]
    :return: decompressed chunks.
    :raise: TypeError, DecompressError
    """

    if compression not in SUPPORTED_COMPRESSIONS:
        raise TypeError('Unsupported compression type: %s' % (compression,))

    try:
        de_compressor = DECOMPRESSOR_FACTORIES[compression]()

        for chunk in chunks:
            try:
                yield de_compressor.decompress(chunk)
            except OSError as err:
                # BZ2Decompressor: invalid data stream
                raise DecompressError(err) from None

        # BZ2Decompressor does not support flush() interface.
        if hasattr(de_compressor, 'flush'):
            yield de_compressor.flush()

    except zlib.error as err:
        raise DecompressError(err) from None
tarfile.py 文件源码 项目:swjtu-pyscraper 作者: Desgard 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def init(self):
        import bz2
        self.pos = 0
        if self.mode == "r":
            self.bz2obj = bz2.BZ2Decompressor()
            self.fileobj.seek(0)
            self.buf = b""
        else:
            self.bz2obj = bz2.BZ2Compressor()
tarfile.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def init(self):
        import bz2
        self.pos = 0
        if self.mode == "r":
            self.bz2obj = bz2.BZ2Decompressor()
            self.fileobj.seek(0)
            self.buf = b""
        else:
            self.bz2obj = bz2.BZ2Compressor()
tarfile.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def init(self):
        import bz2
        self.pos = 0
        if self.mode == "r":
            self.bz2obj = bz2.BZ2Decompressor()
            self.fileobj.seek(0)
            self.buf = b""
        else:
            self.bz2obj = bz2.BZ2Compressor()
tarfile.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def init(self):
        import bz2
        self.pos = 0
        if self.mode == "r":
            self.bz2obj = bz2.BZ2Decompressor()
            self.fileobj.seek(0)
            self.buf = b""
        else:
            self.bz2obj = bz2.BZ2Compressor()
tarfile.py 文件源码 项目:jira_worklog_scanner 作者: pgarneau 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def init(self):
        import bz2
        self.pos = 0
        if self.mode == "r":
            self.bz2obj = bz2.BZ2Decompressor()
            self.fileobj.seek(0)
            self.buf = b""
        else:
            self.bz2obj = bz2.BZ2Compressor()
phishtankbot.py 文件源码 项目:abusehelper 作者: Exploit-install 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(self, fileobj):
        self._fileobj = fileobj
        self._bz2 = bz2.BZ2Decompressor()

        self._line_buffer = collections.deque([""])

        self._current_line = ""
        self._current_offset = 0
tarfile.py 文件源码 项目:zanph 作者: zanph 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def init(self):
        import bz2
        self.pos = 0
        if self.mode == "r":
            self.bz2obj = bz2.BZ2Decompressor()
            self.fileobj.seek(0)
            self.buf = b""
        else:
            self.bz2obj = bz2.BZ2Compressor()
tarfile.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def init(self):
        import bz2
        self.pos = 0
        if self.mode == "r":
            self.bz2obj = bz2.BZ2Decompressor()
            self.fileobj.seek(0)
            self.buf = b""
        else:
            self.bz2obj = bz2.BZ2Compressor()
tarfile.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def init(self):
        import bz2
        self.pos = 0
        if self.mode == "r":
            self.bz2obj = bz2.BZ2Decompressor()
            self.fileobj.seek(0)
            self.buf = b""
        else:
            self.bz2obj = bz2.BZ2Compressor()
tarfile.py 文件源码 项目:Sci-Finder 作者: snverse 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def init(self):
        import bz2
        self.pos = 0
        if self.mode == "r":
            self.bz2obj = bz2.BZ2Decompressor()
            self.fileobj.seek(0)
            self.buf = b""
        else:
            self.bz2obj = bz2.BZ2Compressor()
tarfile.py 文件源码 项目:Sci-Finder 作者: snverse 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def init(self):
        import bz2
        self.pos = 0
        if self.mode == "r":
            self.bz2obj = bz2.BZ2Decompressor()
            self.fileobj.seek(0)
            self.buf = b""
        else:
            self.bz2obj = bz2.BZ2Compressor()
tarfile.py 文件源码 项目:ascii-art-py 作者: blinglnav 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def init(self):
        import bz2
        self.pos = 0
        if self.mode == "r":
            self.bz2obj = bz2.BZ2Decompressor()
            self.fileobj.seek(0)
            self.buf = b""
        else:
            self.bz2obj = bz2.BZ2Compressor()
tarfile.py 文件源码 项目:ivaochdoc 作者: ivaoch 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def init(self):
        import bz2
        self.pos = 0
        if self.mode == "r":
            self.bz2obj = bz2.BZ2Decompressor()
            self.fileobj.seek(0)
            self.buf = b""
        else:
            self.bz2obj = bz2.BZ2Compressor()
tarfile.py 文件源码 项目:aws-cfn-plex 作者: lordmuffin 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def init(self):
        import bz2
        self.pos = 0
        if self.mode == "r":
            self.bz2obj = bz2.BZ2Decompressor()
            self.fileobj.seek(0)
            self.buf = b""
        else:
            self.bz2obj = bz2.BZ2Compressor()
rpm.py 文件源码 项目:crypto-detector 作者: Wind-River 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def init(self):
        import bz2
        self.pos = 0
        if self.mode == "r":
            self.cmpobj = bz2.BZ2Decompressor()
            self.fileobj.seek(0)
            self.buf = ""
        else:
            self.cmpobj = bz2.BZ2Compressor()

# class _BZ2Proxy
tarfile.py 文件源码 项目:django 作者: alexsukhrin 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def init(self):
        import bz2
        self.pos = 0
        if self.mode == "r":
            self.bz2obj = bz2.BZ2Decompressor()
            self.fileobj.seek(0)
            self.buf = b""
        else:
            self.bz2obj = bz2.BZ2Compressor()


问题


面经


文章

微信
公众号

扫码关注公众号