python类BadZipfile()的实例源码

forms.py 文件源码 项目:DCRM 作者: 82Flex 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def clean_zip_file(self):
        """Open the zip file a first time, to check that it is a valid zip archive.
        We'll open it again in a moment, so we have some duplication, but let's focus
        on keeping the code easier to read!
        """
        zip_file = self.cleaned_data['zip_file']
        try:
            zip = zipfile.ZipFile(zip_file)
        except BadZipFile as e:
            raise forms.ValidationError(str(e))
        bad_file = zip.testzip()
        if bad_file:
            zip.close()
            raise forms.ValidationError('"%s" in the .zip archive is corrupt.' % bad_file)
        zip.close()  # Close file in all cases.
        return zip_file
comicarchive.py 文件源码 项目:Comictagger 作者: dickloraine 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def readArchiveFile( self, archive_file ):
        data = ""
        zf = zipfile.ZipFile( self.path, 'r' )

        try:
            data = zf.read( archive_file )
        except zipfile.BadZipfile as e:
            print >> sys.stderr, u"bad zipfile [{0}]: {1} :: {2}".format(e, self.path, archive_file)
            zf.close()  
            raise IOError
        except Exception as e:
            zf.close()  
            print >> sys.stderr, u"bad zipfile [{0}]: {1} :: {2}".format(e, self.path, archive_file)
            raise IOError
        finally:
            zf.close()
        return data
views.py 文件源码 项目:fexum 作者: KDD-OpenSource 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def put(self, request):
        try:
            zip_file = request.FILES['file']
            archive = zipfile.ZipFile(zip_file)
        except (MultiValueDictKeyError, zipfile.BadZipfile):
            raise NotZIPFileError

        try:
            csv_name = [item for item in archive.namelist() if item.endswith('csv')][0]
        except IndexError:
            raise NoCSVInArchiveFoundError

        with archive.open(csv_name) as zip_csv_file:
            # Convert zipfile handle to Django file handle
            csv_file = File(zip_csv_file)
            dataset = Dataset.objects.create(
                name=zip_csv_file.name,
                content=csv_file,
                uploaded_by=request.user)

        # Start tasks for feature calculation
        initialize_from_dataset.delay(dataset_id=dataset.id)

        serializer = DatasetSerializer(instance=dataset)
        return Response(serializer.data)
mod.py 文件源码 项目:modman 作者: haihala 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def __init__(self, manager, name, version=None):
        self.manager = manager
        self.name = name
        self.pseudo = (name == "base")
        self.required_version = version # None means newest
        self._releases = None
        self._installed_version = None
        self._exists = None

        if not self.pseudo and self.any_version_installed:
            try:
                with zipfile.ZipFile(mod_folder.file_path(self.name)) as zf:
                    info_json_candidates = [n for n in zf.namelist() if n.rsplit("/", 1)[1] == "info.json"]
                    assert info_json_candidates, "Not a mod file"
                    with zf.open(info_json_candidates[0]) as f:
                        data = json.loads(f.read().decode())
            except zipfile.BadZipfile:
                raise CorruptedZipFile(mod_folder.file_path(self.name))

            self.title = data["title"]
            self._installed_version = data["version"]
            self._exists = True
unarchive.py 文件源码 项目:DevOps 作者: YoLoveLife 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _crc32(self, path):
        if self._infodict:
            return self._infodict[path]

        try:
            archive = ZipFile(self.src)
        except BadZipfile:
            e = get_exception()
            if e.args[0].lower().startswith('bad magic number'):
                # Python2.4 can't handle zipfiles with > 64K files.  Try using
                # /usr/bin/unzip instead
                self._legacy_file_list()
            else:
                raise
        else:
            try:
                for item in archive.infolist():
                    self._infodict[item.filename] = long(item.CRC)
            except:
                archive.close()
                raise UnarchiveError('Unable to list files in the archive')

        return self._infodict[path]
