python类ZipFile()的实例源码

tactic_server_stub.py 文件源码 项目:TACTIC-Handler 作者: listyque 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_client_version(my):
        '''API Function: get_client_version()

        @return:
            string - Version of TACTIC that this client came from'''

        # may use pkg_resources in 2.6
        if '.zip' in __file__:
            import zipfile
            parts = __file__.split('.zip')
            zip_name  = '%s.zip'%parts[0]
            if zipfile.is_zipfile(zip_name):
                z = zipfile.ZipFile(zip_name)
                version = z.read('pyasm/application/common/interpreter/tactic_client_lib/VERSION')
                version = version.strip()
                z.close()
        else:
            dir = os.path.dirname(__file__)
            f = open('%s/VERSION' % dir, 'r')
            version = f.readline().strip()
            f.close()
        return version
tactic_server_stub.py 文件源码 项目:TACTIC-Handler 作者: listyque 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_client_api_version(my):
        '''API Function: get_client_api_version()

        @return:
            string - client api version'''

        # may use pkg_resources in 2.6
        if '.zip' in __file__:
            import zipfile
            parts = __file__.split('.zip')
            zip_name  = '%s.zip'%parts[0]
            if zipfile.is_zipfile(zip_name):
                z = zipfile.ZipFile(zip_name)
                version = z.read('pyasm/application/common/interpreter/tactic_client_lib/VERSION_API')
                version = version.strip()
                z.close()
        else:
            dir = os.path.dirname(__file__)
            f = open('%s/VERSION_API' % dir, 'r')
            version = f.readline().strip()
            f.close()
        return version
test_web.py 文件源码 项目:FRG-Crowdsourcing 作者: 97amarnathk 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_export_result_csv_no_tasks_returns_empty_file(self):
        """Test WEB export Result to CSV returns empty file if no results in
        project."""
        project = ProjectFactory.create(short_name='no_tasks_here')
        uri = "/project/%s/tasks/export?type=result&format=csv" % project.short_name
        res = self.app.get(uri, follow_redirects=True)
        zip = zipfile.ZipFile(StringIO(res.data))
        extracted_filename = zip.namelist()[0]

        csv_content = StringIO(zip.read(extracted_filename))
        csvreader = unicode_csv_reader(csv_content)
        is_empty = True
        for line in csvreader:
            is_empty = False, line

        assert is_empty
test_basic.py 文件源码 项目:swjtu-pyscraper 作者: Desgard 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_pydist():
    """Make sure pydist.json exists and validates against our schema."""
    # XXX this test may need manual cleanup of older wheels

    import jsonschema

    def open_json(filename):
        with open(filename, 'rb') as json_file:
            return json.loads(json_file.read().decode('utf-8'))

    pymeta_schema = open_json(resource_filename('wheel.test',
                                                'pydist-schema.json'))
    valid = 0
    for dist in ("simple.dist", "complex-dist"):
        basedir = pkg_resources.resource_filename('wheel.test', dist)
        for (dirname, subdirs, filenames) in os.walk(basedir):
            for filename in filenames:
                if filename.endswith('.whl'):
                    whl = ZipFile(os.path.join(dirname, filename))
                    for entry in whl.infolist():
                        if entry.filename.endswith('/metadata.json'):
                            pymeta = json.loads(whl.read(entry).decode('utf-8'))
                            jsonschema.validate(pymeta, pymeta_schema)
                            valid += 1
    assert valid > 0, "No metadata.json found"
