python类BadZipfile()的实例源码

zipstream.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def read(self, name):
    """Return file bytes (as a string) for name.

    The payload is read through ``self.readfile(name)`` and its CRC-32 is
    compared with the ``CRC`` attribute of ``self.getinfo(name)``.

    Raises ``zipfile.BadZipfile`` when the two checksums differ.
    """
    f = self.readfile(name)
    zinfo = self.getinfo(name)
    data = f.read()
    # Normalise both sides to unsigned 32-bit values: depending on the
    # Python version, crc32() and the stored CRC may be signed or unsigned.
    computed = binascii.crc32(data) & 0xffffffff
    expected = zinfo.CRC & 0xffffffff
    if computed != expected:
        # Call-style raise is valid on both Python 2 and Python 3.
        raise zipfile.BadZipfile("Bad CRC-32 for file %s" % name)
    return data
toolkit.py 文件源码 项目:purelove 作者: hucmosin 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def unzip(f):
    """Extract the zip archive at path ``f`` into the current directory.

    Returns a status string: a success message naming ``f``, or an error
    message when ``f`` is not an existing file or is not a valid archive.
    """
    if not os.path.isfile(f):
        return 'Error: File not found.'
    try:
        with zipfile.ZipFile(f) as archive:
            archive.extractall('.')
    except zipfile.BadZipfile:
        return 'Error: Failed to unzip file.'
    return 'File {} extracted.'.format(f)
unzip.py 文件源码 项目:purelove 作者: hucmosin 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def unzip(f):
    """Extract the zip archive at path ``f`` into the current directory.

    Returns a tagged status string: ``[*]`` on success, ``[!]`` when the
    file is not a valid archive, ``[-]`` when ``f`` is not an existing file.
    """
    if not os.path.isfile(f):
        return '[-] Error: File not found.'
    try:
        with zipfile.ZipFile(f) as archive:
            archive.extractall('.')
    except zipfile.BadZipfile:
        return '[!] Error: Failed to unzip file.'
    return '[*] File {} extracted.'.format(f)
toolkit.py 文件源码 项目:purelove 作者: hucmosin 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def unzip(f):
    """Unpack the zip file at path ``f`` into the current working directory.

    The result is always a human-readable status string; no exception is
    raised for a missing path or for a file that is not a zip archive.
    """
    if not os.path.isfile(f):
        return 'Error: File not found.'
    try:
        with zipfile.ZipFile(f) as zip_handle:
            zip_handle.extractall('.')
    except zipfile.BadZipfile:
        return 'Error: Failed to unzip file.'
    return 'File {} extracted.'.format(f)
ez_setup.py 文件源码 项目:jcchess 作者: johncheetham 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def archive_context(filename):
    """
    Extract the zip archive ``filename`` into a fresh temporary directory
    and switch the working directory into the first entry found there.

    This is a generator that yields exactly once while the working
    directory is switched. Afterwards, or on any error, the previous
    working directory is restored and the temporary directory is removed.
    """
    workdir = tempfile.mkdtemp()
    log.warn('Extracting in %s', workdir)
    previous_cwd = os.getcwd()
    try:
        os.chdir(workdir)
        try:
            with ContextualZipFile(filename) as zf:
                zf.extractall()
        except zipfile.BadZipfile as exc:
            # Keep the existing args (or a blank placeholder when there are
            # none) and append the more helpful message before re-raising.
            hint = MEANINGFUL_INVALID_ZIP_ERR_MSG.format(filename)
            exc.args = (exc.args or ('', )) + (hint, )
            raise

        # step into the first entry that os.listdir reports
        first_entry = os.listdir(workdir)[0]
        target = os.path.join(workdir, first_entry)
        os.chdir(target)
        log.warn('Now working in %s', target)
        yield

    finally:
        os.chdir(previous_cwd)
        shutil.rmtree(workdir)