test_zipfile.py 文件源码 项目:oil 作者: oilshell 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def check_read_with_bad_crc(self, compression):
        """Tests that files with bad CRCs raise a BadZipfile exception when read."""
        zipdata = self.zips_with_bad_crc[compression]

        # Using ZipFile.read()
        with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf:
            self.assertRaises(zipfile.BadZipfile, zipf.read, 'afile')

        # Using ZipExtFile.read()
        with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf:
            with zipf.open('afile', 'r') as corrupt_file:
                self.assertRaises(zipfile.BadZipfile, corrupt_file.read)

        # Same with small reads (in order to exercise the buffering logic)
        with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf:
            with zipf.open('afile', 'r') as corrupt_file:
                corrupt_file.MIN_READ_SIZE = 2
                with self.assertRaises(zipfile.BadZipfile):
                    while corrupt_file.read(2):
                        pass
test_zipfile.py 文件源码 项目:python2-tracer 作者: extremecoders-re 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def check_read_with_bad_crc(self, compression):
        """Tests that files with bad CRCs raise a BadZipfile exception when read."""
        zipdata = self.zips_with_bad_crc[compression]

        # Using ZipFile.read()
        with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf:
            self.assertRaises(zipfile.BadZipfile, zipf.read, 'afile')

        # Using ZipExtFile.read()
        with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf:
            with zipf.open('afile', 'r') as corrupt_file:
                self.assertRaises(zipfile.BadZipfile, corrupt_file.read)

        # Same with small reads (in order to exercise the buffering logic)
        with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf:
            with zipf.open('afile', 'r') as corrupt_file:
                corrupt_file.MIN_READ_SIZE = 2
                with self.assertRaises(zipfile.BadZipfile):
                    while corrupt_file.read(2):
                        pass
files.py 文件源码 项目:glazier 作者: google 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def Run(self):
    try:
      zip_file = self._args[0]
      out_path = self._args[1]
    except IndexError:
      raise ActionError('Unable to determine desired paths from %s.' %
                        str(self._args))

    try:
      file_util.CreateDirectories(out_path)
    except file_util.Error:
      raise ActionError('Unable to create output path %s.' % out_path)

    try:
      zf = zipfile.ZipFile(zip_file)
      zf.extractall(out_path)
    except (IOError, zipfile.BadZipfile) as e:
      raise ActionError('Bad zip file given as input.  %s' % e)
UnZip.py 文件源码 项目:download-manager 作者: thispc 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def verify(self, password=None):
        try:
            with zipfile.ZipFile(self.filename, 'r') as z:
                z.setpassword(password)
                badfile = z.testzip()
                if badfile is not None:
                    raise CRCError(badfile)

        except (zipfile.BadZipfile, zipfile.LargeZipFile), e:
            raise ArchiveError(e)

        except RuntimeError, e:
            if "encrypted" in e.args[0] or "Bad password" in e.args[0]:
                raise PasswordError(e)
            else:
                raise CRCError(e)
zip.py 文件源码 项目:cuckoodroid-2.0 作者: idanr1986 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _extract(self, filename, password):
        archive_path = _prepare_archive_at_path(filename)
        if not archive_path:
            return None
        # Extraction.
        extract_path = environ.get("TEMP", "/tmp")
        with ZipFile(archive_path, "r") as archive:
            try:
                archive.extractall(path=extract_path, pwd=password)
            except BadZipfile:
                raise Exception("Invalid Zip file")
            # Try to extract it again, but with a default password
            except RuntimeError:
                try:
                    archive.extractall(path=extract_path, pwd="infected")
                except RuntimeError as err:
                    raise Exception("Unable to extract Zip file: %s" % err)
            finally:
                self._extract_nested_archives(archive, extract_path, password)
        return archive.namelist()
zip.py 文件源码 项目:cuckoodroid-2.0 作者: idanr1986 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _prepare_archive_at_path(filename):
    """ Verifies that there's a readable zip archive at the given path.

    This function returns a new name for the archive (for most cases it's
    the same as the original one; but if an archive named "foo.zip" contains
    a file named "foo" this archive will be renamed to avoid being overwrite.
    """
    # Verify that the archive is actually readable
    try:
        with ZipFile(filename, "r") as archive:
            archive.close()
    except BadZipfile:
        return None
    # Test if zip file contains a file named as itself
    if _is_overwritten(filename):
        log.debug("ZIP file contains a file with the same name, original is \
        going to be overwrite")
        # In this case we just change the file name
        new_zip_path = filename + _random_extension()
        move(filename, new_zip_path)
        filename = new_zip_path
    return filename