test_wheelfile.py 文件源码 项目:swjtu-pyscraper 作者: Desgard 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_zipfile_attributes():
    # With the change from ZipFile.write() to .writestr(), we need to manually
    # set member attributes.
    with temporary_directory() as tempdir:
        files = (('foo', 0o644), ('bar', 0o755))
        for filename, mode in files:
            path = os.path.join(tempdir, filename)
            with codecs.open(path, 'w', encoding='utf-8') as fp:
                fp.write(filename + '\n')
            os.chmod(path, mode)
        zip_base_name = os.path.join(tempdir, 'dummy')
        zip_filename = wheel.archive.make_wheelfile_inner(
            zip_base_name, tempdir)
        with readable_zipfile(zip_filename) as zf:
            for filename, mode in files:
                info = zf.getinfo(os.path.join(tempdir, filename))
                assert info.external_attr == (mode | 0o100000) << 16
                assert info.compress_type == zipfile.ZIP_DEFLATED
__init__.py 文件源码 项目:sndlatr 作者: Schibum 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def init_zoneinfo():
    """
    Add each zone info to the datastore. This will overwrite existing zones.

    This must be called before the AppengineTimezoneLoader will work.
    """
    import os, logging
    from zipfile import ZipFile
    zoneobjs = []

    zoneinfo_path = os.path.abspath(os.path.join(os.path.dirname(__file__),
      'zoneinfo.zip'))

    with ZipFile(zoneinfo_path) as zf:
        for zfi in zf.filelist:
            key = ndb.Key('Zoneinfo', zfi.filename, namespace=NDB_NAMESPACE)
            zobj = Zoneinfo(key=key, data=zf.read(zfi))
            zoneobjs.append(zobj)

    logging.info("Adding %d timezones to the pytz-appengine database" %
        len(zoneobjs)
        )

    ndb.put_multi(zoneobjs)
appliances.py 文件源码 项目:gns3-documentation-template 作者: GNS3 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def get_appliances():
    """
    This will download an archive of all GNS3 appliances and load them to a dictionnary
    """
    appliances = {}

    url = 'https://github.com/GNS3/gns3-registry/archive/master.zip'
    response = urllib.request.urlopen(url)
    z = zipfile.ZipFile(io.BytesIO(response.read()))
    for path in z.namelist():
        if path.endswith('.gns3a'):
            with z.open(path, 'r') as f:
                id = os.path.basename(path).split('.')[0]
                appliance = json.loads(f.read().decode())
                if appliance["status"] != "broken":
                    appliances[id] = appliance
    return appliances
__init__.py 文件源码 项目:gns3-documentation-template 作者: GNS3 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def get_appliances():
    """
    This will download an archive of all GNS3 appliances and load them to a dictionnary
    """
    appliances = {}

    url = 'https://github.com/GNS3/gns3-registry/archive/master.zip'
    response = urllib.request.urlopen(url)
    z = zipfile.ZipFile(io.BytesIO(response.read()))
    for path in z.namelist():
        if path.endswith('.gns3a'):
            with z.open(path, 'r') as f:
                id = os.path.basename(path).split('.')[0]
                appliance = json.loads(f.read().decode())
                if appliance["status"] != "broken":
                    appliances[id] = appliance
    return appliances
test_basic.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_pydist():
    """Make sure pydist.json exists and validates against our schema."""
    # XXX this test may need manual cleanup of older wheels

    import jsonschema

    def open_json(filename):
        return json.loads(open(filename, 'rb').read().decode('utf-8'))

    pymeta_schema = open_json(resource_filename('wheel.test',
                                                'pydist-schema.json'))
    valid = 0
    for dist in ("simple.dist", "complex-dist"):
        basedir = pkg_resources.resource_filename('wheel.test', dist)
        for (dirname, subdirs, filenames) in os.walk(basedir):
            for filename in filenames:
                if filename.endswith('.whl'):
                    whl = ZipFile(os.path.join(dirname, filename))
                    for entry in whl.infolist():
                        if entry.filename.endswith('/metadata.json'):
                            pymeta = json.loads(whl.read(entry).decode('utf-8'))
                            jsonschema.validate(pymeta, pymeta_schema)
                            valid += 1
    assert valid > 0, "No metadata.json found"
