python类BadZipfile()的实例源码

zipfile.py 文件源码 项目:pencroft 作者: lukeyeager 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def get(self, key):
        info = self._names_to_info[key]
        with self._lock:
            try:
                return self.file.read(info)
            except zipfile.BadZipFile:
                # Try re-opening the file
                self._open_file()
                return self.file.read(info)
test_zipfile.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_close_on_exception(self):
        """Check that the zipfile is closed if an exception is raised in the
        'with' block."""
        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
            for fpath, fdata in SMALL_TEST_DATA:
                zipfp.writestr(fpath, fdata)

        try:
            with zipfile.ZipFile(TESTFN2, "r") as zipfp2:
                raise zipfile.BadZipFile()
        except zipfile.BadZipFile:
            self.assertIsNone(zipfp2.fp, 'zipfp is not closed')
test_zipfile.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_close_erroneous_file(self):
        # This test checks that the ZipFile constructor closes the file object
        # it opens if there's an error in the file.  If it doesn't, the
        # traceback holds a reference to the ZipFile object and, indirectly,
        # the file object.
        # On Windows, this causes the os.unlink() call to fail because the
        # underlying file is still open.  This is SF bug #412214.
        #
        with open(TESTFN, "w") as fp:
            fp.write("this is not a legal zip file\n")
        try:
            zf = zipfile.ZipFile(TESTFN)
        except zipfile.BadZipFile:
            pass
test_zipfile.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_damaged_zipfile(self):
        """Check that zipfiles with missing bytes at the end raise BadZipFile."""
        # - Create a valid zip file
        fp = io.BytesIO()
        with zipfile.ZipFile(fp, mode="w") as zipf:
            zipf.writestr("foo.txt", b"O, for a Muse of Fire!")
        zipfiledata = fp.getvalue()

        # - Now create copies of it missing the last N bytes and make sure
        #   a BadZipFile exception is raised when we try to open it
        for N in range(len(zipfiledata)):
            fp = io.BytesIO(zipfiledata[:N])
            self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, fp)
test_zipfile.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_empty_file_raises_BadZipFile(self):
        f = open(TESTFN, 'w')
        f.close()
        self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, TESTFN)

        with open(TESTFN, 'w') as fp:
            fp.write("short file")
        self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, TESTFN)
test_zipfile.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_close_on_exception(self):
        """Check that the zipfile is closed if an exception is raised in the
        'with' block."""
        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
            for fpath, fdata in SMALL_TEST_DATA:
                zipfp.writestr(fpath, fdata)

        try:
            with zipfile.ZipFile(TESTFN2, "r") as zipfp2:
                raise zipfile.BadZipFile()
        except zipfile.BadZipFile:
            self.assertIsNone(zipfp2.fp, 'zipfp is not closed')
test_zipfile.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_close_erroneous_file(self):
        # This test checks that the ZipFile constructor closes the file object
        # it opens if there's an error in the file.  If it doesn't, the
        # traceback holds a reference to the ZipFile object and, indirectly,
        # the file object.
        # On Windows, this causes the os.unlink() call to fail because the
        # underlying file is still open.  This is SF bug #412214.
        #
        with open(TESTFN, "w") as fp:
            fp.write("this is not a legal zip file\n")
        try:
            zf = zipfile.ZipFile(TESTFN)
        except zipfile.BadZipFile:
            pass
test_zipfile.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_damaged_zipfile(self):
        """Check that zipfiles with missing bytes at the end raise BadZipFile."""
        # - Create a valid zip file
        fp = io.BytesIO()
        with zipfile.ZipFile(fp, mode="w") as zipf:
            zipf.writestr("foo.txt", b"O, for a Muse of Fire!")
        zipfiledata = fp.getvalue()

        # - Now create copies of it missing the last N bytes and make sure
        #   a BadZipFile exception is raised when we try to open it
        for N in range(len(zipfiledata)):
            fp = io.BytesIO(zipfiledata[:N])
            self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, fp)
test_zipfile.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_empty_file_raises_BadZipFile(self):
        f = open(TESTFN, 'w')
        f.close()
        self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, TESTFN)

        with open(TESTFN, 'w') as fp:
            fp.write("short file")
        self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, TESTFN)
unzip_repos.py 文件源码 项目:guesslang 作者: yoeo 项目源码 文件源码 阅读 53 收藏 0 点赞 0 评论 0
def _list_files(repos, languages, remove):
    repo_files = {lang: [] for lang in languages}
    exts = {ext: lang for lang, exts in languages.items() for ext in exts}

    for pos, zipped_repo in enumerate(repos, 1):
        current_repo_files = {lang: [] for lang in languages}
        size = len(repos)
        if pos % STEP == 0 or pos == size:
            LOGGER.debug("%.2f%%", 100 * pos / size)

        try:
            with ZipFile(zipped_repo) as zip_file:
                for filename in zip_file.namelist():
                    lang = exts.get(Path(filename).suffix.lstrip('.'))
                    if not lang or len(current_repo_files[lang]) >= MAX_FILES:
                        continue

                    current_repo_files[lang].append(
                        '{}::{}'.format(zipped_repo, filename))
        except BadZipFile as error:
            LOGGER.warning("Malformed file %s, error: %s", zipped_repo, error)
            if remove:
                Path(zipped_repo).unlink()
                LOGGER.debug("%s removed", zipped_repo)
            continue

        for lang, zipped_files in current_repo_files.items():
            repo_files[lang].extend(zipped_files)

    return repo_files


问题


面经


文章

微信
公众号

扫码关注公众号