def test_empty_zipfile(self):
    """Check that an archive closed without any members is a valid ZIP file.

    TESTFN is first written in 'w' mode and then reopened in 'a' mode;
    after each step it must open in 'r' mode without raising
    ``zipfile.BadZipFile``. Any other exception propagates and is reported
    as a test error instead of being masked as a failure.
    """
    for mode in ("w", "a"):
        # Open and immediately close without adding any member.
        with zipfile.ZipFile(TESTFN, mode=mode):
            pass
        try:
            # The context manager closes the read handle, so TESTFN is not
            # left open after the check.
            with zipfile.ZipFile(TESTFN, mode="r"):
                pass
        except zipfile.BadZipFile:
            self.fail("Unable to create empty ZIP file in %r mode" % mode)
# Example source code for Python's BadZipfile() class
def test_read_with_bad_crc(self):
    """Reading a member with a bad CRC must raise BadZipFile."""
    corrupt_archive = self.zip_with_bad_crc

    # Whole-member read through ZipFile.read().
    with zipfile.ZipFile(io.BytesIO(corrupt_archive), mode="r") as archive:
        with self.assertRaises(zipfile.BadZipFile):
            archive.read('afile')

    # Whole-member read through ZipExtFile.read().
    with zipfile.ZipFile(io.BytesIO(corrupt_archive), mode="r") as archive:
        with archive.open('afile', 'r') as member:
            with self.assertRaises(zipfile.BadZipFile):
                member.read()

    # Two-byte reads with a lowered MIN_READ_SIZE, to exercise the
    # buffering logic.
    with zipfile.ZipFile(io.BytesIO(corrupt_archive), mode="r") as archive:
        with archive.open('afile', 'r') as member:
            member.MIN_READ_SIZE = 2
            with self.assertRaises(zipfile.BadZipFile):
                chunk = member.read(2)
                while chunk:
                    chunk = member.read(2)
def test_empty_zipfile(self):
    """Check that an archive closed without any members is a valid ZIP file.

    TESTFN is first written in 'w' mode and then reopened in 'a' mode;
    after each step it must open in 'r' mode without raising
    ``zipfile.BadZipFile``. Any other exception propagates and is reported
    as a test error instead of being masked as a failure.
    """
    for mode in ("w", "a"):
        # Open and immediately close without adding any member.
        with zipfile.ZipFile(TESTFN, mode=mode):
            pass
        try:
            # The context manager closes the read handle, so TESTFN is not
            # left open after the check.
            with zipfile.ZipFile(TESTFN, mode="r"):
                pass
        except zipfile.BadZipFile:
            self.fail("Unable to create empty ZIP file in %r mode" % mode)
def test_read_with_bad_crc(self):
    """Reading a member with a bad CRC must raise BadZipFile."""
    bad_crc_bytes = self.zip_with_bad_crc

    # 1) ZipFile.read() on the whole member.
    with zipfile.ZipFile(io.BytesIO(bad_crc_bytes), mode="r") as zf_obj:
        with self.assertRaises(zipfile.BadZipFile):
            zf_obj.read('afile')

    # 2) ZipExtFile.read() on the whole member.
    with zipfile.ZipFile(io.BytesIO(bad_crc_bytes), mode="r") as zf_obj:
        with zf_obj.open('afile', 'r') as ext_file:
            with self.assertRaises(zipfile.BadZipFile):
                ext_file.read()

    # 3) Small reads with MIN_READ_SIZE lowered to 2, to exercise the
    #    buffering logic.
    with zipfile.ZipFile(io.BytesIO(bad_crc_bytes), mode="r") as zf_obj:
        with zf_obj.open('afile', 'r') as ext_file:
            ext_file.MIN_READ_SIZE = 2
            with self.assertRaises(zipfile.BadZipFile):
                piece = ext_file.read(2)
                while piece:
                    piece = ext_file.read(2)
