def clean_zip_file(self):
    """Validate the uploaded ``zip_file`` field.

    The upload is opened once here purely to check that it is a readable zip
    archive whose members all pass ``testzip()``.  It is opened again later,
    so there is some duplication, but that keeps the code easier to read.

    Returns:
        The value of ``self.cleaned_data['zip_file']``, unchanged.

    Raises:
        forms.ValidationError: if the upload cannot be opened as a zip
            archive, or if ``testzip()`` reports a corrupt member.
    """
    zip_file = self.cleaned_data['zip_file']
    try:
        archive = zipfile.ZipFile(zip_file)
    except BadZipFile as e:
        raise forms.ValidationError(str(e))
    # try/finally closes the archive in all cases, including when testzip()
    # itself raises instead of returning a member name.
    try:
        bad_file = archive.testzip()
    finally:
        archive.close()
    if bad_file:
        raise forms.ValidationError('"%s" in the .zip archive is corrupt.' % bad_file)
    return zip_file
# Example source code using the Python class BadZipfile()
def readArchiveFile( self, archive_file ):
    """Return the contents of ``archive_file`` from the zip at ``self.path``.

    Any exception raised while reading the member (``zipfile.BadZipfile``
    included) is reported on stderr and replaced by a bare ``IOError``.
    The archive is always closed before returning or raising.
    """
    zf = zipfile.ZipFile( self.path, 'r' )
    try:
        return zf.read( archive_file )
    except Exception as e:
        # BadZipfile is an Exception subclass, so this single handler covers
        # both cases.  sys.stderr.write() is used because the Python 2
        # ``print >>`` form fails on Python 3.
        sys.stderr.write(u"bad zipfile [{0}]: {1} :: {2}\n".format(e, self.path, archive_file))
        raise IOError
    finally:
        zf.close()
def put(self, request):
    """Create a Dataset from the first CSV member of an uploaded zip file.

    The upload is read from ``request.FILES['file']``.  The first archive
    member whose name ends in ``csv`` becomes the dataset content, a feature
    calculation task is queued for it, and the serialized dataset is
    returned in the response.

    Raises:
        NotZIPFileError: no ``file`` entry was uploaded or it is not a zip.
        NoCSVInArchiveFoundError: the archive has no member ending in ``csv``.
    """
    try:
        upload = request.FILES['file']
        archive = zipfile.ZipFile(upload)
    except (MultiValueDictKeyError, zipfile.BadZipfile):
        raise NotZIPFileError

    csv_names = [member for member in archive.namelist() if member.endswith('csv')]
    if not csv_names:
        raise NoCSVInArchiveFoundError
    csv_name = csv_names[0]

    with archive.open(csv_name) as zip_csv_file:
        # Wrap the zip member handle in a Django file handle.
        csv_file = File(zip_csv_file)
        dataset = Dataset.objects.create(
            name=zip_csv_file.name,
            content=csv_file,
            uploaded_by=request.user)

    # Queue the feature calculation for the new dataset.
    initialize_from_dataset.delay(dataset_id=dataset.id)
    return Response(DatasetSerializer(instance=dataset).data)
def __init__(self, manager, name, version=None):
    """Set up the mod record and load metadata of an installed copy, if any.

    Args:
        manager: stored as ``self.manager``.
        name: mod name; ``"base"`` marks a pseudo mod whose archive is
            never read.
        version: required version; ``None`` means newest.

    Raises:
        CorruptedZipFile: the installed archive is not a valid zip file.
        AssertionError: the archive has no ``<folder>/info.json`` member.
    """
    self.manager = manager
    self.name = name
    self.pseudo = (name == "base")
    self.required_version = version  # None means newest
    self._releases = None
    self._installed_version = None
    self._exists = None

    if self.pseudo or not self.any_version_installed:
        return

    archive_path = mod_folder.file_path(self.name)
    try:
        with zipfile.ZipFile(archive_path) as zf:
            # endswith() matches exactly the names the old
            # ``rsplit("/", 1)[1] == "info.json"`` test accepted, but does
            # not raise IndexError on top-level entries without a "/".
            info_json_candidates = [n for n in zf.namelist() if n.endswith("/info.json")]
            if not info_json_candidates:
                # Raised explicitly so the check survives ``python -O``.
                raise AssertionError("Not a mod file")
            with zf.open(info_json_candidates[0]) as f:
                data = json.loads(f.read().decode())
    except zipfile.BadZipfile:
        raise CorruptedZipFile(archive_path)

    self.title = data["title"]
    self._installed_version = data["version"]
    self._exists = True