ez_setup.py 文件源码 项目:whoseline 作者: bmorris3 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def archive_context(filename):
    """
    Extract the zip archive ``filename`` into a fresh temporary directory
    and switch the working directory into the first entry found there.

    This is a generator that yields exactly once while the working
    directory is switched. Afterwards, or on any error, the previous
    working directory is restored and the temporary directory is removed.
    """
    scratch_dir = tempfile.mkdtemp()
    log.warn('Extracting in %s', scratch_dir)
    original_cwd = os.getcwd()
    try:
        os.chdir(scratch_dir)
        try:
            with ContextualZipFile(filename) as zf:
                zf.extractall()
        except zipfile.BadZipfile as exc:
            # Keep the existing args (or a blank placeholder when there are
            # none) and append the more helpful message before re-raising.
            hint = MEANINGFUL_INVALID_ZIP_ERR_MSG.format(filename)
            exc.args = (exc.args or ('', )) + (hint, )
            raise

        # step into the first entry that os.listdir reports
        first_entry = os.listdir(scratch_dir)[0]
        target = os.path.join(scratch_dir, first_entry)
        os.chdir(target)
        log.warn('Now working in %s', target)
        yield

    finally:
        os.chdir(original_cwd)
        shutil.rmtree(scratch_dir)
ez_setup.py 文件源码 项目:dust_extinction 作者: karllark 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def archive_context(filename):
    """
    Extract the zip archive ``filename`` into a fresh temporary directory
    and switch the working directory into the first entry found there.

    This is a generator that yields exactly once while the working
    directory is switched. Afterwards, or on any error, the previous
    working directory is restored and the temporary directory is removed.
    """
    extract_root = tempfile.mkdtemp()
    log.warn('Extracting in %s', extract_root)
    saved_cwd = os.getcwd()
    try:
        os.chdir(extract_root)
        try:
            with ContextualZipFile(filename) as zf:
                zf.extractall()
        except zipfile.BadZipfile as exc:
            # Keep the existing args (or a blank placeholder when there are
            # none) and append the more helpful message before re-raising.
            hint = MEANINGFUL_INVALID_ZIP_ERR_MSG.format(filename)
            exc.args = (exc.args or ('', )) + (hint, )
            raise

        # step into the first entry that os.listdir reports
        first_entry = os.listdir(extract_root)[0]
        target = os.path.join(extract_root, first_entry)
        os.chdir(target)
        log.warn('Now working in %s', target)
        yield

    finally:
        os.chdir(saved_cwd)
        shutil.rmtree(extract_root)
ez_setup.py 文件源码 项目:thejoker 作者: adrn 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def archive_context(filename):
    """
    Extract the zip archive ``filename`` into a fresh temporary directory
    and switch the working directory into the first entry found there.

    This is a generator that yields exactly once while the working
    directory is switched. Afterwards, or on any error, the previous
    working directory is restored and the temporary directory is removed.
    """
    staging_dir = tempfile.mkdtemp()
    log.warn('Extracting in %s', staging_dir)
    starting_cwd = os.getcwd()
    try:
        os.chdir(staging_dir)
        try:
            with ContextualZipFile(filename) as zf:
                zf.extractall()
        except zipfile.BadZipfile as exc:
            # Keep the existing args (or a blank placeholder when there are
            # none) and append the more helpful message before re-raising.
            hint = MEANINGFUL_INVALID_ZIP_ERR_MSG.format(filename)
            exc.args = (exc.args or ('', )) + (hint, )
            raise

        # step into the first entry that os.listdir reports
        first_entry = os.listdir(staging_dir)[0]
        target = os.path.join(staging_dir, first_entry)
        os.chdir(target)
        log.warn('Now working in %s', target)
        yield

    finally:
        os.chdir(starting_cwd)
        shutil.rmtree(staging_dir)
awslambda.py 文件源码 项目:AshsSDK 作者: thehappydinoa 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _should_contain_zip_content(value):
    """Check that ``value`` can be opened as a zip archive.

    Non-bytes input is UTF-8 encoded first: such a value is very unlikely
    to be zip content, but the load is attempted anyway to be sure.

    Raises ``ValueError(ERROR_MSG)`` when the content is not a valid zip.
    """
    payload = value if isinstance(value, bytes) else value.encode('utf-8')
    try:
        with closing(zipfile.ZipFile(six.BytesIO(payload))) as archive:
            # Listing the members is only a probe; the result is discarded.
            archive.infolist()
    except zipfile.BadZipfile:
        raise ValueError(ERROR_MSG)
node_network_plugin.py 文件源码 项目:kuberdock-platform 作者: cloudlinux 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def _extract_zip(self, file_obj, path):
            try:
                with zipfile.ZipFile(file_obj) as archive:
                    archive.extractall(path)
            except zipfile.BadZipfile:
                raise self.BadArchive
