def read(self, name):
    """Return the contents of the archive member ``name``.

    The data is read in full from the file object returned by
    ``self.readfile(name)`` and its CRC-32 is compared with the value
    stored on the member's info record (``self.getinfo(name).CRC``).

    Raises:
        zipfile.BadZipfile: if the computed CRC-32 differs from the
            recorded one.
    """
    f = self.readfile(name)
    zinfo = self.getinfo(name)
    data = f.read()
    # binascii.crc32 may return a signed value on Python 2 and always an
    # unsigned one on Python 3; masking both sides makes the comparison
    # independent of the interpreter.
    crc = binascii.crc32(data) & 0xffffffff
    if crc != (zinfo.CRC & 0xffffffff):
        # Call syntax is valid on Python 2 and 3 ("raise E, msg" is not).
        raise zipfile.BadZipfile("Bad CRC-32 for file %s" % name)
    return data
# Example source code for the Python class BadZipfile()
def unzip(f):
    """Extract the zip archive at path ``f`` into the current directory.

    Returns a status string: a success message naming ``f``, or an error
    message when ``f`` is not an existing file or is not a valid zip.
    """
    if not os.path.isfile(f):
        return 'Error: File not found.'
    try:
        with zipfile.ZipFile(f) as archive:
            archive.extractall('.')
    except zipfile.BadZipfile:
        return 'Error: Failed to unzip file.'
    return 'File {} extracted.'.format(f)
def unzip(f):
    """Extract the zip archive at path ``f`` into the current directory.

    Returns a status string prefixed with ``[*]`` on success, ``[!]``
    when the file is not a valid zip, or ``[-]`` when ``f`` is not an
    existing file.
    """
    if not os.path.isfile(f):
        return '[-] Error: File not found.'
    try:
        with zipfile.ZipFile(f) as archive:
            archive.extractall('.')
    except zipfile.BadZipfile:
        return '[!] Error: Failed to unzip file.'
    return '[*] File {} extracted.'.format(f)
def unzip(f):
    """Unpack the zip file ``f`` into the current working directory.

    The outcome is reported as a string rather than an exception: one
    message for success, one for an invalid archive and one for a path
    that is not an existing file.
    """
    if os.path.isfile(f):
        try:
            with zipfile.ZipFile(f) as zip_handle:
                zip_handle.extractall('.')
        except zipfile.BadZipfile:
            outcome = 'Error: Failed to unzip file.'
        else:
            outcome = 'File {} extracted.'.format(f)
    else:
        outcome = 'Error: File not found.'
    return outcome
def archive_context(filename):
    """
    Extract the zip archive ``filename`` into a fresh temporary directory
    and yield with the working directory set to the first entry that
    ``os.listdir`` reports inside it.

    The previous working directory is restored and the temporary
    directory removed when the generator finishes or fails.  A
    ``zipfile.BadZipfile`` raised while extracting is re-raised with
    ``MEANINGFUL_INVALID_ZIP_ERR_MSG`` (formatted with ``filename``)
    appended to its ``args``.
    """
    workdir = tempfile.mkdtemp()
    log.warn('Extracting in %s', workdir)
    previous_cwd = os.getcwd()
    try:
        os.chdir(workdir)
        try:
            with ContextualZipFile(filename) as zf:
                zf.extractall()
        except zipfile.BadZipfile as exc:
            # Guarantee a leading message slot before appending the
            # explanatory text.
            base_args = exc.args if exc.args else ('', )
            exc.args = base_args + (
                MEANINGFUL_INVALID_ZIP_ERR_MSG.format(filename),
            )
            raise

        # Step into the extracted tree.
        target = os.path.join(workdir, os.listdir(workdir)[0])
        os.chdir(target)
        log.warn('Now working in %s', target)
        yield
    finally:
        os.chdir(previous_cwd)
        shutil.rmtree(workdir)