command_line.py 文件源码 项目:Electrify 作者: jyapayne 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def extract_files(self):
        self.extract_error = None
        location = self.get_setting('download_dir').value
        version = self.selected_version()
        for setting_name, setting in self.settings['export_settings'].items():
            save_file_path = setting.save_file_path(version,
                                                    location)
            try:
                if setting.value:
                    extract_path = get_data_path('files/'+setting.name)
                    setting.extract(extract_path, version)

                    self.progress_text += '.'

            except (tarfile.ReadError, zipfile.BadZipfile) as e:
                if os.path.exists(save_file_path):
                    os.remove(save_file_path)
                self.extract_error = e
                self.logger.error(self.extract_error)
                # cannot use GUI in thread to notify user. Save it for later
        self.progress_text = '\nDone.\n'
        return True
___ankihub.py 文件源码 项目:AnkiHub 作者: dayjaby 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def installZipFile(data, fname):
    base = os.path.join(defaultBase(),'addons')
    if fname.endswith(".py"):
        path = os.path.join(base, fname)
        open(path, "wb").write(data)
        return True
    # .zip file
    try:
        z = zipfile.ZipFile(io.BytesIO(data))
    except zipfile.BadZipfile:
        return False
    for n in z.namelist():
        if n.endswith("/"):
            # folder; ignore
            continue
        # write
        z.extract(n, base)
    return True
manifest.py 文件源码 项目:javapackages 作者: fedora-java 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def _read_manifest(self):
        mf = None
        if self._path.endswith("/META-INF/MANIFEST.MF"):
            mf = open(self._path, "rb")
        if zipfile.is_zipfile(self._path):
            # looks like "zipfile.is_zipfile()" is not reliable
            # see rhbz#889131 for more details
            try:
                jarfile = ZipFile(self._path)
                if "META-INF/MANIFEST.MF" in jarfile.namelist():
                    mf = jarfile.open("META-INF/MANIFEST.MF", "r")
            except (IOError, BadZipfile):
                pass
        if mf is None:
            return None
        content = mf.read()
        mf.close()
        return content.decode("utf-8")
test_zipfile.py 文件源码 项目:pefile.pypy 作者: cloudtracer 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def check_read_with_bad_crc(self, compression):
        """Tests that files with bad CRCs raise a BadZipfile exception when read."""
        zipdata = self.zips_with_bad_crc[compression]

        # Using ZipFile.read()
        with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf:
            self.assertRaises(zipfile.BadZipfile, zipf.read, 'afile')

        # Using ZipExtFile.read()
        with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf:
            with zipf.open('afile', 'r') as corrupt_file:
                self.assertRaises(zipfile.BadZipfile, corrupt_file.read)

        # Same with small reads (in order to exercise the buffering logic)
        with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf:
            with zipf.open('afile', 'r') as corrupt_file:
                corrupt_file.MIN_READ_SIZE = 2
                with self.assertRaises(zipfile.BadZipfile):
                    while corrupt_file.read(2):
                        pass
test_zipfile.py 文件源码 项目:ndk-python 作者: gittor 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def check_read_with_bad_crc(self, compression):
        """Tests that files with bad CRCs raise a BadZipfile exception when read."""
        zipdata = self.zips_with_bad_crc[compression]

        # Using ZipFile.read()
        with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf:
            self.assertRaises(zipfile.BadZipfile, zipf.read, 'afile')

        # Using ZipExtFile.read()
        with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf:
            with zipf.open('afile', 'r') as corrupt_file:
                self.assertRaises(zipfile.BadZipfile, corrupt_file.read)

        # Same with small reads (in order to exercise the buffering logic)
        with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf:
            with zipf.open('afile', 'r') as corrupt_file:
                corrupt_file.MIN_READ_SIZE = 2
                with self.assertRaises(zipfile.BadZipfile):
                    while corrupt_file.read(2):
                        pass