hostsutil.py 文件源码 项目:huhamhire-hosts 作者: jiangsile 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __init__(self):
        """
        Initialize a new TUI session.

        * Load the server list from a configuration file under the working
          directory.
        * Try to load the hosts data file under the working directory if it
          exists.

        .. note:: If the hosts data file is missing or invalid in the
            current working directory, a warning message box pops up, and
            operations that change the hosts file on the current system can
            only be done after a new data file has been downloaded.

        .. seealso:: :meth:`~tui.curses_d.CursesDaemon.session_daemon` method
            in :class:`~tui.curses_d.CursesDaemon`.

        .. seealso:: :meth:`~gui.hostsutil.HostsUtil.init_main` in
            :class:`~gui.hostsutil.HostsUtil` class.
        """
        super(HostsUtil, self).__init__()
        # Set mirrors
        self.settings[0][2] = CommonUtil.set_network("network.conf")
        # Read data file and set function list
        try:
            self.set_platform()
            RetrieveData.unpack()
            RetrieveData.connect_db()
            self.set_info()
            self.set_func_list()
        except IOError:
            # Any IOError from the steps above is reported to the user as a
            # missing data file.
            self.messagebox("No data file found! Press F6 to get data file "
                            "first.", 1)
        except BadZipfile:
            # A BadZipfile is reported to the user as an incorrect data file.
            self.messagebox("Incorrect Data file! Press F6 to get a new data "
                            "file first.", 1)
qdialog_d.py 文件源码 项目:huhamhire-hosts 作者: jiangsile 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def refresh_info(self, refresh=0):
    """
    Load the data file again and display its information on the main
    dialog. The information covers both the metadata and the hosts module
    info stored in the data file.

    :param refresh: A flag telling whether the information shown on the
        main dialog has to be reloaded. The value could be `0` or `1`.

            =======  =============
            refresh  operation
            =======  =============
            0        Do NOT reload
            1        Reload
            =======  =============

    :type refresh: int
    """
    if refresh:
        # Only clear when a connection object is currently held.
        if RetrieveData.conn is not None:
            RetrieveData.clear()
    try:
        RetrieveData.unpack()
        RetrieveData.connect_db()
        self.set_func_list(refresh)
        self.refresh_func_list()
        self.set_info()
    except (BadZipfile, IOError, OSError):
        # Any of these failures is shown as an incorrect data file warning.
        self.warning_incorrect_datafile()
hostsutil.py 文件源码 项目:huhamhire-hosts 作者: jiangsile 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def init_main(self):
        """
        Set up the elements on the main dialog. Check the environment of
        the current operating system and the current session.

        * Load the server list from a configuration file under the working
          directory.
        * Try to load the hosts data file under the working directory if it
          exists.

        .. note:: If the hosts data file is missing or invalid in the
            current working directory, a warning message box pops up, and
            operations that change the hosts file on the current system can
            only be done after a new data file has been downloaded.

        .. seealso:: Method :meth:`~tui.hostsutil.HostsUtil.__init__` in
            :class:`~tui.hostsutil.HostsUtil` class.
        """
        self.ui.SelectMirror.clear()
        self.set_version()
        # Set mirrors
        self.mirrors = CommonUtil.set_network("network.conf")
        self.set_mirrors()
        # Read data file and set function list
        try:
            RetrieveData.unpack()
            RetrieveData.connect_db()
            self.set_func_list(1)
            self.refresh_func_list()
            self.set_info()
        except IOError:
            # Any IOError from the steps above triggers the "no data file"
            # warning.
            self.warning_no_datafile()
        except BadZipfile:
            # A BadZipfile triggers the "incorrect data file" warning.
            self.warning_incorrect_datafile()
        # Check if current session have root privileges
        self.check_writable()
        # Runs on every call, whether or not the data file loaded.
        self.init_flag += 1