def _crc32(self, path):
    """Return the CRC recorded for ``path``, filling the CRC table on first use.

    ``self._infodict`` maps archive member names to their CRC.  When it is
    still empty, the archive at ``self.src`` is opened and every member's
    CRC is stored.  If the archive fails to open with a "bad magic number"
    error, ``self._legacy_file_list()`` is called instead (presumably it
    fills the same table -- confirm against its definition).

    Raises:
        BadZipfile: the archive cannot be opened for any other reason.
        UnarchiveError: listing the archive members failed.
        KeyError: ``path`` is not in the table.
    """
    if self._infodict:
        return self._infodict[path]

    try:
        archive = ZipFile(self.src)
    except BadZipfile:
        e = get_exception()
        if e.args[0].lower().startswith('bad magic number'):
            # Python2.4 can't handle zipfiles with > 64K files. Try using
            # /usr/bin/unzip instead
            self._legacy_file_list()
        else:
            raise
    else:
        # Nested try blocks (rather than try/except/finally) keep this
        # valid on the old interpreters the comment above refers to.
        try:
            try:
                for item in archive.infolist():
                    self._infodict[item.filename] = long(item.CRC)
            except Exception:
                raise UnarchiveError('Unable to list files in the archive')
        finally:
            # Close the archive on success as well as on failure.
            archive.close()

    return self._infodict[path]
def check_read_with_bad_crc(self, compression):
    """Check that reading a member with a wrong CRC raises BadZipfile.

    The archive bytes come from ``self.zips_with_bad_crc[compression]`` and
    the member ``'afile'`` is read in three different ways.
    """
    payload = self.zips_with_bad_crc[compression]

    # Whole-member read through ZipFile.read()
    with zipfile.ZipFile(io.BytesIO(payload), mode="r") as archive:
        self.assertRaises(zipfile.BadZipfile, archive.read, 'afile')

    # Whole-member read through ZipExtFile.read()
    with zipfile.ZipFile(io.BytesIO(payload), mode="r") as archive:
        with archive.open('afile', 'r') as member:
            self.assertRaises(zipfile.BadZipfile, member.read)

    # Two-byte reads, so that the buffering logic is exercised too
    with zipfile.ZipFile(io.BytesIO(payload), mode="r") as archive:
        with archive.open('afile', 'r') as member:
            member.MIN_READ_SIZE = 2
            with self.assertRaises(zipfile.BadZipfile):
                while member.read(2):
                    pass
def check_read_with_bad_crc(self, compression):
    """Check that reading a member with a wrong CRC raises BadZipfile.

    The archive bytes come from ``self.zips_with_bad_crc[compression]`` and
    the member ``'afile'`` is read in three different ways.
    """
    payload = self.zips_with_bad_crc[compression]

    # Whole-member read through ZipFile.read()
    with zipfile.ZipFile(io.BytesIO(payload), mode="r") as archive:
        self.assertRaises(zipfile.BadZipfile, archive.read, 'afile')

    # Whole-member read through ZipExtFile.read()
    with zipfile.ZipFile(io.BytesIO(payload), mode="r") as archive:
        with archive.open('afile', 'r') as member:
            self.assertRaises(zipfile.BadZipfile, member.read)

    # Two-byte reads, so that the buffering logic is exercised too
    with zipfile.ZipFile(io.BytesIO(payload), mode="r") as archive:
        with archive.open('afile', 'r') as member:
            member.MIN_READ_SIZE = 2
            with self.assertRaises(zipfile.BadZipfile):
                while member.read(2):
                    pass