package.py 文件源码 项目:algochecker-engine 作者: algochecker 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def download_package_from_url(url, dest):
    logging.info('Attempting to download missing package from: ' + url)
    tmp_path = internal_path("work/dl_package.zip")

    try:
        req = requests.get(url, stream=True, timeout=10)

        if req.status_code == 200:
            with open(tmp_path, 'wb') as tmp_file:
                req.raw.decode_content = True
                shutil.copyfileobj(req.raw, tmp_file)
        else:
            raise PackageLoadError('Package download failed, server said: {} {}'.format(req.status_code, req.reason))
    except (RuntimeError, IOError) as e:
        raise PackageLoadError('Package download failed due to an error') from e

    logging.info('Extracting package...')
    try:
        with ZipFile(tmp_path, "r") as z:
            z.extractall(dest)
    except BadZipfile as e:
        raise PackageLoadError('Malformed package zip file') from e

    os.remove(tmp_path)
executor.py 文件源码 项目:grade-oven 作者: mikelmcdaniel 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def from_zip(cls, file_obj, stages_name, stages_root):
    "Unpack zip from file_obj into os.path.join(stages_root, stages_name)."
    try:
      assignment_root = os.path.join(stages_root, stages_name)
      os.mkdir(assignment_root)
      with zipfile.ZipFile(file_obj, 'r') as zf:
        bad_filename = zf.testzip()
        if bad_filename is not None:
          raise Error('Corrupt file in zip: ' + bad_filename)
        # TODO: Handle case where zf.namelist() uses a lot of memory
        archived_files = zf.namelist()
        for af in archived_files:
          zf.extract(af, assignment_root)
        # TODO: The stage.save_main_script() code below is used as a workaround
        # to ensure that the main script is executable. Ideally, file
        # permissions would be preserved.
        stages = cls(assignment_root)
        for stage in stages.stages.itervalues():
          stage.save_main_script()
        return stages
    except (zipfile.BadZipfile, zipfile.LargeZipFile) as e:
      raise Error(e)
test_zipstream.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_invalidHeader(self):
        """
        A zipfile entry with the wrong magic number should raise BadZipfile for
        readfile(), but that should not affect other files in the archive.
        """
        fn = self.makeZipFile(["test contents",
                               "more contents"])
        with zipfile.ZipFile(fn, "r") as zf:
            zeroOffset = zf.getinfo("0").header_offset
        # Zero out just the one header.
        with open(fn, "r+b") as scribble:
            scribble.seek(zeroOffset, 0)
            scribble.write(b'0' * 4)
        with zipstream.ChunkingZipFile(fn) as czf:
            self.assertRaises(zipfile.BadZipfile, czf.readfile, "0")
            with czf.readfile("1") as zfe:
                self.assertEqual(zfe.read(), b"more contents")
test_zipstream.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_filenameMismatch(self):
        """
        A zipfile entry with a different filename than is found in the central
        directory should raise BadZipfile.
        """
        fn = self.makeZipFile([b"test contents",
                               b"more contents"])
        with zipfile.ZipFile(fn, "r") as zf:
            info = zf.getinfo("0")
            info.filename = "not zero"
        with open(fn, "r+b") as scribble:
            scribble.seek(info.header_offset, 0)
            scribble.write(info.FileHeader())

        with zipstream.ChunkingZipFile(fn) as czf:
            self.assertRaises(zipfile.BadZipfile, czf.readfile, "0")
            with czf.readfile("1") as zfe:
                self.assertEqual(zfe.read(), b"more contents")
test_zipstream.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_unsupportedCompression(self):
        """
        A zipfile which describes an unsupported compression mechanism should
        raise BadZipfile.
        """
        fn = self.mktemp()
        with zipfile.ZipFile(fn, "w") as zf:
            zi = zipfile.ZipInfo("0")
            zf.writestr(zi, "some data")
            # Mangle its compression type in the central directory; can't do
            # this before the writestr call or zipfile will (correctly) tell us
            # not to pass bad compression types :)
            zi.compress_type = 1234

        with zipstream.ChunkingZipFile(fn) as czf:
            self.assertRaises(zipfile.BadZipfile, czf.readfile, "0")