def _unzip(files, destination):
    """Extract archive members into ``destination`` under random names.

    Args:
        files: Sized iterable of ``"<zip path>::<member name>"`` strings.
        destination: Directory path object (``mkdir`` and ``joinpath`` are
            called on it); it is created when it does not exist yet.

    Raises:
        RuntimeError: When an archive is malformed. The underlying
            ``BadZipFile`` is attached as the cause.
    """
    size = len(files)
    destination.mkdir(exist_ok=True)
    LOGGER.debug("Unzip %d files into %s", size, destination)

    for pos, zipped_file in enumerate(files, 1):
        # Log progress every STEP entries and once more for the last one.
        if pos % STEP == 0 or pos == size:
            LOGGER.debug("%.2f%%", 100 * pos / size)

        zipped_repo, filename = zipped_file.split('::')
        # Keep the member's extension but store it under a fresh UUID name.
        ext = Path(filename).suffix
        dest_path = destination.joinpath(str(uuid4()) + ext)

        try:
            with ZipFile(zipped_repo) as zip_file:
                # Read first: if the read fails, no empty destination file
                # is left behind.
                content = zip_file.read(filename)
        except BadZipFile as error:
            LOGGER.error("Malformed file %s, error: %s", zipped_repo, error)
            raise RuntimeError(
                'Extraction failed for {}'.format(zipped_repo)) from error

        with dest_path.open('wb') as dest_file:
            dest_file.write(content)
def get_file_type(f_path):
    """
    Get the Blib type of a file.

    The type is read from the ZIP archive comment: it is the second
    space-separated word of the comment decoded as UTF-8.

    Args:
        f_path (str): Path to the file to be checked.

    Returns:
        str or None
            A string containing the Blib type is returned,
            if no valid type is found, None is returned.
    """
    try:
        archive = zf.ZipFile(f_path, 'r')
    except zf.BadZipFile:
        return None

    # The context manager closes the archive on every exit path.
    with archive:
        try:
            return archive.comment.decode("utf-8").split(" ")[1]
        except (ValueError, IndexError):
            # ValueError: the comment is not valid UTF-8.
            # IndexError: the comment has no second word (e.g. it is empty).
            return None
def zipGetFileSize(content):
    """Report the uncompressed size of one member of a ZIP archive.

    Args:
        content (str): Whitespace-separated ``"<item> <zip file>"``.

    Returns:
        list[str]: A one-element list holding the size string, an error
        message, or the usage hint when fewer than two words were given.
    """
    content = content.split()
    if len(content) < 2:
        return ['zip getfilesize item file_zip']

    try:
        # ``with`` closes the archive even when the member lookup fails.
        with zipfile.ZipFile(content[1]) as zipFile:
            infoOfFile = zipFile.getinfo(content[0])
    except FileNotFoundError:
        return ['1f401268File %r Not Found' %(content[1])]
    except zipfile.BadZipFile:
        return ['1f401268File %r is not a zip file' %(content[1])]
    except KeyError:
        # Same message zipGetComSize gives for a missing member.
        return ['1f401268There no item named %r in the archive' %(content[0])]
    except Exception:
        return ['Something went wrong when trying to handel file %r' %(content[1])]

    # NOTE(review): ZipInfo.file_size is a byte count although the label
    # says KB -- confirm what callers expect before changing the text.
    return [str(infoOfFile.file_size) + ' KB']
def zipGetComSize(content):
    """Report the compressed size of one member of a ZIP archive.

    Args:
        content (str): Whitespace-separated ``"<item> <zip file>"``.

    Returns:
        list[str]: A one-element list holding the size string, an error
        message, or the usage hint when fewer than two words were given.
    """
    content = content.split()
    if len(content) < 2:
        # NOTE(review): the hint names "getfilesize", same as
        # zipGetFileSize -- confirm whether that is intended.
        return ['zip getfilesize item file_zip']

    try:
        # ``with`` closes the archive even when the member lookup fails.
        with zipfile.ZipFile(content[1]) as zipFile:
            infoOfFile = zipFile.getinfo(content[0])
    except FileNotFoundError:
        return ['1f401268File %r Not Found' %(content[1])]
    except zipfile.BadZipFile:
        return ['1f401268File %r is not a zip file' %(content[1])]
    except KeyError:
        return ['1f401268There no item named %r in the archive' %(content[0])]
    except Exception:
        return ['Something went wrong when trying to handel file %r' %(content[1])]

    # NOTE(review): ZipInfo.compress_size is a byte count although the
    # label says KB -- confirm what callers expect before changing the text.
    return [str(infoOfFile.compress_size) + ' KB']