def archive_context(filename):
    """
    Unpack ``filename`` into a temporary directory and make the first
    entry listed in that directory the current working directory for the
    duration of the ``yield``.

    On exit the original working directory is restored and the temporary
    directory is deleted.  If extraction raises ``zipfile.BadZipfile``,
    the exception is re-raised with an extra argument built from
    ``MEANINGFUL_INVALID_ZIP_ERR_MSG`` and ``filename``.
    """
    scratch = tempfile.mkdtemp()
    log.warn('Extracting in %s', scratch)
    saved_cwd = os.getcwd()
    try:
        os.chdir(scratch)
        try:
            with ContextualZipFile(filename) as bundle:
                bundle.extractall()
        except zipfile.BadZipfile as error:
            hint = MEANINGFUL_INVALID_ZIP_ERR_MSG.format(filename)
            # An empty args tuple gets a blank first element so the hint
            # always lands in second position or later.
            error.args = (error.args or ('', )) + (hint, )
            raise

        # Enter the directory produced by the extraction.
        extracted = os.path.join(scratch, os.listdir(scratch)[0])
        os.chdir(extracted)
        log.warn('Now working in %s', extracted)
        yield
    finally:
        os.chdir(saved_cwd)
        shutil.rmtree(scratch)
def archive_context(filename):
    """
    Generator that unzips ``filename`` into a temporary directory,
    changes into the first entry found there, and yields once.

    Cleanup (restoring the old working directory and removing the
    temporary directory) runs in a ``finally`` clause.  A
    ``zipfile.BadZipfile`` from the extraction step is re-raised after
    ``MEANINGFUL_INVALID_ZIP_ERR_MSG.format(filename)`` has been added to
    its ``args``.
    """
    tmp_root = tempfile.mkdtemp()
    log.warn('Extracting in %s', tmp_root)
    start_dir = os.getcwd()
    try:
        os.chdir(tmp_root)
        try:
            with ContextualZipFile(filename) as zip_handle:
                zip_handle.extractall()
        except zipfile.BadZipfile as bad_zip:
            existing = bad_zip.args
            if not existing:
                # Keep position 0 reserved for the original message.
                existing = ('', )
            bad_zip.args = existing + (
                MEANINGFUL_INVALID_ZIP_ERR_MSG.format(filename),
            )
            raise

        # Move into the first entry of the extraction directory.
        first_entry = os.listdir(tmp_root)[0]
        inner_dir = os.path.join(tmp_root, first_entry)
        os.chdir(inner_dir)
        log.warn('Now working in %s', inner_dir)
        yield
    finally:
        os.chdir(start_dir)
        shutil.rmtree(tmp_root)
def archive_context(filename):
    """
    Extract ``filename`` to a temporary directory and yield while the
    current working directory points at the first entry inside it.

    Afterwards the working directory that was active on entry is
    restored and the temporary directory is removed.  When extraction
    fails with ``zipfile.BadZipfile`` the exception's ``args`` are
    extended with ``MEANINGFUL_INVALID_ZIP_ERR_MSG`` formatted with
    ``filename`` before it is re-raised.
    """
    extraction_dir = tempfile.mkdtemp()
    log.warn('Extracting in %s', extraction_dir)
    original_cwd = os.getcwd()
    try:
        os.chdir(extraction_dir)
        try:
            with ContextualZipFile(filename) as opened:
                opened.extractall()
        except zipfile.BadZipfile as failure:
            # Pad an empty args tuple so the appended text never becomes
            # the first argument.
            padded = failure.args if failure.args else ('', )
            extra = MEANINGFUL_INVALID_ZIP_ERR_MSG.format(filename)
            failure.args = padded + (extra, )
            raise

        # Descend into the extracted content.
        content_dir = os.path.join(
            extraction_dir, os.listdir(extraction_dir)[0])
        os.chdir(content_dir)
        log.warn('Now working in %s', content_dir)
        yield
    finally:
        os.chdir(original_cwd)
        shutil.rmtree(extraction_dir)
def _should_contain_zip_content(value):
    """Raise ``ValueError(ERROR_MSG)`` unless ``value`` parses as a zip file.

    A non-bytes ``value`` is encoded as UTF-8 first; it is very unlikely
    to be valid zip content, but it is still checked for certainty.
    """
    payload = value if isinstance(value, bytes) else value.encode('utf-8')
    stream = six.BytesIO(payload)
    try:
        with closing(zipfile.ZipFile(stream)) as archive:
            # Listing the entries forces the archive directory to be read.
            archive.infolist()
    except zipfile.BadZipfile:
        raise ValueError(ERROR_MSG)