def Run(self):
    """Extract the zip archive named by the first argument into the second.

    ``self._args[0]`` is the zip file and ``self._args[1]`` the output
    directory, which is created first.

    Raises:
        ActionError: fewer than two arguments were supplied, the output
            path could not be created, or the archive could not be read.
    """
    try:
        zip_file = self._args[0]
        out_path = self._args[1]
    except IndexError:
        raise ActionError('Unable to determine desired paths from %s.' %
                          str(self._args))

    try:
        file_util.CreateDirectories(out_path)
    except file_util.Error:
        raise ActionError('Unable to create output path %s.' % out_path)

    try:
        # The context manager closes the archive even if extraction fails.
        with zipfile.ZipFile(zip_file) as zf:
            zf.extractall(out_path)
    except (IOError, zipfile.BadZipfile) as e:
        raise ActionError('Bad zip file given as input. %s' % e)
def verify(self, password=None):
    """Check the integrity of the zip archive at ``self.filename``.

    Args:
        password: passed to ``ZipFile.setpassword`` before testing.

    Raises:
        CRCError: ``testzip()`` named a bad member, or a RuntimeError
            unrelated to passwords occurred.
        ArchiveError: the file is not a valid zip, or needs ZIP64 support.
        PasswordError: a RuntimeError mentioned "encrypted" or
            "Bad password".
    """
    try:
        with zipfile.ZipFile(self.filename, 'r') as z:
            z.setpassword(password)
            badfile = z.testzip()
            if badfile is not None:
                raise CRCError(badfile)
    # ``except ... as e`` is valid on Python 2.6+ and Python 3; the old
    # comma form is a SyntaxError on Python 3.
    except (zipfile.BadZipfile, zipfile.LargeZipFile) as e:
        raise ArchiveError(e)
    except RuntimeError as e:
        # str(e) instead of e.args[0]: safe with empty or non-string args.
        reason = str(e)
        if "encrypted" in reason or "Bad password" in reason:
            raise PasswordError(e)
        else:
            raise CRCError(e)
def _extract(self, filename, password):
    """Extract a zip archive into the temp directory and list its members.

    The archive is first checked (and possibly renamed) by
    ``_prepare_archive_at_path``.  Extraction is tried with ``password`` and,
    if that raises RuntimeError, once more with the password "infected".
    ``self._extract_nested_archives`` runs afterwards whether or not
    extraction succeeded.

    Returns:
        The archive's member names, or None when no readable archive was
        found at ``filename``.
    """
    zip_path = _prepare_archive_at_path(filename)
    if not zip_path:
        return None

    # Extraction target: $TEMP when set, otherwise /tmp.
    target_dir = environ.get("TEMP", "/tmp")
    with ZipFile(zip_path, "r") as zf:
        try:
            zf.extractall(path=target_dir, pwd=password)
        except BadZipfile:
            raise Exception("Invalid Zip file")
        except RuntimeError:
            # Second attempt, this time with a default password.
            try:
                zf.extractall(path=target_dir, pwd="infected")
            except RuntimeError as error:
                raise Exception("Unable to extract Zip file: %s" % error)
        finally:
            self._extract_nested_archives(zf, target_dir, password)
    return zf.namelist()
def _prepare_archive_at_path(filename):
    """Check that a readable zip archive exists at ``filename``.

    Returns the path to use for the archive.  That is normally ``filename``
    itself, but when ``_is_overwritten`` reports that the archive contains a
    file named like the archive, the archive is moved to a new name first so
    that extraction does not overwrite it.  Returns None when the file
    cannot be opened as a zip archive.
    """
    # Opening the archive is the readability check; nothing is read from it.
    try:
        with ZipFile(filename, "r") as probe:
            probe.close()
    except BadZipfile:
        return None

    if not _is_overwritten(filename):
        return filename

    # The zip contains a file named as itself: move the archive aside.
    log.debug("ZIP file contains a file with the same name, original is "
              "going to be overwrite")
    renamed = filename + _random_extension()
    move(filename, renamed)
    return renamed