def unzipAll(content):
    """Extract every member of a ZIP archive.

    Args:
        content (str): ``"<zip file>"`` or ``"<zip file> <target dir>"``,
            whitespace separated. The target directory is only used when
            exactly two words are given; otherwise members are extracted
            into the current working directory.

    Returns:
        list[str]: Empty on success, otherwise a single error message.
    """
    content = content.split()
    if not content:
        # Without this guard the generic handler below would itself raise
        # IndexError while formatting content[0].
        return ['No zip file given to unzip']

    extractTo = content[1] if len(content) == 2 else None
    try:
        # ``with`` closes the archive even when extraction fails.
        with zipfile.ZipFile(content[0]) as zipFile:
            # extractall(None) extracts into the current directory.
            zipFile.extractall(extractTo)
        return []
    except FileNotFoundError:
        return ['1f401268File %r Not Found' %(content[0])]
    except zipfile.BadZipFile:
        return ['1f401268File %r is not a zip file' %(content[0])]
    except Exception:
        return ['Something went wrong when trying to unzip file %r' %(content[0])]
def unzip(content):
    """Extract a single member of a ZIP archive.

    Args:
        content (str): Whitespace-separated
            ``"<item> <zip file> [<target dir>]"``. With fewer than two
            words the call is forwarded to ``unzipAll``. The target
            directory is only used when exactly three words are given.

    Returns:
        list[str]: Empty on success, otherwise a single error message.
    """
    content = content.split()
    if len(content) < 2:
        return unzipAll(''.join(content))

    extractTo = content[-1] if len(content) == 3 else None
    try:
        # ``with`` closes the archive even when extraction fails.
        with zipfile.ZipFile(content[1]) as zipFile:
            # extract(member, None) extracts into the current directory.
            zipFile.extract(content[0], extractTo)
        return []
    except FileNotFoundError:
        return ['1f401268File %r Not Found' %(content[1])]
    except zipfile.BadZipFile:
        return ['1f401268File %r is not a zip file' %(content[1])]
    except KeyError:
        return ['1f401268There no item named %r in the archive' %(content[0])]
    except Exception:
        return ['Something went wrong when trying to unzip file %r' %(content[1])]
def update_compressed_packages(pkgs):
    """Reset ``multi_importer.loaders`` and add a ZipLoader per package.

    Packages whose archive is missing or is not a valid ZIP file are
    skipped after printing an error line.
    """
    multi_importer.loaders = []
    for package in pkgs:
        try:
            multi_importer.loaders.append(ZipLoader(package))
        except (FileNotFoundError, zipfile.BadZipFile) as load_error:
            message = "error loading " + package + ": " + str(load_error)
            print(message)
def test_bad_zip_file(self):
    """Unzipping a Miz built from BAD_ZIP_FILE must raise BadZipFile."""
    broken_mission = Miz(BAD_ZIP_FILE)
    with pytest.raises(BadZipFile):
        broken_mission.unzip()
def try_open_zip(file):
    """Return a case-insensitive member opener for ``file``, or None.

    None is returned when ``file`` is not a valid ZIP archive. Otherwise
    the result is a function taking a member name; the name is matched
    against the archive's names ignoring case and the opened member is
    returned. Unknown names raise FileNotFoundError.
    """
    try:
        archive = ZipFile(file)
    except BadZipFile:
        return None

    # Map each lower-cased name to the spelling stored in the archive.
    stored_names = {}
    for member_name in archive.namelist():
        stored_names[member_name.lower()] = member_name

    def open_member(name):
        try:
            actual_name = stored_names[name.lower()]
            return archive.open(actual_name)
        except KeyError:
            raise FileNotFoundError(name) from None

    return open_member