def _extract_zip(self, file_obj, path):
    """Extract every member of the zip archive ``file_obj`` into ``path``.

    Raises ``self.BadArchive`` when ``zipfile`` rejects the archive with
    ``zipfile.BadZipfile``.
    """
    try:
        with zipfile.ZipFile(file_obj) as zf:
            zf.extractall(path)
    except zipfile.BadZipfile:
        raise self.BadArchive
def __init__(self):
    """
    Initialize a new TUI session.

    * Load the server list from a configuration file under the working
      directory.
    * Try to load the hosts data file under the working directory if it
      exists.

    .. note:: If the hosts data file does not exist or is not valid in
        the current working directory, a warning message box pops up.
        Operations that change the hosts file on the current system can
        only be done after a new data file has been downloaded.

    .. seealso:: :meth:`~tui.curses_d.CursesDaemon.session_daemon` method
        in :class:`~tui.curses_d.CursesDaemon`.

    .. seealso:: :meth:`~gui.hostsutil.HostsUtil.init_main` in
        :class:`~gui.hostsutil.HostsUtil` class.
    """
    super(HostsUtil, self).__init__()
    # Set mirrors
    self.settings[0][2] = CommonUtil.set_network("network.conf")
    # Read data file and set function list
    try:
        self.set_platform()
        RetrieveData.unpack()
        RetrieveData.connect_db()
        self.set_info()
        self.set_func_list()
    except (IOError, BadZipfile) as err:
        # IOError is reported as a missing data file, BadZipfile as a
        # corrupt one.
        if isinstance(err, IOError):
            notice = "No data file found! Press F6 to get data file first."
        else:
            notice = ("Incorrect Data file! Press F6 to get a new data "
                      "file first.")
        self.messagebox(notice, 1)
def refresh_info(self, refresh=0):
    """
    Reload the data file information and show it on the main dialog. The
    information includes both the metadata and the hosts module info from
    the data file.

    If ``refresh`` is set and a data connection exists, the previously
    retrieved data is cleared before the data file is unpacked again.
    A bad zip file, ``IOError`` or ``OSError`` while loading triggers the
    "incorrect data file" warning.

    :param refresh: A flag indicating whether the information on the main
        dialog needs to be reloaded or not. The value could be `0` or `1`.

        =======  =============
        refresh  operation
        =======  =============
        0        Do NOT reload
        1        Reload
        =======  =============
    :type refresh: int
    """
    stale_connection = refresh and RetrieveData.conn is not None
    if stale_connection:
        RetrieveData.clear()
    try:
        RetrieveData.unpack()
        RetrieveData.connect_db()
        self.set_func_list(refresh)
        self.refresh_func_list()
        self.set_info()
    except (BadZipfile, IOError, OSError):
        self.warning_incorrect_datafile()
def init_main(self):
    """
    Set up the elements on the main dialog. Check the environment of the
    current operating system and current session.

    * Load the server list from a configuration file under the working
      directory.
    * Try to load the hosts data file under the working directory if it
      exists.

    .. note:: If the hosts data file does not exist or is not valid in
        the current working directory, a warning message box pops up.
        Operations that change the hosts file on the current system can
        only be done after a new data file has been downloaded.

    .. seealso:: Method :meth:`~tui.hostsutil.HostsUtil.__init__` in
        :class:`~tui.hostsutil.HostsUtil` class.
    """
    self.ui.SelectMirror.clear()
    self.set_version()
    # Set mirrors
    self.mirrors = CommonUtil.set_network("network.conf")
    self.set_mirrors()
    # Read data file and set function list
    try:
        RetrieveData.unpack()
        RetrieveData.connect_db()
        self.set_func_list(1)
        self.refresh_func_list()
        self.set_info()
    except (IOError, BadZipfile) as err:
        # A missing file and a corrupt file get different warnings.
        if isinstance(err, IOError):
            self.warning_no_datafile()
        else:
            self.warning_incorrect_datafile()
    # Check if current session have root privileges
    self.check_writable()
    self.init_flag += 1
