def get(self, key):
    """Read the entry registered under ``key``, re-opening the file once on error.

    ``key`` is resolved through ``self._names_to_info`` and the resulting
    entry is handed to ``self.file.read`` while ``self._lock`` is held.

    Returns:
        Whatever ``self.file.read(info)`` returns.

    Raises:
        KeyError: if ``key`` is not present in ``self._names_to_info``.
        zipfile.BadZipFile: if the read still fails after the file has been
            re-opened.
    """
    # NOTE(review): ``self.file`` is presumably a ``zipfile.ZipFile`` managed
    # by ``self._open_file`` -- confirm against the rest of the class.
    info = self._names_to_info[key]
    with self._lock:
        try:
            return self.file.read(info)
        except zipfile.BadZipFile:
            # Try re-opening the file, then retry exactly once; a second
            # failure propagates to the caller.
            self._open_file()
            return self.file.read(info)
# python类BadZipfile()的实例源码 (example source code using Python's BadZipfile class)
def test_close_on_exception(self):
    """Check that the zipfile is closed if an exception is raised in the
    'with' block."""
    # Write a small archive first so there is a valid file to open for reading.
    with zipfile.ZipFile(TESTFN2, "w") as zipfp:
        for fpath, fdata in SMALL_TEST_DATA:
            zipfp.writestr(fpath, fdata)

    try:
        with zipfile.ZipFile(TESTFN2, "r") as zipfp2:
            raise zipfile.BadZipFile()
    except zipfile.BadZipFile:
        # Leaving the 'with' block via the exception must have closed the
        # archive; the test treats ``fp is None`` as the sign of that.
        self.assertIsNone(zipfp2.fp, 'zipfp is not closed')
def test_close_erroneous_file(self):
    """Check that ZipFile() closes the file it opened when the file is invalid."""
    # This test checks that the ZipFile constructor closes the file object
    # it opens if there's an error in the file.  If it doesn't, the
    # traceback holds a reference to the ZipFile object and, indirectly,
    # the file object.
    # On Windows, this causes the os.unlink() call to fail because the
    # underlying file is still open.  This is SF bug #412214.
    #
    with open(TESTFN, "w") as fp:
        fp.write("this is not a legal zip file\n")
    try:
        zipfile.ZipFile(TESTFN)
    except zipfile.BadZipFile:
        # The rejection itself is expected; the actual check is that the
        # file can be removed afterwards (presumably in the test's cleanup,
        # which is not shown here).
        pass
def test_damaged_zipfile(self):
"""Check that zipfiles with missing bytes at the end raise BadZipFile."""
# - Create a valid zip file
fp = io.BytesIO()
with zipfile.ZipFile(fp, mode="w") as zipf:
zipf.writestr("foo.txt", b"O, for a Muse of Fire!")
zipfiledata = fp.getvalue()
# - Now create copies of it missing the last N bytes and make sure
# a BadZipFile exception is raised when we try to open it
for N in range(len(zipfiledata)):
fp = io.BytesIO(zipfiledata[:N])
self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, fp)
def test_empty_file_raises_BadZipFile(self):
    """Check that an empty file and a too-short file both raise BadZipFile."""
    # Opening in 'w' mode and closing immediately leaves a zero-length file.
    with open(TESTFN, 'w'):
        pass
    self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, TESTFN)

    # Same check for a file that has some content but is not a zip archive.
    with open(TESTFN, 'w') as fp:
        fp.write("short file")
    self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, TESTFN)
def test_close_on_exception(self):
    """Check that the zipfile is closed if an exception is raised in the
    'with' block."""
    # NOTE(review): an identical definition of this test appears earlier in
    # this file; if both live in the same scope, this later one replaces it.
    # Write a small archive first so there is a valid file to open for reading.
    with zipfile.ZipFile(TESTFN2, "w") as zipfp:
        for fpath, fdata in SMALL_TEST_DATA:
            zipfp.writestr(fpath, fdata)

    try:
        with zipfile.ZipFile(TESTFN2, "r") as zipfp2:
            raise zipfile.BadZipFile()
    except zipfile.BadZipFile:
        # Leaving the 'with' block via the exception must have closed the
        # archive; the test treats ``fp is None`` as the sign of that.
        self.assertIsNone(zipfp2.fp, 'zipfp is not closed')