def extract_files(self):
    """Extract the download of every enabled export setting.

    For each entry in ``self.settings['export_settings']`` with a truthy
    ``value``, ``setting.extract`` is called and a dot is appended to
    ``self.progress_text``.  If extraction fails with a tar or zip error,
    the saved download (when present) is deleted and the error is kept in
    ``self.extract_error`` and logged.

    Returns:
        Always True.
    """
    self.extract_error = None
    download_dir = self.get_setting('download_dir').value
    version = self.selected_version()

    for setting in self.settings['export_settings'].values():
        archive_path = setting.save_file_path(version, download_dir)
        try:
            if setting.value:
                target = get_data_path('files/'+setting.name)
                setting.extract(target, version)
                self.progress_text += '.'
        except (tarfile.ReadError, zipfile.BadZipfile) as error:
            if os.path.exists(archive_path):
                os.remove(archive_path)
            # The GUI cannot be used from this thread to notify the user,
            # so the error is stored for later.
            self.extract_error = error
            self.logger.error(self.extract_error)

    self.progress_text = '\nDone.\n'
    return True
def installZipFile(data, fname):
    """Install an add-on from raw bytes into the ``addons`` folder.

    Args:
        data: file contents as bytes.
        fname: file name; a name ending in ``.py`` is written as a single
            file, anything else is treated as a zip archive and extracted.

    Returns:
        True on success, False when ``data`` is not a valid zip archive.
    """
    base = os.path.join(defaultBase(), 'addons')
    if fname.endswith(".py"):
        # ``with`` closes (and flushes) the file deterministically.
        with open(os.path.join(base, fname), "wb") as out:
            out.write(data)
        return True
    # .zip file
    try:
        z = zipfile.ZipFile(io.BytesIO(data))
    except zipfile.BadZipfile:
        return False
    try:
        for n in z.namelist():
            if n.endswith("/"):
                # folder; ignore
                continue
            # write
            z.extract(n, base)
    finally:
        z.close()
    return True
def _read_manifest(self):
    """Return the manifest text for ``self._path``, or None if there is none.

    ``self._path`` may be a manifest file itself (a path ending in
    ``/META-INF/MANIFEST.MF``) or a zip/jar archive containing
    ``META-INF/MANIFEST.MF``.  The content is decoded as UTF-8.  Every
    handle opened here is closed before returning, including on errors.
    """
    manifest = None
    jarfile = None
    try:
        if self._path.endswith("/META-INF/MANIFEST.MF"):
            manifest = open(self._path, "rb")
        if zipfile.is_zipfile(self._path):
            # looks like "zipfile.is_zipfile()" is not reliable
            # see rhbz#889131 for more details
            try:
                jarfile = ZipFile(self._path)
                if "META-INF/MANIFEST.MF" in jarfile.namelist():
                    member = jarfile.open("META-INF/MANIFEST.MF", "r")
                    # Release a previously opened plain file before it is
                    # replaced by the archive member.
                    if manifest is not None:
                        manifest.close()
                    manifest = member
            except (IOError, BadZipfile):
                pass
        if manifest is None:
            return None
        return manifest.read().decode("utf-8")
    finally:
        if manifest is not None:
            manifest.close()
        if jarfile is not None:
            jarfile.close()
def check_read_with_bad_crc(self, compression):
    """Check that reading a member with a wrong CRC raises BadZipfile.

    The archive bytes come from ``self.zips_with_bad_crc[compression]`` and
    the member ``'afile'`` is read in three different ways.
    """
    payload = self.zips_with_bad_crc[compression]

    # Whole-member read through ZipFile.read()
    with zipfile.ZipFile(io.BytesIO(payload), mode="r") as archive:
        self.assertRaises(zipfile.BadZipfile, archive.read, 'afile')

    # Whole-member read through ZipExtFile.read()
    with zipfile.ZipFile(io.BytesIO(payload), mode="r") as archive:
        with archive.open('afile', 'r') as member:
            self.assertRaises(zipfile.BadZipfile, member.read)

    # Two-byte reads, so that the buffering logic is exercised too
    with zipfile.ZipFile(io.BytesIO(payload), mode="r") as archive:
        with archive.open('afile', 'r') as member:
            member.MIN_READ_SIZE = 2
            with self.assertRaises(zipfile.BadZipfile):
                while member.read(2):
                    pass