upload_docs.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def create_zipfile(self, filename):
        zip_file = zipfile.ZipFile(filename, "w")
        try:
            self.mkpath(self.target_dir)  # just in case
            for root, dirs, files in os.walk(self.target_dir):
                if root == self.target_dir and not files:
                    raise DistutilsOptionError(
                        "no files found in upload directory '%s'"
                        % self.target_dir)
                for name in files:
                    full = os.path.join(root, name)
                    relative = root[len(self.target_dir):].lstrip(os.path.sep)
                    dest = os.path.join(relative, name)
                    zip_file.write(full, dest)
        finally:
            zip_file.close()
archive.py 文件源码 项目:charm-plumgrid-gateway 作者: openstack 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def extract_zipfile(archive_name, destpath):
    "Unpack a zip file"
    archive = zipfile.ZipFile(archive_name)
    archive.extractall(destpath)
ez_setup.py 文件源码 项目:Adafruit_Python_PureIO 作者: adafruit 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def get_zip_class():
    """
    Supplement ZipFile class to support context manager for Python 2.6
    """
    class ContextualZipFile(zipfile.ZipFile):
        def __enter__(self):
            return self
        def __exit__(self, type, value, traceback):
            self.close
    return zipfile.ZipFile if hasattr(zipfile.ZipFile, '__exit__') else \
        ContextualZipFile
prepare.py 文件源码 项目:almond-nnparser 作者: Stanford-Mobisocial-IoT-Lab 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def download_glove(glove):
    if os.path.exists(glove):
        return

    print('Downloading glove...')
    with tempfile.TemporaryFile() as tmp:
        with urllib.request.urlopen('http://nlp.stanford.edu/data/glove.42B.300d.zip') as res:
            shutil.copyfileobj(res, tmp)
        with zipfile.ZipFile(tmp, 'r') as glove_zip:
            glove_zip.extract('glove.42B.300d.txt', path=os.path.dirname(glove))
    print('Done')
shutil.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def _unpack_zipfile(filename, extract_dir):
    """Unpack zip `filename` to `extract_dir`
    """
    try:
        import zipfile
    except ImportError:
        raise ReadError('zlib not supported, cannot unpack this archive.')

    if not zipfile.is_zipfile(filename):
        raise ReadError("%s is not a zip file" % filename)

    zip = zipfile.ZipFile(filename)
    try:
        for info in zip.infolist():
            name = info.filename

            # don't extract absolute paths or ones with .. in them
            if name.startswith('/') or '..' in name:
                continue

            target = os.path.join(extract_dir, *name.split('/'))
            if not target:
                continue

            _ensure_directory(target)
            if not name.endswith('/'):
                # file
                data = zip.read(info.filename)
                f = open(target, 'wb')
                try:
                    f.write(data)
                finally:
                    f.close()
                    del data
    finally:
        zip.close()
__init__.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def __new__(cls, *args, **kwargs):
        """
        Construct a ZipFile or ContextualZipFile as appropriate
        """
        if hasattr(zipfile.ZipFile, '__exit__'):
            return zipfile.ZipFile(*args, **kwargs)
        return super(ContextualZipFile, cls).__new__(cls)
__init__.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def unzip_file(filename, location, flatten=True):
    """
    Unzip the file (with path `filename`) to the destination `location`.  All
    files are written based on system defaults and umask (i.e. permissions are
    not preserved), except that regular file members with any execute
    permissions (user, group, or world) have "chmod +x" applied after being
    written. Note that for windows, any execute changes using os.chmod are
    no-ops per the python docs.
    """
    ensure_dir(location)
    zipfp = open(filename, 'rb')
    try:
        zip = zipfile.ZipFile(zipfp, allowZip64=True)
        leading = has_leading_dir(zip.namelist()) and flatten
        for info in zip.infolist():
            name = info.filename
            data = zip.read(name)
            fn = name
            if leading:
                fn = split_leading_dir(name)[1]
            fn = os.path.join(location, fn)
            dir = os.path.dirname(fn)
            if fn.endswith('/') or fn.endswith('\\'):
                # A directory
                ensure_dir(fn)
            else:
                ensure_dir(dir)
                fp = open(fn, 'wb')
                try:
                    fp.write(data)
                finally:
                    fp.close()
                    mode = info.external_attr >> 16
                    # if mode and regular file and any execute permissions for
                    # user/group/world?
                    if mode and stat.S_ISREG(mode) and mode & 0o111:
                        # make dest file have execute for user/group/world
                        # (chmod +x) no-op on windows per python docs
                        os.chmod(fn, (0o777 - current_umask() | 0o111))
    finally:
        zipfp.close()