unarchive.py 文件源码 项目:DevOps 作者: YoLoveLife 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def files_in_archive(self, force_refresh=False):
    """Return the member names of ``self.src`` that are not excluded.

    The result is cached on ``self._files_in_archive``; a non-empty cache
    is returned as-is unless ``force_refresh`` is true.

    Raises ``UnarchiveError`` when the member list cannot be read, and
    re-raises ``BadZipfile`` unless its message starts with
    'bad magic number', in which case ``self._legacy_file_list`` is used.
    """
    if self._files_in_archive and not force_refresh:
        return self._files_in_archive

    self._files_in_archive = []
    try:
        archive = ZipFile(self.src)
    except BadZipfile:
        e = get_exception()
        # Guard against an argument-less exception so the original
        # BadZipfile is re-raised instead of an unrelated IndexError.
        if e.args and e.args[0].lower().startswith('bad magic number'):
            # Python2.4 can't handle zipfiles with > 64K files.  Try using
            # /usr/bin/unzip instead
            self._legacy_file_list(force_refresh)
        else:
            raise
    else:
        try:
            try:
                for member in archive.namelist():
                    if member not in self.excludes:
                        self._files_in_archive.append(to_native(member))
            except Exception:
                # Exception (not a bare except) so KeyboardInterrupt and
                # SystemExit are not converted into UnarchiveError.
                raise UnarchiveError('Unable to list files in the archive')
        finally:
            # Single close point for both the success and failure paths.
            archive.close()
    return self._files_in_archive
appLoad.py 文件源码 项目:appcompatprocessor 作者: mbevilacqua 项目源码 文件源码 阅读 118 收藏 0 点赞 0 评论 0
def processArchives(filename, file_filter):
    """Return the list of files to process for ``filename``.

    A path that does not end in ``.zip`` is returned unchanged as a
    single-element list. For a ``.zip`` path, every member whose name
    matches the regular expression ``file_filter`` (``re.match``, i.e.
    anchored at the start of the name) is returned as
    ``os.path.join(filename, member)``.

    If the archive cannot be read, an error is logged and ``SystemExit(-1)``
    is raised.
    """
    files_to_process = []

    if not filename.endswith('.zip'):
        files_to_process.append(filename)
        return files_to_process

    zip_archive_filename = filename
    try:
        # Open the zip archive:
        zip_archive = zipfile.ZipFile(zip_archive_filename, "r")
        try:
            zipFileList = zip_archive.namelist()
        finally:
            # Close the handle even if listing the members fails.
            zip_archive.close()
    except (IOError, zipfile.BadZipfile, struct.error):
        # Tuple-only except clause: valid on both Python 2 and Python 3.
        logger.error("Error reading zip archive: %s" % zip_archive_filename)
        # Raised directly so it does not depend on the site-provided exit().
        raise SystemExit(-1)

    countTotalFiles = len(zipFileList)
    logger.info("Total files in %s: %d" % (zip_archive_filename, countTotalFiles))
    logger.info("Hold on while we check the zipped files...")

    for zipped_filename in zipFileList:
        if re.match(file_filter, zipped_filename):
            files_to_process.append(os.path.join(zip_archive_filename, zipped_filename))
    if not files_to_process:
        logger.error("No valid files found!")
    return files_to_process
ez_setup.py 文件源码 项目:fiasco 作者: wtbarnes 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def archive_context(filename):
    """
    Extract the zip archive ``filename`` into a fresh temporary directory
    and switch the working directory into the first entry found there.

    This is a generator that yields exactly once while the working
    directory is switched. Afterwards, or on any error, the previous
    working directory is restored and the temporary directory is removed.
    """
    unpack_dir = tempfile.mkdtemp()
    log.warn('Extracting in %s', unpack_dir)
    prior_cwd = os.getcwd()
    try:
        os.chdir(unpack_dir)
        try:
            with ContextualZipFile(filename) as zf:
                zf.extractall()
        except zipfile.BadZipfile as exc:
            # Keep the existing args (or a blank placeholder when there are
            # none) and append the more helpful message before re-raising.
            hint = MEANINGFUL_INVALID_ZIP_ERR_MSG.format(filename)
            exc.args = (exc.args or ('', )) + (hint, )
            raise

        # step into the first entry that os.listdir reports
        first_entry = os.listdir(unpack_dir)[0]
        target = os.path.join(unpack_dir, first_entry)
        os.chdir(target)
        log.warn('Now working in %s', target)
        yield

    finally:
        os.chdir(prior_cwd)
        shutil.rmtree(unpack_dir)
test_zipfile.py 文件源码 项目:oil 作者: oilshell 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_close_on_exception(self):
    """Verify that leaving a 'with' block through an exception still
    closes the zipfile."""
    # Populate the archive that is reopened below.
    with zipfile.ZipFile(TESTFN2, "w") as writer:
        for member_name, payload in SMALL_TEST_DATA:
            writer.writestr(member_name, payload)

    try:
        with zipfile.ZipFile(TESTFN2, "r") as reader:
            raise zipfile.BadZipfile()
    except zipfile.BadZipfile:
        # The archive's file pointer is expected to be None once closed.
        still_open = reader.fp is not None
        self.assertTrue(not still_open, 'zipfp is not closed')