def test_close_erroneous_file(self):
    """Check that ZipFile() closes the file it opened when the file is invalid."""
    # NOTE(review): an identical definition of this test appears earlier in
    # this file; if both live in the same scope, this later one replaces it.
    #
    # This test checks that the ZipFile constructor closes the file object
    # it opens if there's an error in the file.  If it doesn't, the
    # traceback holds a reference to the ZipFile object and, indirectly,
    # the file object.
    # On Windows, this causes the os.unlink() call to fail because the
    # underlying file is still open.  This is SF bug #412214.
    #
    with open(TESTFN, "w") as fp:
        fp.write("this is not a legal zip file\n")
    try:
        zipfile.ZipFile(TESTFN)
    except zipfile.BadZipFile:
        # The rejection itself is expected; the actual check is that the
        # file can be removed afterwards (presumably in the test's cleanup,
        # which is not shown here).
        pass
def test_damaged_zipfile(self):
"""Check that zipfiles with missing bytes at the end raise BadZipFile."""
# - Create a valid zip file
fp = io.BytesIO()
with zipfile.ZipFile(fp, mode="w") as zipf:
zipf.writestr("foo.txt", b"O, for a Muse of Fire!")
zipfiledata = fp.getvalue()
# - Now create copies of it missing the last N bytes and make sure
# a BadZipFile exception is raised when we try to open it
for N in range(len(zipfiledata)):
fp = io.BytesIO(zipfiledata[:N])
self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, fp)
def test_empty_file_raises_BadZipFile(self):
    """Check that an empty file and a too-short file both raise BadZipFile."""
    # NOTE(review): an identical definition of this test appears earlier in
    # this file; if both live in the same scope, this later one replaces it.
    # Opening in 'w' mode and closing immediately leaves a zero-length file.
    with open(TESTFN, 'w'):
        pass
    self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, TESTFN)

    # Same check for a file that has some content but is not a zip archive.
    with open(TESTFN, 'w') as fp:
        fp.write("short file")
    self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, TESTFN)
def _list_files(repos, languages, remove):
    """List, per language, the matching files inside each zipped repository.

    Args:
        repos: sized collection of zip archive paths (``len()`` is taken of
            it for progress logging).
        languages: mapping of language name to an iterable of file
            extensions, written without the leading dot.
        remove: when true, archives that raise ``BadZipFile`` are deleted
            from disk.

    Returns:
        A dict with one key per language, each mapping to a list of
        ``'<archive>::<member>'`` strings. At most ``MAX_FILES`` entries are
        taken per language from any single archive.
    """
    repo_files = {lang: [] for lang in languages}
    # Reverse lookup: extension -> language.
    exts = {
        ext: lang
        for lang, lang_exts in languages.items()
        for ext in lang_exts
    }
    size = len(repos)  # loop-invariant, only needed for progress reporting

    for pos, zipped_repo in enumerate(repos, 1):
        # Collected per archive first so that the MAX_FILES cap applies to
        # each archive separately.
        current_repo_files = {lang: [] for lang in languages}
        if pos % STEP == 0 or pos == size:
            LOGGER.debug("%.2f%%", 100 * pos / size)

        try:
            with ZipFile(zipped_repo) as zip_file:
                for filename in zip_file.namelist():
                    lang = exts.get(Path(filename).suffix.lstrip('.'))
                    if not lang or len(current_repo_files[lang]) >= MAX_FILES:
                        continue

                    current_repo_files[lang].append(
                        '{}::{}'.format(zipped_repo, filename))
        except BadZipFile as error:
            LOGGER.warning("Malformed file %s, error: %s", zipped_repo, error)
            if remove:
                Path(zipped_repo).unlink()
                LOGGER.debug("%s removed", zipped_repo)
            # A malformed archive contributes nothing to the result.
            continue

        for lang, zipped_files in current_repo_files.items():
            repo_files[lang].extend(zipped_files)

    return repo_files