def check_read_with_bad_crc(self, compression):
    """Check that reading a member with a wrong CRC raises BadZipfile.

    The archive bytes come from ``self.zips_with_bad_crc[compression]`` and
    the member ``'afile'`` is read in three different ways.
    """
    payload = self.zips_with_bad_crc[compression]

    # Whole-member read through ZipFile.read()
    with zipfile.ZipFile(io.BytesIO(payload), mode="r") as archive:
        self.assertRaises(zipfile.BadZipfile, archive.read, 'afile')

    # Whole-member read through ZipExtFile.read()
    with zipfile.ZipFile(io.BytesIO(payload), mode="r") as archive:
        with archive.open('afile', 'r') as member:
            self.assertRaises(zipfile.BadZipfile, member.read)

    # Two-byte reads, so that the buffering logic is exercised too
    with zipfile.ZipFile(io.BytesIO(payload), mode="r") as archive:
        with archive.open('afile', 'r') as member:
            member.MIN_READ_SIZE = 2
            with self.assertRaises(zipfile.BadZipfile):
                while member.read(2):
                    pass
def download_package_from_url(url, dest):
    """Download a package zip from ``url`` and extract it into ``dest``.

    The download is streamed to a temporary file under the internal work
    directory, which is deleted after extraction whether or not the
    extraction succeeded.

    Raises:
        PackageLoadError: the server did not answer 200, the download
            failed with a RuntimeError/IOError, or the downloaded file is
            not a valid zip archive.
    """
    logging.info('Attempting to download missing package from: ' + url)
    tmp_path = internal_path("work/dl_package.zip")
    try:
        req = requests.get(url, stream=True, timeout=10)
        if req.status_code == 200:
            with open(tmp_path, 'wb') as tmp_file:
                req.raw.decode_content = True
                shutil.copyfileobj(req.raw, tmp_file)
        else:
            raise PackageLoadError('Package download failed, server said: {} {}'.format(req.status_code, req.reason))
    except (RuntimeError, IOError) as e:
        raise PackageLoadError('Package download failed due to an error') from e

    logging.info('Extracting package...')
    try:
        with ZipFile(tmp_path, "r") as z:
            z.extractall(dest)
    except BadZipfile as e:
        raise PackageLoadError('Malformed package zip file') from e
    finally:
        # Previously a malformed zip left the temporary download behind.
        os.remove(tmp_path)
def from_zip(cls, file_obj, stages_name, stages_root):
    """Unpack a zip from file_obj into os.path.join(stages_root, stages_name).

    The target directory is created, every member is checked with
    ``testzip()`` and extracted, and ``cls`` is then instantiated on the
    directory.  ``save_main_script()`` is called on each of its stages.

    Returns:
        The object created by ``cls(assignment_root)``.

    Raises:
        Error: a member is corrupt, or the zip is invalid or too large.
    """
    try:
        assignment_root = os.path.join(stages_root, stages_name)
        os.mkdir(assignment_root)

        with zipfile.ZipFile(file_obj, 'r') as archive:
            corrupt_member = archive.testzip()
            if corrupt_member is not None:
                raise Error('Corrupt file in zip: ' + corrupt_member)
            # TODO: Handle case where archive.namelist() uses a lot of memory
            for member in archive.namelist():
                archive.extract(member, assignment_root)

        # TODO: The stage.save_main_script() code below is used as a
        # workaround to ensure that the main script is executable. Ideally,
        # file permissions would be preserved.
        stages = cls(assignment_root)
        for stage in stages.stages.itervalues():
            stage.save_main_script()
        return stages
    except (zipfile.BadZipfile, zipfile.LargeZipFile) as error:
        raise Error(error)
def test_invalidHeader(self):
    """
    An entry whose local header has the wrong magic number makes
    readfile() raise BadZipfile, while the other entries of the archive
    stay readable.
    """
    path = self.makeZipFile(["test contents",
                             "more contents"])
    with zipfile.ZipFile(path, "r") as archive:
        damagedOffset = archive.getinfo("0").header_offset

    # Overwrite the four magic bytes of that one header only.
    with open(path, "r+b") as raw:
        raw.seek(damagedOffset, 0)
        raw.write(b'0000')

    with zipstream.ChunkingZipFile(path) as chunked:
        self.assertRaises(zipfile.BadZipfile, chunked.readfile, "0")
        with chunked.readfile("1") as entry:
            self.assertEqual(entry.read(), b"more contents")