def filecount_in_zip(filepath: str) -> int:
    """Count the archive members kept by ``discard_zipfile_contents``.

    Args:
        filepath: Path of the ZIP file to inspect.

    Returns:
        The number of member names for which ``discard_zipfile_contents``
        is truthy, or 0 when ``filepath`` is not a valid ZIP file.
    """
    try:
        my_zip = zipfile.ZipFile(filepath, 'r')
    except zipfile.BadZipFile:
        return 0

    # Only the name list is needed, so the archive is closed right away.
    with my_zip:
        names = my_zip.namelist()

    # Ordering does not affect a count, so the names are not sorted here.
    return sum(1 for _ in filter(discard_zipfile_contents, names))
def generate_image_set(self, force: bool=False) -> None:
    """Rebuild this archive's image records from the names in its ZIP file.

    Nothing is done when the zipped file is missing, cannot be opened,
    fails ``testzip()``, or when images already exist and ``force`` is
    false. Otherwise every existing image is deleted and one ``Image`` is
    saved per filtered member name, numbered from 1.

    Args:
        force: Rebuild even when the archive already has images.
    """
    if not os.path.isfile(self.zipped.path):
        return
    image_set_present = bool(self.image_set.all())
    # large thumbnail and image set
    if image_set_present and not force:
        return

    try:
        my_zip = zipfile.ZipFile(
            self.zipped.path, 'r')
    except (zipfile.BadZipFile, NotImplementedError):
        return

    # Only the integrity check and the name list need the archive; the
    # context manager closes it even if one of them raises.
    with my_zip:
        if my_zip.testzip():
            return
        filtered_files = list(filter(discard_zipfile_contents, sorted(my_zip.namelist(), key=zfill_to_three)))

    for img in self.image_set.all():
        # NOTE(review): indentation was lost in this listing; the stored
        # files are assumed to be removed only for extracted images while
        # the record itself is always deleted -- confirm.
        if img.extracted:
            img.image.delete(save=False)
            img.thumbnail.delete(save=False)
        img.delete()

    # One placeholder Image per member; only the position is recorded.
    for count, _filename in enumerate(filtered_files, start=1):
        image = Image(archive=self, archive_position=count, position=count)
        image.image = None
        image.save()
def generate_thumbnails(self) -> bool:
    """Create ``thumb2.jpg`` from one image inside the zipped archive.

    The source image is the member at the first image record's
    ``archive_position`` when image records exist, otherwise the first
    filtered member. It is converted to RGB if needed, shrunk to fit
    200x290 and saved as JPEG under ``settings.MEDIA_ROOT``.

    Returns:
        True when a thumbnail was written and the model saved; False when
        the zipped file is missing, cannot be opened, fails ``testzip()``
        or has no usable members.
    """
    if not os.path.exists(self.zipped.path):
        return False
    try:
        my_zip = zipfile.ZipFile(self.zipped.path, 'r')
    except (zipfile.BadZipFile, NotImplementedError):
        return False

    # The context manager closes the archive on every exit path,
    # including exceptions raised while reading or saving the image.
    with my_zip:
        if my_zip.testzip():
            return False

        self.thumbnail.delete(save=False)

        filtered_files = list(filter(discard_zipfile_contents, sorted(my_zip.namelist(), key=zfill_to_three)))
        if not filtered_files:
            return False

        if self.image_set.all():
            # archive_position is 1-based (see enumerate(start=1) users),
            # hence the -1 -- NOTE(review): confirm against Image model.
            first_file = filtered_files[self.image_set.all()[0].archive_position - 1]
        else:
            first_file = filtered_files[0]

        with my_zip.open(first_file) as current_img:
            im = PImage.open(current_img)
            if im.mode != 'RGB':
                im = im.convert('RGB')

            # NOTE(review): if PImage is PIL.Image, ANTIALIAS no longer
            # exists in Pillow 10+ (LANCZOS replaces it) -- confirm the
            # pinned Pillow version.
            im.thumbnail((200, 290), PImage.ANTIALIAS)
            thumb_name = thumb_path_handler(self, "thumb2.jpg")
            thumb_file = pjoin(settings.MEDIA_ROOT, thumb_name)
            os.makedirs(os.path.dirname(thumb_file), exist_ok=True)
            im.save(thumb_file, "JPEG")
            self.thumbnail.name = thumb_name

    super(Archive, self).save()
    return True