test_wheelfile.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def readable_zipfile(path):
    # zipfile.ZipFile() isn't a context manager under Python 2.
    zf = zipfile.ZipFile(path, 'r')
    try:
        yield zf
    finally:
        zf.close()
test_wheelfile.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_verifying_zipfile():
    if not hasattr(zipfile.ZipExtFile, '_update_crc'):
        pytest.skip('No ZIP verification. Missing ZipExtFile._update_crc.')

    sio = StringIO()
    zf = zipfile.ZipFile(sio, 'w')
    zf.writestr("one", b"first file")
    zf.writestr("two", b"second file")
    zf.writestr("three", b"third file")
    zf.close()

    # In default mode, VerifyingZipFile checks the hash of any read file
    # mentioned with set_expected_hash(). Files not mentioned with
    # set_expected_hash() are not checked.
    vzf = wheel.install.VerifyingZipFile(sio, 'r')
    vzf.set_expected_hash("one", hashlib.sha256(b"first file").digest())
    vzf.set_expected_hash("three", "blurble")
    vzf.open("one").read()
    vzf.open("two").read()
    try:
        vzf.open("three").read()
    except wheel.install.BadWheelFile:
        pass
    else:
        raise Exception("expected exception 'BadWheelFile()'")

    # In strict mode, VerifyingZipFile requires every read file to be
    # mentioned with set_expected_hash().
    vzf.strict = True
    try:
        vzf.open("two").read()
    except wheel.install.BadWheelFile:
        pass
    else:
        raise Exception("expected exception 'BadWheelFile()'")

    vzf.set_expected_hash("two", None)
    vzf.open("two").read()
install.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def open(self, name_or_info, mode="r", pwd=None):
        """Return file-like object for 'name'."""
        # A non-monkey-patched version would contain most of zipfile.py
        ef = zipfile.ZipFile.open(self, name_or_info, mode, pwd)
        if isinstance(name_or_info, zipfile.ZipInfo):
            name = name_or_info.filename
        else:
            name = name_or_info
        if (name in self._expected_hashes
            and self._expected_hashes[name] != None):
            expected_hash = self._expected_hashes[name]
            try:
                _update_crc_orig = ef._update_crc
            except AttributeError:
                warnings.warn('Need ZipExtFile._update_crc to implement '
                              'file hash verification (in Python >= 2.7)')
                return ef
            running_hash = self._hash_algorithm()
            if hasattr(ef, '_eof'):  # py33
                def _update_crc(data):
                    _update_crc_orig(data)
                    running_hash.update(data)
                    if ef._eof and running_hash.digest() != expected_hash:
                        raise BadWheelFile("Bad hash for file %r" % ef.name)
            else:
                def _update_crc(data, eof=None):
                    _update_crc_orig(data, eof=eof)
                    running_hash.update(data)
                    if eof and running_hash.digest() != expected_hash:
                        raise BadWheelFile("Bad hash for file %r" % ef.name)
            ef._update_crc = _update_crc
        elif self.strict and name not in self._expected_hashes:
            raise BadWheelFile("No expected hash for file %r" % ef.name)
        return ef


问题


面经


文章

微信
公众号

扫码关注公众号