test_zipfile.py 文件源码 项目:oil 作者: oilshell 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_close_erroneous_file(self):
        # This test checks that the ZipFile constructor closes the file object
        # it opens if there's an error in the file.  If it doesn't, the
        # traceback holds a reference to the ZipFile object and, indirectly,
        # the file object.
        # On Windows, this causes the os.unlink() call to fail because the
        # underlying file is still open.  This is SF bug #412214.
        #
        # Write a plain-text file that is not a zip archive.
        with open(TESTFN, "w") as fp:
            fp.write("this is not a legal zip file\n")
        # The body makes no assertion of its own: BadZipfile is swallowed,
        # and any other exception propagates.
        try:
            zf = zipfile.ZipFile(TESTFN)
        except zipfile.BadZipfile:
            pass
test_zipfile.py 文件源码 项目:oil 作者: oilshell 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_damaged_zipfile(self):
    """Every truncated copy of a valid archive must raise BadZipfile."""
    # Build a small, valid archive in memory.
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, mode="w") as archive:
        archive.writestr("foo.txt", b"O, for a Muse of Fire!")
    intact = buffer.getvalue()

    # Open each strict prefix (0 .. len-1 bytes) and expect a
    # BadZipfile from the constructor every time.
    for cut in range(len(intact)):
        truncated = io.BytesIO(intact[:cut])
        with self.assertRaises(zipfile.BadZipfile):
            zipfile.ZipFile(truncated)
test_zipfile.py 文件源码 项目:oil 作者: oilshell 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_empty_file_raises_BadZipFile(self):
    """An empty file and a short non-zip file must both be rejected."""
    # Create (or truncate to) a zero-length file.
    open(TESTFN, 'w').close()
    with self.assertRaises(zipfile.BadZipfile):
        zipfile.ZipFile(TESTFN)

    # Replace the content with a few bytes that are not a zip archive.
    with open(TESTFN, 'w') as handle:
        handle.write("short file")
    with self.assertRaises(zipfile.BadZipfile):
        zipfile.ZipFile(TESTFN)
serverless.py 文件源码 项目:puresec-cli 作者: puresec 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def get_function_root(self, name):
    """Return the directory holding the unpacked package of function ``name``.

    The package zip is looked up under ``self.serverless_package`` and
    extracted below ``self.functions_output`` (created on first use). A
    directory that already exists is returned without extracting again.

    Prints an error and raises ``SystemExit(2)`` when the zip is missing or
    is not a valid archive.
    """
    if not hasattr(self, 'functions_output'):
        # NOTE(review): if TemporaryDirectory is tempfile.TemporaryDirectory,
        # the first positional argument is the suffix, not the prefix -
        # confirm intent.
        self.functions_output = TemporaryDirectory("puresec-serverless-functions-")

    package_name = self._get_function_package_name(name)
    function_root = os.path.join(self.functions_output.name, package_name)
    if os.path.exists(function_root):
        return function_root

    archive_path = os.path.join(self.serverless_package, "{}.zip".format(package_name))
    try:
        archive = ZipFile(archive_path, 'r')
    except FileNotFoundError:
        eprint("error: serverless package did not create a function zip for '{}'", name)
        raise SystemExit(2)
    except BadZipFile:
        eprint("error: serverless package did not create a valid function zip for '{}'", name)
        raise SystemExit(2)

    with archive:
        archive.extractall(function_root)
    return function_root
forms.py 文件源码 项目:DCRM 作者: 82Flex 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def clean_zip_file(self):
    """Open the zip file a first time, to check that it is a valid zip archive.
    We'll open it again in a moment, so we have some duplication, but let's focus
    on keeping the code easier to read!

    Returns the uploaded ``zip_file`` object from ``self.cleaned_data``.

    Raises ``forms.ValidationError`` when the upload is not a zip archive
    or when ``testzip()`` reports a corrupt member.
    """
    zip_file = self.cleaned_data['zip_file']
    try:
        archive = zipfile.ZipFile(zip_file)
    except BadZipFile as e:
        raise forms.ValidationError(str(e))
    # The context manager closes the archive on every path, including an
    # exception raised by testzip() itself.
    with archive:
        bad_file = archive.testzip()
    if bad_file:
        raise forms.ValidationError('"%s" in the .zip archive is corrupt.' % bad_file)
    return zip_file