def files_in_archive(self, force_refresh=False):
    """Return the names of the archive members that are not excluded.

    The list is cached on ``self._files_in_archive``; a non-empty cached
    list is returned as-is unless ``force_refresh`` is true.

    Raises:
        BadZipfile: if ``self.src`` cannot be opened as a zip file,
            except when the message starts with 'bad magic number', in
            which case ``self._legacy_file_list`` is called instead.
        UnarchiveError: if listing the member names fails.
    """
    if self._files_in_archive and not force_refresh:
        return self._files_in_archive

    self._files_in_archive = []
    try:
        archive = ZipFile(self.src)
    except BadZipfile:
        e = get_exception()
        # Guard against an exception raised without arguments so the
        # original BadZipfile is re-raised rather than an IndexError.
        if e.args and e.args[0].lower().startswith('bad magic number'):
            # Python2.4 can't handle zipfiles with > 64K files.  Try using
            # /usr/bin/unzip instead
            self._legacy_file_list(force_refresh)
        else:
            raise
    else:
        # The outer try/finally closes the archive on every path.
        try:
            try:
                for member in archive.namelist():
                    if member not in self.excludes:
                        self._files_in_archive.append(to_native(member))
            except Exception:
                # Not a bare except: KeyboardInterrupt and SystemExit
                # must propagate instead of becoming UnarchiveError.
                raise UnarchiveError('Unable to list files in the archive')
        finally:
            archive.close()
    return self._files_in_archive
def processArchives(filename, file_filter):
    """Return the list of files to process for ``filename``.

    If ``filename`` ends with ``.zip``, the archive's member names are
    listed and every name matched by the regular expression
    ``file_filter`` (via ``re.match``, i.e. anchored at the start) is
    returned joined onto the archive path.  Any other ``filename`` is
    returned unchanged as a single-element list.

    Raises:
        SystemExit: with code ``-1`` when the archive cannot be read.
    """
    files_to_process = []
    if not filename.endswith('.zip'):
        files_to_process.append(filename)
        return files_to_process

    try:
        zip_archive = zipfile.ZipFile(filename, "r")
        # Close the archive even if listing its members fails.
        try:
            zip_file_list = zip_archive.namelist()
        finally:
            zip_archive.close()
        logger.info("Total files in %s: %d" % (filename, len(zip_file_list)))
        logger.info("Hold on while we check the zipped files...")
        for zipped_filename in zip_file_list:
            if re.match(file_filter, zipped_filename):
                files_to_process.append(os.path.join(filename, zipped_filename))
        if not files_to_process:
            logger.error("No valid files found!")
    except (IOError, zipfile.BadZipfile, struct.error):
        logger.error("Error reading zip archive: %s" % filename)
        # Raised directly so the function does not depend on the
        # site-provided exit() helper being installed.
        raise SystemExit(-1)
    return files_to_process
def archive_context(filename):
    """
    Unzip ``filename`` into a new temporary directory, change into the
    first entry that ``os.listdir`` returns for it, and yield.

    The ``finally`` clause switches back to the working directory that
    was current on entry and deletes the temporary directory.  A
    ``zipfile.BadZipfile`` raised by the extraction is re-raised with
    ``MEANINGFUL_INVALID_ZIP_ERR_MSG`` (formatted with ``filename``)
    appended to its ``args``.
    """
    unpack_dir = tempfile.mkdtemp()
    log.warn('Extracting in %s', unpack_dir)
    entry_cwd = os.getcwd()
    try:
        os.chdir(unpack_dir)
        try:
            with ContextualZipFile(filename) as zipped:
                zipped.extractall()
        except zipfile.BadZipfile as problem:
            # Reserve the first args slot when the exception carries none.
            current_args = problem.args or ('', )
            problem.args = current_args + (
                MEANINGFUL_INVALID_ZIP_ERR_MSG.format(filename),
            )
            raise

        # Change into the unpacked directory.
        unpacked = os.path.join(unpack_dir, os.listdir(unpack_dir)[0])
        os.chdir(unpacked)
        log.warn('Now working in %s', unpacked)
        yield
    finally:
        os.chdir(entry_cwd)
        shutil.rmtree(unpack_dir)