UnZip.py 文件源码 项目:pyload-plugins 作者: pyload 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def verify(self, password=None):
        try:
            with zipfile.ZipFile(self.filename, 'r') as z:
                z.setpassword(password)
                badfile = z.testzip()
                if badfile is not None:
                    raise CRCError(badfile)

        except (zipfile.BadZipfile, zipfile.LargeZipFile), e:
            raise ArchiveError(e)

        except RuntimeError, e:
            if "encrypted" in e.args[0] or "Bad password" in e.args[0]:
                raise PasswordError(e)
            else:
                raise CRCError(e)
mime.py 文件源码 项目:pymilter 作者: sdgathman 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def check_name(msg,savname=None,ckname=check_ext,scan_zip=False):
  "Replace attachment with a warning if its name is suspicious."
  try:
    for key,name in msg.getnames(scan_zip):
      badname = ckname(name)
      if badname:
        if key == 'zipname':
          badname = msg.get_filename()
        break
    else:
      return Milter.CONTINUE
  except zipfile.BadZipfile:
    # a ZIP that is not a zip is very suspicious
    badname = msg.get_filename()
  hostname = socket.gethostname()
  msg.set_payload(virus_msg % (badname,hostname,savname))
  del msg["content-type"]
  del msg["content-disposition"]
  del msg["content-transfer-encoding"]
  name = "WARNING.TXT"
  msg["Content-Type"] = "text/plain; name="+name
  return Milter.CONTINUE
cflareupdate.py 文件源码 项目:Security-Research 作者: RhinoSecurityLabs 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def parsensout(cfdbpath):
    """Parse CrimeFlare's nsout.zip archive into a dictionary."""
    # Open nsout archive, parse out required data, store into a list
    try:
        znsout = zipfile.ZipFile('{}/nsout.zip'.format(cfdbpath))
        for finfo in znsout.infolist():
            ifile = znsout.open(finfo)
            for record in ifile.readlines():
                try:
                    NS1, NS2, DOMAIN = record.decode('utf-8').split()
                except Exception:
                    NSPLIT = record.decode('utf-8').split()
                    if len(NSPLIT) > 3:
                        DOMAIN = NSPLIT[-1]
                        NSPLIT.remove(DOMAIN)
                        NS1 = ' '.join(NSPLIT[:-2])
                        NS2 = ' '.join(NSPLIT[-2:])
                    pass
                _nsdict[DOMAIN] = '{} {}'.format(NS1, NS2)
    except(zipfile.BadZipfile):
        print("[-] Bad checksum on downloaded archive. Try to update again.")
        raise SystemExit
cflareupdate.py 文件源码 项目:Security-Research 作者: RhinoSecurityLabs 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def parsecountry(cfdbpath):
    """Parse CrimeFlare's country.zip archive into a dictionary."""
    # Open country archive, parse out required data, store into a list
    try:
        zcountry = zipfile.ZipFile('{}/country.zip'.format(cfdbpath))
        for finfo in zcountry.infolist():
            ifile = zcountry.open(finfo)
            for record in ifile.readlines():
                try:
                    DOMAIN, IP, COUNTRY = record.decode('utf-8').split()
                except(ValueError):
                    COUNTRY = ' '.join(map(str, record.decode('utf-8').split()[2:]))
                _countrydict[DOMAIN] = '{}'.format(COUNTRY)
    except(zipfile.BadZipfile):
        print("[-] Bad checksum on downloaded archive. Try to update again.")
        raise SystemExit