def extract(path, output_dir):
    """Extract the QIIME 2 Result at ``path`` into ``output_dir``.

    The extraction directory is echoed on success. A ``BadZipFile`` or
    ``ValueError`` from the extraction is reported as
    ``click.BadParameter``.
    """
    import zipfile
    import qiime2.sdk

    invalid_result_errors = (zipfile.BadZipFile, ValueError)
    try:
        extracted_dir = qiime2.sdk.Result.extract(path, output_dir)
    except invalid_result_errors:
        message = (
            '%s is not a valid QIIME 2 Result. Only QIIME 2 Artifacts and '
            'Visualizations can be extracted.' % path)
        raise click.BadParameter(message)

    click.echo('Extracted to %s' % extracted_dir)
def upload_tweet_archive():
    """Store the uploaded tweet archive and queue one import task per chunk.

    The upload is saved as a ``TwitterArchive`` for the viewer's account,
    split with ``chunk_twitter_archive`` and an import task is dispatched
    for every resulting file. A bad ZIP or an archive with no chunks
    redirects to the import form with ``tweet_archive_failed`` set;
    success redirects to the recent-archives anchor.
    """
    archive = TwitterArchive(
        account=g.viewer.account,
        body=request.files['file'].read())
    db.session.add(archive)
    db.session.commit()

    try:
        chunk_names = libforget.twitter.chunk_twitter_archive(archive.id)
        archive.chunks = len(chunk_names)
        db.session.commit()

        if archive.chunks <= 0:
            raise TweetArchiveEmptyException()

        for chunk_name in chunk_names:
            import_task = tasks.import_twitter_archive_month.s(
                archive.id, chunk_name)
            import_task.apply_async()
    except (BadZipFile, TweetArchiveEmptyException):
        # Report the failure when an error tracker is configured.
        if sentry:
            sentry.captureException()
        failure_url = url_for('index', tweet_archive_failed='',
                              _anchor='tweet_archive_import')
        return redirect(failure_url)

    return redirect(url_for('index', _anchor='recent_archives'))
def run(self):
    """Try each entry of ``self.words`` as the password of the ZIP file.

    The archive path is ``variables["file"][0]`` and the extraction
    target is ``variables["exto"][0]``. The loop stops as soon as the
    shared ``self.pwdh`` holder has a password, has an error, or has
    ``kill`` set. The first word that extracts successfully is stored in
    ``self.pwdh.pwd``. A missing archive sets ``self.pwdh.error``.
    """
    try:
        zipf = zipfile.ZipFile(variables["file"][0])
    except FileNotFoundError:
        self.pwdh.error = "zip file not found"
        return

    # The context manager closes the archive on every exit path.
    with zipf:
        for word in self.words:
            if self.pwdh.pwd is not None:
                return
            elif self.pwdh.error is not None:
                return
            elif self.pwdh.kill == True:  # equality kept: kill may not be a strict bool
                return

            word = word.decode("utf-8").replace("\n", "")
            # Blank lines used to crash on word[0]; skip them together
            # with '#' comment lines.
            if not word or word.startswith("#"):
                continue
            #animline("trying password: "+word)
            try:
                zipf.extractall(variables["exto"][0], pwd=word.encode("utf-8"))
            except (RuntimeError, zipfile.BadZipFile):
                # Extraction failed with this word; try the next one.
                continue
            self.pwdh.pwd = word
            return