def test_close_on_exception(self):
    """Verify that a ZipFile is closed when its 'with' block raises."""
    # Build a small archive that can be reopened below.
    with zipfile.ZipFile(TESTFN2, "w") as writer:
        for member_name, member_data in SMALL_TEST_DATA:
            writer.writestr(member_name, member_data)

    try:
        with zipfile.ZipFile(TESTFN2, "r") as reader:
            raise zipfile.BadZipfile()
    except zipfile.BadZipfile:
        # The test treats a cleared ``fp`` as the sign of a closed archive.
        self.assertTrue(reader.fp is None, 'zipfp is not closed')
def test_close_erroneous_file(self):
    # Checks that the ZipFile constructor closes the file object it
    # opened when the file turns out to be invalid.  Otherwise the
    # traceback keeps the ZipFile object, and through it the file
    # object, alive.
    # On Windows that makes the later os.unlink() call fail because the
    # underlying file is still open.  This is SF bug #412214.
    with open(TESTFN, "w") as bogus:
        bogus.write("this is not a legal zip file\n")
    try:
        zf = zipfile.ZipFile(TESTFN)
    except zipfile.BadZipfile:
        # Expected: the file is not a zip archive.
        pass
def test_damaged_zipfile(self):
    """Check that zipfiles with missing bytes at the end raise BadZipFile."""
    # Build a valid archive in memory.
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, mode="w") as zipf:
        zipf.writestr("foo.txt", b"O, for a Muse of Fire!")
    complete = buf.getvalue()

    # Every proper prefix of the archive (including the empty one) must
    # be rejected when opened.
    for cut in range(len(complete)):
        truncated = io.BytesIO(complete[:cut])
        self.assertRaises(zipfile.BadZipfile, zipfile.ZipFile, truncated)
def test_empty_file_raises_BadZipFile(self):
    """An empty file and a short non-zip file must both be rejected."""
    for payload in ("", "short file"):
        with open(TESTFN, 'w') as handle:
            # The first round leaves the file empty on purpose.
            if payload:
                handle.write(payload)
        self.assertRaises(zipfile.BadZipfile, zipfile.ZipFile, TESTFN)
def get_function_root(self, name):
    """Return the directory holding the extracted package of function ``name``.

    The package zip is looked up in ``self.serverless_package`` and
    extracted below ``self.functions_output`` (created on first use).
    If the target directory already exists it is returned without
    extracting again.

    Raises:
        SystemExit: with code 2 when the zip is missing or invalid.
    """
    if not hasattr(self, 'functions_output'):
        self.functions_output = TemporaryDirectory("puresec-serverless-functions-")

    package_name = self._get_function_package_name(name)
    function_root = os.path.join(self.functions_output.name, package_name)
    if os.path.exists(function_root):
        # Already extracted by an earlier call.
        return function_root

    archive_path = os.path.join(self.serverless_package, "{}.zip".format(package_name))
    try:
        archive = ZipFile(archive_path, 'r')
    except FileNotFoundError:
        eprint("error: serverless package did not create a function zip for '{}'", name)
        raise SystemExit(2)
    except BadZipFile:
        eprint("error: serverless package did not create a valid function zip for '{}'", name)
        raise SystemExit(2)

    with archive:
        archive.extractall(function_root)
    return function_root
def clean_zip_file(self):
    """Check that the uploaded ``zip_file`` is a valid, uncorrupted zip archive.

    The archive is opened here only for validation and is expected to be
    opened again later, which duplicates a little work but keeps the
    code easier to read.

    Returns:
        The ``zip_file`` value from ``self.cleaned_data``, unchanged.

    Raises:
        forms.ValidationError: if the file is not a zip archive, or if
            ``testzip()`` reports a corrupt member.
    """
    zip_file = self.cleaned_data['zip_file']
    try:
        archive = zipfile.ZipFile(zip_file)
    except BadZipFile as e:
        raise forms.ValidationError(str(e))
    # The context manager closes the archive on every path, including
    # when testzip() itself raises.
    with archive:
        bad_file = archive.testzip()
    if bad_file:
        raise forms.ValidationError('"%s" in the .zip archive is corrupt.' % bad_file)
    return zip_file