def test_filenameMismatch(self):
    """
    An entry whose local header names a different file than the central
    directory makes readfile() raise BadZipfile; the other entry is still
    readable.
    """
    path = self.makeZipFile([b"test contents",
                             b"more contents"])
    with zipfile.ZipFile(path, "r") as archive:
        entryInfo = archive.getinfo("0")

    # Rewrite the local header in place with a different file name.
    entryInfo.filename = "not zero"
    with open(path, "r+b") as raw:
        raw.seek(entryInfo.header_offset, 0)
        raw.write(entryInfo.FileHeader())

    with zipstream.ChunkingZipFile(path) as chunked:
        self.assertRaises(zipfile.BadZipfile, chunked.readfile, "0")
        with chunked.readfile("1") as entry:
            self.assertEqual(entry.read(), b"more contents")
def test_unsupportedCompression(self):
    """
    Reading an entry whose central directory record names an unsupported
    compression mechanism raises BadZipfile.
    """
    path = self.mktemp()
    with zipfile.ZipFile(path, "w") as archive:
        entryInfo = zipfile.ZipInfo("0")
        archive.writestr(entryInfo, "some data")
        # The compression type is mangled only after writestr(): doing it
        # earlier would make zipfile (correctly) reject the bad type.  The
        # change is made before the archive is closed.
        entryInfo.compress_type = 1234

    with zipstream.ChunkingZipFile(path) as chunked:
        self.assertRaises(zipfile.BadZipfile, chunked.readfile, "0")
def verify(self, password=None):
    """Check the integrity of the zip archive at ``self.filename``.

    Args:
        password: passed to ``ZipFile.setpassword`` before testing.

    Raises:
        CRCError: ``testzip()`` named a bad member, or a RuntimeError
            unrelated to passwords occurred.
        ArchiveError: the file is not a valid zip, or needs ZIP64 support.
        PasswordError: a RuntimeError mentioned "encrypted" or
            "Bad password".
    """
    try:
        with zipfile.ZipFile(self.filename, 'r') as z:
            z.setpassword(password)
            badfile = z.testzip()
            if badfile is not None:
                raise CRCError(badfile)
    # ``except ... as e`` is valid on Python 2.6+ and Python 3; the old
    # comma form is a SyntaxError on Python 3.
    except (zipfile.BadZipfile, zipfile.LargeZipFile) as e:
        raise ArchiveError(e)
    except RuntimeError as e:
        # str(e) instead of e.args[0]: safe with empty or non-string args.
        reason = str(e)
        if "encrypted" in reason or "Bad password" in reason:
            raise PasswordError(e)
        else:
            raise CRCError(e)
def check_name(msg,savname=None,ckname=check_ext,scan_zip=False):
    """Replace an attachment with a warning text if its name is suspicious.

    Every (key, name) pair from ``msg.getnames(scan_zip)`` is passed to
    ``ckname``.  On the first name it flags, or when the names cannot be
    listed because of a bad zip, the payload of ``msg`` is replaced by the
    ``virus_msg`` text and its content headers are reset to plain text.
    ``Milter.CONTINUE`` is returned in every case.
    """
    try:
        for part_key, part_name in msg.getnames(scan_zip):
            badname = ckname(part_name)
            if not badname:
                continue
            if part_key == 'zipname':
                # Report the attachment's own name instead.
                badname = msg.get_filename()
            break
        else:
            # No name was flagged: leave the message untouched.
            return Milter.CONTINUE
    except zipfile.BadZipfile:
        # a ZIP that is not a zip is very suspicious
        badname = msg.get_filename()

    hostname = socket.gethostname()
    msg.set_payload(virus_msg % (badname, hostname, savname))
    for header in ("content-type", "content-disposition",
                   "content-transfer-encoding"):
        del msg[header]
    warning_name = "WARNING.TXT"
    msg["Content-Type"] = "text/plain; name=" + warning_name
    return Milter.CONTINUE