forms.py 文件源码 项目:ava-website 作者: ava-project 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def get_manifest(self, archive):
    """Read, validate and return ``manifest.json`` from an uploaded plugin.

    The archive is opened from ``archive.temporary_file_path()`` and the
    manifest is looked up under the prefix given by ``self.get_prefix``.
    Each failure is reported as a ``ValidationError`` with its own message.
    """
    try:
        with ZipFile(archive.temporary_file_path()) as plugin:
            print(plugin.namelist())
            prefix = self.get_prefix(plugin)
            # A non-empty prefix gets a trailing slash; otherwise the
            # manifest is looked up at the archive root.
            if len(prefix):
                prefix = prefix + '/'
            else:
                prefix = ''
            manifest_path = '{}manifest.json'.format(prefix)
            with plugin.open(manifest_path) as manifest_file:
                manifest = json.loads(manifest_file.read())
            validate_manifest(manifest)
            return manifest
    except BadZipFile:
        raise ValidationError('Bad .zip format')
    except FileNotFoundError:
        raise ValidationError('Error with upload, please try again')
    except KeyError:
        raise ValidationError('No manifest.json found in archive')
    except json.JSONDecodeError:
        raise ValidationError('Error with manifest.json, bad Json Format')
    except avasdk.exceptions.ValidationError as e:
        raise ValidationError('Error in manifest.json ({})'.format(e))
utilities.py 文件源码 项目:pandachaika 作者: pandabuilder 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def get_zip_filesize(filepath: str) -> int:
    """Return the total uncompressed size of the image files in a zip.

    Only members whose name ends in ``.jpeg``, ``.jpg``, ``.png`` or
    ``.gif`` (case-insensitive) are counted, and members with ``__macosx``
    anywhere in their name are skipped.

    Returns ``-1`` when ``filepath`` is not a valid zip archive.
    """
    try:
        my_zip = zipfile.ZipFile(filepath, 'r')
    except zipfile.BadZipFile:
        return -1

    image_suffixes = ('.jpeg', '.jpg', '.png', '.gif')
    total_size = 0
    # The context manager closes the archive even if iteration fails.
    with my_zip:
        for info in my_zip.infolist():
            lowered = info.filename.lower()
            if not lowered.endswith(image_suffixes) or '__macosx' in lowered:
                continue
            total_size += int(info.file_size)

    return total_size
utilities.py 文件源码 项目:pandachaika 作者: pandabuilder 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def get_zip_fileinfo(filepath: str) -> Tuple[int, int]:
    """Return ``(total_size, count)`` for the image files in a zip.

    Only members whose name ends in ``.jpeg``, ``.jpg``, ``.png`` or
    ``.gif`` (case-insensitive) are counted, and members with ``__macosx``
    anywhere in their name are skipped. ``total_size`` is the sum of the
    uncompressed sizes.

    Returns ``(-1, -1)`` when ``filepath`` is not a valid zip archive.
    """
    try:
        my_zip = zipfile.ZipFile(filepath, 'r')
    except zipfile.BadZipFile:
        return -1, -1

    image_suffixes = ('.jpeg', '.jpg', '.png', '.gif')
    total_size = 0
    total_count = 0
    # The context manager closes the archive even if iteration fails.
    with my_zip:
        for info in my_zip.infolist():
            lowered = info.filename.lower()
            if not lowered.endswith(image_suffixes) or '__macosx' in lowered:
                continue
            total_size += int(info.file_size)
            total_count += 1

    return total_size, total_count
build.py 文件源码 项目:digger-build-cli 作者: aerogear 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def from_zip(cls, src='/tmp/app.zip', dest='/app'):
    """
    Unzips a zipped app project file and instantiates it.

    :param src: zipfile path
    :param dest: destination folder to extract the zipfile content

    Returns
      A project instance (the result of ``cls.from_path(dest)``).

    Raises
      errors.InvalidPathError if ``src`` does not exist.
      errors.InvalidZipFileError if ``src`` is not a valid zip archive.
    """
    try:
        zf = zipfile.ZipFile(src, 'r')
    except FileNotFoundError:
        raise errors.InvalidPathError(src)
    except zipfile.BadZipFile:
        raise errors.InvalidZipFileError(src)
    # extractall() unpacks every member; the context manager closes the
    # archive even when extraction fails part-way.
    with zf:
        zf.extractall(dest)
    return cls.from_path(dest)