def get_manifest(self, archive):
    """Load, validate and return the ``manifest.json`` of an uploaded plugin.

    The zip at ``archive.temporary_file_path()`` is opened, the manifest
    is read from below the prefix returned by ``self.get_prefix`` and
    passed to ``validate_manifest``.  Every failure is reported as a
    ``ValidationError`` with a message describing the cause.
    """
    try:
        with ZipFile(archive.temporary_file_path()) as plugin:
            print(plugin.namelist())
            prefix = self.get_prefix(plugin)
            # A non-empty prefix is used as a directory component.
            if len(prefix):
                prefix = prefix + '/'
            else:
                prefix = ''
            manifest_name = '{}manifest.json'.format(prefix)
            with plugin.open(manifest_name) as manifest_file:
                raw_manifest = manifest_file.read()
                manifest = json.loads(raw_manifest)
            validate_manifest(manifest)
            return manifest
    except BadZipFile:
        raise ValidationError('Bad .zip format')
    except FileNotFoundError:
        raise ValidationError('Error with upload, please try again')
    except KeyError:
        raise ValidationError('No manifest.json found in archive')
    except json.JSONDecodeError:
        raise ValidationError('Error with manifest.json, bad Json Format')
    except avasdk.exceptions.ValidationError as e:
        raise ValidationError('Error in manifest.json ({})'.format(e))
def get_zip_filesize(filepath: str) -> int:
    """Return the summed ``file_size`` of the image entries in a zip archive.

    An entry counts when its name ends (case-insensitively) with
    ``.jpeg``, ``.jpg``, ``.png`` or ``.gif`` and does not contain
    ``__macosx``.

    Returns:
        The total size in bytes, or ``-1`` if ``filepath`` is not a valid
        zip file.
    """
    image_suffixes = ('.jpeg', '.jpg', '.png', '.gif')
    try:
        archive = zipfile.ZipFile(filepath, 'r')
    except zipfile.BadZipFile:
        return -1
    # The context manager closes the archive even if reading an entry fails.
    with archive:
        # Lower-case each name once and reuse it for both checks.
        lowered_sizes = (
            (info.filename.lower(), info.file_size)
            for info in archive.infolist()
        )
        return sum(
            int(size)
            for lowered, size in lowered_sizes
            if lowered.endswith(image_suffixes) and '__macosx' not in lowered
        )
def get_zip_fileinfo(filepath: str) -> Tuple[int, int]:
    """Return the total size and number of image entries in a zip archive.

    An entry counts when its name ends (case-insensitively) with
    ``.jpeg``, ``.jpg``, ``.png`` or ``.gif`` and does not contain
    ``__macosx``.

    Returns:
        ``(total_size, total_count)`` where the size is the sum of the
        entries' ``file_size``, or ``(-1, -1)`` if ``filepath`` is not a
        valid zip file.
    """
    image_suffixes = ('.jpeg', '.jpg', '.png', '.gif')
    try:
        archive = zipfile.ZipFile(filepath, 'r')
    except zipfile.BadZipFile:
        return -1, -1
    total_size = 0
    total_count = 0
    # The context manager closes the archive even if reading an entry fails.
    with archive:
        for info in archive.infolist():
            # Lower-case the name once and reuse it for both checks.
            lowered = info.filename.lower()
            if not lowered.endswith(image_suffixes) or '__macosx' in lowered:
                continue
            total_size += int(info.file_size)
            total_count += 1
    return total_size, total_count
def from_zip(cls, src='/tmp/app.zip', dest='/app'):
    """
    Unzips a zipped app project file and instantiates it.

    :param src: zipfile path
    :param dest: destination folder to extract the zipfile content

    Returns
        A project instance, as built by ``cls.from_path(dest)``.

    Raises
        errors.InvalidPathError: if ``src`` does not exist.
        errors.InvalidZipFileError: if ``src`` is not a valid zip file.
    """
    try:
        archive = zipfile.ZipFile(src, 'r')
    except FileNotFoundError:
        raise errors.InvalidPathError(src)
    except zipfile.BadZipFile:
        raise errors.InvalidZipFileError(src)
    # extractall() extracts every member; the context manager closes the
    # archive even if extraction fails part-way.
    with archive:
        archive.extractall(dest)
    return cls.from_path(dest)