def test_close_on_exception(self):
    """Check that the zipfile is closed if an exception is raised in the
    'with' block."""
    with zipfile.ZipFile(TESTFN2, "w") as zipfp:
        for fpath, fdata in SMALL_TEST_DATA:
            zipfp.writestr(fpath, fdata)

    try:
        with zipfile.ZipFile(TESTFN2, "r") as zipfp2:
            raise zipfile.BadZipFile()
    except zipfile.BadZipFile:
        # assertIsNone shows the unexpected value on failure, which
        # assertTrue(x is None) does not.
        self.assertIsNone(zipfp2.fp, 'zipfp is not closed')
def test_close_erroneous_file(self):
    # The ZipFile constructor must close the file object it opened when
    # the file content is invalid. If it does not, the traceback keeps a
    # reference to the ZipFile object and, through it, to the file object.
    # On Windows the still-open file makes the os.unlink() call fail.
    # This is SF bug #412214.
    with open(TESTFN, "w") as bogus_file:
        bogus_file.write("this is not a legal zip file\n")

    try:
        zf = zipfile.ZipFile(TESTFN)
    except zipfile.BadZipFile:
        pass
def test_empty_file_raises_BadZipFile(self):
    """Opening a zero-length or a ten-character file must raise BadZipFile."""
    # Zero-length file: open for writing and close without writing.
    with open(TESTFN, 'w'):
        pass
    with self.assertRaises(zipfile.BadZipFile):
        zipfile.ZipFile(TESTFN)

    # File holding only a short piece of text.
    with open(TESTFN, 'w') as short_file:
        short_file.write("short file")
    with self.assertRaises(zipfile.BadZipFile):
        zipfile.ZipFile(TESTFN)
def zip_namelist(f):
    """Return the member names of the ZIP archive ``f``.

    Args:
        f: Path or file-like object accepted by ``zipfile.ZipFile``.

    Returns:
        list[str]: The archive's member names, or an empty list when
        ``f`` is not a valid ZIP file.
    """
    try:
        # ``with`` closes the archive once the names have been read.
        with zipfile.ZipFile(f) as archive:
            return archive.namelist()
    except zipfile.BadZipFile:
        return []
def extract_manifest_of_file(crx_file):
    """Load ``manifest.json`` from the archive ``crx_file`` and parse it.

    The manifest bytes are decoded as UTF-8 (a BOM is accepted) with a
    latin-1 fallback, then parsed as non-strict JSON. If parsing fails the
    text is passed through ``uncomment`` and parsed once more.

    Args:
        crx_file: Path of the archive to read.

    Returns:
        The parsed JSON value, or None when the path is not a file, the
        file is empty, or it is not a valid ZIP archive. A message is
        printed in each of those cases.
    """
    debug = False

    if not os.path.isfile(crx_file):
        print(crx_file,'does not exist')
        return None

    size = os.path.getsize(crx_file)
    if size == 0:
        print('###################!!!! fucking empty downloads', crx_file)
        return None

    try:
        with ZipFile(crx_file) as myzip:
            if debug:
                # Save a copy of the manifest to inspect later.
                source = myzip.open('manifest.json')
                target = open('lol.json', "wb")
                with source, target:
                    shutil.copyfileobj(source, target)
            with myzip.open('manifest.json') as myfile:
                raw = myfile.read()
    except BadZipFile:
        print('fucking interrupted downloads')
        return None

    try:
        text = raw.decode('utf-8-sig')
    except UnicodeDecodeError:
        # latin-1 maps every byte value, so this fallback cannot fail.
        text = raw.decode('latin-1')
    if debug:
        print(text)

    try:
        return json.loads(text, strict=False)
    except Exception as e:
        print(e)
        # Retry on the output of uncomment() -- presumably it strips
        # comments; the helper is not visible here, verify.
        text = uncomment(text)
        return json.loads(text, strict=False)