analyser.py 文件源码 项目:wp-file-analyser 作者: racwn 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def unzip(zippedFile, outPath):
    """Extract all files from a zip archive to a destination directory."""
    newDir = False  # the toplevel directory name in the zipfile
    fh = open_file(zippedFile, 'rb')
    with fh:
        try:
            z = zipfile.ZipFile(fh)
            namelist = z.namelist()
            newDir = namelist[0]
            for name in namelist:
                z.extract(name, outPath)
        except RuntimeError as re:
            msg("Error processing zip (RuntimeError): %s" % (re), True)
            return False
        except IOError as ioe:
            msg("Error opening [%s]: %s" % (zippedFile, ioe.strerror), True)
            return False
        except zipfile.BadZipfile as bzf:
            msg("Bad zip file: %s" % zippedFile, True)
            return False
    return newDir
compression.py 文件源码 项目:pi_romulus 作者: ArthurMoore85 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def unzip(self, file_obj):
        """
        Unzips a ZIP archive
        :param file_obj: file to be unzipped
        """
        try:
            zip_ref = zipfile.ZipFile(file_obj, 'r')
            zip_ref.extractall(self.target_dir)
            zip_ref.close()
        except zipfile.BadZipfile:
            self.clean_up(os.path.join(self.target_dir, file_obj))
shadowservermail.py 文件源码 项目:abusehelper 作者: Exploit-install 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def handle_application_zip(self, headers, fileobj):
        self.log.info("Opening a ZIP attachment")
        fileobj = self._decode(headers, fileobj)
        try:
            zip = zipfile.ZipFile(fileobj)
        except zipfile.BadZipfile as error:
            self.log.error("ZIP handling failed ({0})".format(error))
            idiokit.stop(False)

        for filename in zip.namelist():
            csv_data = StringIO(zip.read(filename))

            self.log.info("Parsing CSV data from the ZIP attachment")
            result = yield self.parse_csv(headers, filename, csv_data)
            idiokit.stop(result)
mail.py 文件源码 项目:abusehelper 作者: Exploit-install 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def handle_application_zip(self, msg):
        self.log.info("Opening a ZIP attachment")
        data = yield msg.get_payload(decode=True)
        try:
            zip = zipfile.ZipFile(StringIO(data))
        except zipfile.BadZipfile as error:
            self.log.error("ZIP handling failed ({0})".format(error))
            idiokit.stop(False)

        for filename in zip.namelist():
            csv_data = zip.open(filename)

            self.log.info("Parsing CSV data from the ZIP attachment")
            result = yield self.parse_csv(filename, csv_data)
            idiokit.stop(result)
zipstream.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def readfile(self, name):
        """Return file-like object for name."""
        if self.mode not in ("r", "a"):
            raise RuntimeError, 'read() requires mode "r" or "a"'
        if not self.fp:
            raise RuntimeError, \
                  "Attempt to read ZIP archive that was already closed"
        zinfo = self.getinfo(name)

        self.fp.seek(zinfo.header_offset, 0)

        # Skip the file header:
        fheader = self.fp.read(30)
        if fheader[0:4] != zipfile.stringFileHeader:
            raise zipfile.BadZipfile, "Bad magic number for file header"

        fheader = struct.unpack(zipfile.structFileHeader, fheader)
        fname = self.fp.read(fheader[zipfile._FH_FILENAME_LENGTH])
        if fheader[zipfile._FH_EXTRA_FIELD_LENGTH]:
            self.fp.read(fheader[zipfile._FH_EXTRA_FIELD_LENGTH])

        if fname != zinfo.orig_filename:
            raise zipfile.BadZipfile, \
                      'File name in directory "%s" and header "%s" differ.' % (
                          zinfo.orig_filename, fname)

        if zinfo.compress_type == zipfile.ZIP_STORED:
            return ZipFileEntry(self.fp, zinfo.compress_size)
        elif zinfo.compress_type == zipfile.ZIP_DEFLATED:
            if not zlib:
                raise RuntimeError, \
                      "De-compression requires the (missing) zlib module"
            return DeflatedZipFileEntry(self.fp, zinfo.compress_size)
        else:
            raise zipfile.BadZipfile, \
                  "Unsupported compression method %d for file %s" % \
            (zinfo.compress_type, name)


问题


面经


文章

微信
公众号

扫码关注公众号