def check_read_with_bad_crc(self, compression):
    """Tests that files with bad CRCs raise a BadZipFile exception when read."""
    corrupt_zip = self.zips_with_bad_crc[compression]

    # Whole-member read through ZipFile.read()
    with zipfile.ZipFile(io.BytesIO(corrupt_zip), mode="r") as zipf:
        self.assertRaises(zipfile.BadZipFile, zipf.read, 'afile')

    # Whole-member read through ZipExtFile.read()
    with zipfile.ZipFile(io.BytesIO(corrupt_zip), mode="r") as zipf, \
            zipf.open('afile', 'r') as member:
        self.assertRaises(zipfile.BadZipFile, member.read)

    # Same with small reads (in order to exercise the buffering logic)
    with zipfile.ZipFile(io.BytesIO(corrupt_zip), mode="r") as zipf, \
            zipf.open('afile', 'r') as member:
        member.MIN_READ_SIZE = 2
        with self.assertRaises(zipfile.BadZipFile):
            while member.read(2):
                pass
def test_empty_zipfile(self):
    """Closing a new archive without adding files must leave a valid ZIP.

    Checked for both 'w' and 'a' mode; the 'a' round appends to the file
    left behind by the 'w' round.
    """
    for mode in ("w", "a"):
        zipf = zipfile.ZipFile(TESTFN, mode=mode)
        zipf.close()
        try:
            reader = zipfile.ZipFile(TESTFN, mode="r")
        except zipfile.BadZipFile:
            # Only an invalid archive is a test failure; anything else
            # (including KeyboardInterrupt) propagates unchanged.
            self.fail("Unable to create empty ZIP file in '%s' mode" % mode)
        else:
            # Do not leave the read handle open on the test file.
            reader.close()
def test_empty_zipfile(self):
    """Check that an archive closed without any members is a valid ZIP.

    Both 'w' and 'a' creation modes are exercised in turn on the same
    test file.
    """
    # Created in 'w' mode and closed straight away.
    zipf = zipfile.ZipFile(TESTFN, mode="w")
    zipf.close()
    try:
        zipf = zipfile.ZipFile(TESTFN, mode="r")
    except zipfile.BadZipFile:
        self.fail("Unable to create empty ZIP file in 'w' mode")
    else:
        # Release the read handle before the file is reopened.
        zipf.close()

    # Same check after opening in 'a' mode.
    zipf = zipfile.ZipFile(TESTFN, mode="a")
    zipf.close()
    try:
        zipf = zipfile.ZipFile(TESTFN, mode="r")
    except zipfile.BadZipFile:
        # Narrowed from a bare except so unrelated errors and
        # KeyboardInterrupt are not reported as this failure.
        self.fail("Unable to create empty ZIP file in 'a' mode")
    else:
        zipf.close()
def test_read_with_bad_crc(self):
    """Tests that files with bad CRCs raise a BadZipFile exception when read."""
    corrupt_zip = self.zip_with_bad_crc

    # Whole-member read through ZipFile.read()
    with zipfile.ZipFile(io.BytesIO(corrupt_zip), mode="r") as zipf:
        self.assertRaises(zipfile.BadZipFile, zipf.read, 'afile')

    # Whole-member read through ZipExtFile.read()
    with zipfile.ZipFile(io.BytesIO(corrupt_zip), mode="r") as zipf, \
            zipf.open('afile', 'r') as member:
        self.assertRaises(zipfile.BadZipFile, member.read)

    # Same with small reads (in order to exercise the buffering logic)
    with zipfile.ZipFile(io.BytesIO(corrupt_zip), mode="r") as zipf, \
            zipf.open('afile', 'r') as member:
        member.MIN_READ_SIZE = 2
        with self.assertRaises(zipfile.BadZipFile):
            while member.read(2):
                pass