test_zipfile.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def check_read_with_bad_crc(self, compression):
    """Reading a member whose CRC is wrong must raise BadZipFile.

    The corrupt archive bytes are taken from
    ``self.zips_with_bad_crc[compression]``.
    """
    corrupted = self.zips_with_bad_crc[compression]

    # Whole-member read through ZipFile.read()
    with zipfile.ZipFile(io.BytesIO(corrupted), mode="r") as archive:
        with self.assertRaises(zipfile.BadZipFile):
            archive.read('afile')

    # Whole-member read through ZipExtFile.read()
    with zipfile.ZipFile(io.BytesIO(corrupted), mode="r") as archive:
        with archive.open('afile', 'r') as member:
            with self.assertRaises(zipfile.BadZipFile):
                member.read()

    # Two-byte reads, to exercise the buffering logic
    with zipfile.ZipFile(io.BytesIO(corrupted), mode="r") as archive:
        with archive.open('afile', 'r') as member:
            member.MIN_READ_SIZE = 2
            with self.assertRaises(zipfile.BadZipFile):
                while member.read(2):
                    pass
test_zipfile.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_empty_zipfile(self):
    # Check that creating a file in 'w' or 'a' mode and closing without
    # adding any files to the archives creates a valid empty ZIP file
    zipf = zipfile.ZipFile(TESTFN, mode="w")
    zipf.close()
    try:
        zipf = zipfile.ZipFile(TESTFN, mode="r")
    except zipfile.BadZipFile:
        self.fail("Unable to create empty ZIP file in 'w' mode")
    else:
        # Close the reader so the file is not held open.
        zipf.close()

    zipf = zipfile.ZipFile(TESTFN, mode="a")
    zipf.close()
    try:
        zipf = zipfile.ZipFile(TESTFN, mode="r")
    except zipfile.BadZipFile:
        # Same narrow clause as the 'w' case; a bare except would also
        # turn KeyboardInterrupt/SystemExit into a test failure.
        self.fail("Unable to create empty ZIP file in 'a' mode")
    else:
        zipf.close()
test_zipfile.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_empty_zipfile(self):
    # Check that creating a file in 'w' or 'a' mode and closing without
    # adding any files to the archives creates a valid empty ZIP file
    zipf = zipfile.ZipFile(TESTFN, mode="w")
    zipf.close()
    try:
        zipf = zipfile.ZipFile(TESTFN, mode="r")
    except zipfile.BadZipFile:
        self.fail("Unable to create empty ZIP file in 'w' mode")
    else:
        # Close the reader so the file is not held open.
        zipf.close()

    zipf = zipfile.ZipFile(TESTFN, mode="a")
    zipf.close()
    try:
        zipf = zipfile.ZipFile(TESTFN, mode="r")
    except zipfile.BadZipFile:
        # Same narrow clause as the 'w' case; a bare except would also
        # turn KeyboardInterrupt/SystemExit into a test failure.
        self.fail("Unable to create empty ZIP file in 'a' mode")
    else:
        zipf.close()
test_zipfile.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_read_with_bad_crc(self):
    """Reading a member whose CRC is wrong must raise BadZipFile.

    The corrupt archive bytes are taken from ``self.zip_with_bad_crc``.
    """
    corrupted = self.zip_with_bad_crc

    # Whole-member read through ZipFile.read()
    with zipfile.ZipFile(io.BytesIO(corrupted), mode="r") as archive:
        with self.assertRaises(zipfile.BadZipFile):
            archive.read('afile')

    # Whole-member read through ZipExtFile.read()
    with zipfile.ZipFile(io.BytesIO(corrupted), mode="r") as archive:
        with archive.open('afile', 'r') as member:
            with self.assertRaises(zipfile.BadZipFile):
                member.read()

    # Two-byte reads, to exercise the buffering logic
    with zipfile.ZipFile(io.BytesIO(corrupted), mode="r") as archive:
        with archive.open('afile', 'r') as member:
            member.MIN_READ_SIZE = 2
            with self.assertRaises(zipfile.BadZipFile):
                while member.read(2):
                    pass


问题


面经


文章

微信
公众号

扫码关注公众号