def parsensout(cfdbpath):
    """Parse CrimeFlare's nsout.zip archive into the ``_nsdict`` mapping.

    Each record is split on whitespace: the last field is the domain and
    all fields before it are stored, space-joined, as its name servers.
    Records with fewer than three fields are skipped.

    Args:
        cfdbpath: directory that contains ``nsout.zip``.

    Raises:
        SystemExit: the archive is not a valid zip file.
    """
    try:
        # Context managers close the archive and each member when done.
        with zipfile.ZipFile('{}/nsout.zip'.format(cfdbpath)) as znsout:
            for finfo in znsout.infolist():
                with znsout.open(finfo) as ifile:
                    for record in ifile:
                        fields = record.decode('utf-8').split()
                        if len(fields) < 3:
                            # Blank or malformed line.  It used to reuse the
                            # previous record's values, or raise NameError
                            # when it was the very first record.
                            continue
                        # pop() takes the last field itself; remove(value)
                        # deleted the first field equal to the domain.
                        domain = fields.pop()
                        _nsdict[domain] = ' '.join(fields)
    except zipfile.BadZipfile:
        print("[-] Bad checksum on downloaded archive. Try to update again.")
        raise SystemExit
def parsecountry(cfdbpath):
    """Parse CrimeFlare's country.zip archive into the ``_countrydict`` mapping.

    Each record is split on whitespace: the first field is the domain, the
    second is ignored, and the remaining fields, space-joined, are stored as
    the country.  Records with fewer than three fields are skipped.

    Args:
        cfdbpath: directory that contains ``country.zip``.

    Raises:
        SystemExit: the archive is not a valid zip file.
    """
    try:
        # Context managers close the archive and each member when done.
        with zipfile.ZipFile('{}/country.zip'.format(cfdbpath)) as zcountry:
            for finfo in zcountry.infolist():
                with zcountry.open(finfo) as ifile:
                    for record in ifile:
                        fields = record.decode('utf-8').split()
                        if len(fields) < 3:
                            continue
                        # The domain always comes from the current record.
                        # Multi-word countries used to be stored under the
                        # previous record's domain.
                        _countrydict[fields[0]] = ' '.join(fields[2:])
    except zipfile.BadZipfile:
        print("[-] Bad checksum on downloaded archive. Try to update again.")
        raise SystemExit
def unzip(zippedFile, outPath):
    """Extract all files from a zip archive to a destination directory.

    Args:
        zippedFile: path of the archive, opened through ``open_file``.
        outPath: directory to extract into.

    Returns:
        The first name listed in the archive (the toplevel directory name
        in the zipfile), or False when the archive is empty or could not be
        processed.
    """
    newDir = False  # the toplevel directory name in the zipfile
    fh = open_file(zippedFile, 'rb')
    with fh:
        try:
            z = zipfile.ZipFile(fh)
            namelist = z.namelist()
            # An empty archive has no first entry; keep the False default
            # instead of raising IndexError.
            if namelist:
                newDir = namelist[0]
            for name in namelist:
                z.extract(name, outPath)
        except RuntimeError as err:
            msg("Error processing zip (RuntimeError): %s" % (err), True)
            return False
        except IOError as ioe:
            msg("Error opening [%s]: %s" % (zippedFile, ioe.strerror), True)
            return False
        except zipfile.BadZipfile:
            msg("Bad zip file: %s" % zippedFile, True)
            return False
    return newDir
def unzip(self, file_obj):
    """
    Unzips a ZIP archive into ``self.target_dir``.

    If the file is not a valid ZIP archive, ``self.clean_up`` is called
    with the path of ``file_obj`` joined onto ``self.target_dir``.

    :param file_obj: file to be unzipped
    """
    try:
        # The context manager closes the archive even if extractall() fails.
        with zipfile.ZipFile(file_obj, 'r') as zip_ref:
            zip_ref.extractall(self.target_dir)
    except zipfile.BadZipfile:
        self.clean_up(os.path.join(self.target_dir, file_obj))