#TODO: add filesize
def extract(self, filename:str, password:str):
    """
    POST /admin/extract

    Extract the archive ``filename`` found under ``Config.contest_zips``
    into a newly created ``Config.contest_path`` using ``password``, then
    call ``ContestManager.read_from_disk()``.

    Errors are reported through ``raise_exc``: Forbidden when the contest
    path already exists or extraction fails, NotFound when the archive
    does not exist.

    Returns:
        dict: An empty dict.
    """
    if os.path.exists(Config.contest_path):
        self.raise_exc(Forbidden, "CONTEST", "Contest already loaded")
    os.makedirs(Config.contest_path)

    # NOTE(review): filename is joined without validation, so a value
    # containing ".." can point outside Config.contest_zips -- confirm the
    # caller restricts it.
    z = os.path.abspath(os.path.join(Config.contest_zips, filename))

    # NOTE(review): when extraction fails the directory created above is
    # left in place, so a retry hits "Contest already loaded" -- confirm
    # whether it should be removed.
    try:
        with zipfile.ZipFile(z) as f:
            # Extract straight into the contest directory instead of
            # changing the process-wide working directory.
            f.extractall(path=Config.contest_path, pwd=password.encode())
        Logger.info("CONTEST", "Contest extracted")
    except FileNotFoundError:
        BaseHandler.raise_exc(NotFound, "NOT_FOUND", "Archive %s not found" % z)
    except (RuntimeError, PermissionError, zipfile.BadZipFile) as ex:
        BaseHandler.raise_exc(Forbidden, "FAILED", str(ex))

    ContestManager.read_from_disk()
    return {}
def test_close_on_exception(self):
    """An exception raised inside the 'with' block must still close the
    zipfile."""
    with zipfile.ZipFile(TESTFN2, "w") as writer:
        for member_name, member_data in SMALL_TEST_DATA:
            writer.writestr(member_name, member_data)

    try:
        with zipfile.ZipFile(TESTFN2, "r") as reader:
            raise zipfile.BadZipFile()
    except zipfile.BadZipFile:
        # A closed ZipFile has its fp attribute reset to None.
        self.assertIsNone(reader.fp, 'zipfp is not closed')
def test_close_erroneous_file(self):
    # Checks that the ZipFile constructor closes the file object it opens
    # when the file turns out to be invalid. Otherwise the traceback holds
    # a reference to the ZipFile object and, indirectly, to the file
    # object; on Windows that makes os.unlink() fail because the
    # underlying file is still open. This is SF bug #412214.
    with open(TESTFN, "w") as not_a_zip:
        not_a_zip.write("this is not a legal zip file\n")

    try:
        zf = zipfile.ZipFile(TESTFN)
    except zipfile.BadZipFile:
        pass
def test_damaged_zipfile(self):
    """Zipfiles with bytes missing at the end must raise BadZipFile."""
    # Build a small valid archive in memory.
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, mode="w") as archive:
        archive.writestr("foo.txt", b"O, for a Muse of Fire!")
    complete = buffer.getvalue()

    # Every proper prefix (0 .. len-1 bytes) must be rejected on open.
    for kept in range(len(complete)):
        truncated = io.BytesIO(complete[:kept])
        with self.assertRaises(zipfile.BadZipFile):
            zipfile.ZipFile(truncated)
def test_empty_file_raises_BadZipFile(self):
    """A zero-length file and a ten-character file are both rejected."""
    # Create (or truncate) TESTFN without writing anything.
    with open(TESTFN, 'w'):
        pass
    with self.assertRaises(zipfile.BadZipFile):
        zipfile.ZipFile(TESTFN)

    # Overwrite TESTFN with a short piece of text.
    with open(TESTFN, 'w') as tiny_file:
        tiny_file.write("short file")
    with self.assertRaises(zipfile.BadZipFile):
        zipfile.ZipFile(TESTFN)
def update_compressed_packages(pkgs):
    """Replace ``multi_importer.loaders`` with ZipLoaders built from ``pkgs``.

    A package whose archive is missing or is not a valid ZIP file is
    reported with a printed error line and left out.
    """
    multi_importer.loaders = []
    for pkg_path in pkgs:
        try:
            multi_importer.loaders.append(ZipLoader(pkg_path))
        except (FileNotFoundError, zipfile.BadZipFile) as failure:
            report = "error loading " + pkg_path + ": " + str(failure)
            print(report)