def handle_application_zip(self, headers, fileobj):
    """Parse CSV data found in a ZIP attachment.

    The attachment is decoded with ``self._decode`` and opened as a zip
    archive.  If that fails, the error is logged and ``idiokit.stop(False)``
    is called.  Otherwise the first archive member is handed to
    ``self.parse_csv`` and its result is passed to ``idiokit.stop``.
    """
    self.log.info("Opening a ZIP attachment")
    decoded = self._decode(headers, fileobj)

    try:
        archive = zipfile.ZipFile(decoded)
    except zipfile.BadZipfile as error:
        self.log.error("ZIP handling failed ({0})".format(error))
        idiokit.stop(False)

    for member_name in archive.namelist():
        member_data = StringIO(archive.read(member_name))
        self.log.info("Parsing CSV data from the ZIP attachment")
        outcome = yield self.parse_csv(headers, member_name, member_data)
        # NOTE(review): idiokit.stop() presumably ends the stream here, so
        # only the first member is parsed -- confirm.
        idiokit.stop(outcome)
def handle_application_zip(self, msg):
    """Parse CSV data found in a ZIP attachment.

    The decoded payload of ``msg`` is opened as a zip archive.  If that
    fails, the error is logged and ``idiokit.stop(False)`` is called.
    Otherwise the first archive member is handed to ``self.parse_csv`` and
    its result is passed to ``idiokit.stop``.
    """
    self.log.info("Opening a ZIP attachment")
    payload = yield msg.get_payload(decode=True)

    try:
        archive = zipfile.ZipFile(StringIO(payload))
    except zipfile.BadZipfile as error:
        self.log.error("ZIP handling failed ({0})".format(error))
        idiokit.stop(False)

    for member_name in archive.namelist():
        member_file = archive.open(member_name)
        self.log.info("Parsing CSV data from the ZIP attachment")
        outcome = yield self.parse_csv(member_name, member_file)
        # NOTE(review): idiokit.stop() presumably ends the stream here, so
        # only the first member is parsed -- confirm.
        idiokit.stop(outcome)
def readfile(self, name):
    """Return file-like object for name.

    The local file header of the entry is read and checked against the
    central directory before an entry object positioned at the member's
    data is returned.

    Raises:
        RuntimeError: the archive mode is not "r" or "a", the archive is
            already closed, or a deflated entry is requested while zlib is
            unavailable.
        zipfile.BadZipfile: the header magic number is wrong, the header
            file name differs from the directory, or the compression
            method is unsupported.
    """
    # All raises use the call form, which is valid on Python 2 and 3; the
    # old ``raise Class, "message"`` form is a SyntaxError on Python 3.
    if self.mode not in ("r", "a"):
        raise RuntimeError('read() requires mode "r" or "a"')
    if not self.fp:
        raise RuntimeError(
            "Attempt to read ZIP archive that was already closed")
    zinfo = self.getinfo(name)
    self.fp.seek(zinfo.header_offset, 0)
    # Skip the file header:
    fheader = self.fp.read(30)
    if fheader[0:4] != zipfile.stringFileHeader:
        raise zipfile.BadZipfile("Bad magic number for file header")
    fheader = struct.unpack(zipfile.structFileHeader, fheader)
    fname = self.fp.read(fheader[zipfile._FH_FILENAME_LENGTH])
    if fheader[zipfile._FH_EXTRA_FIELD_LENGTH]:
        self.fp.read(fheader[zipfile._FH_EXTRA_FIELD_LENGTH])
    # NOTE(review): if self.fp yields bytes on Python 3, this comparison
    # with orig_filename may need a decode -- confirm before porting.
    if fname != zinfo.orig_filename:
        raise zipfile.BadZipfile(
            'File name in directory "%s" and header "%s" differ.' % (
                zinfo.orig_filename, fname))
    if zinfo.compress_type == zipfile.ZIP_STORED:
        return ZipFileEntry(self.fp, zinfo.compress_size)
    elif zinfo.compress_type == zipfile.ZIP_DEFLATED:
        if not zlib:
            raise RuntimeError(
                "De-compression requires the (missing) zlib module")
        return DeflatedZipFileEntry(self.fp, zinfo.compress_size)
    else:
        raise zipfile.BadZipfile(
            "Unsupported compression method %d for file %s" %
            (zinfo.compress_type, name))