def makeZipFiles(self):
self._makebigfile('bigfile.zip', zipfile.ZIP_STORED)
zf2=zipfile.ZipFile('littlefiles.zip', 'w')
try:
os.mkdir('zipstreamdir')
except EnvironmentError:
pass
for i in range(1000):
fn='zipstreamdir/%d' % i
stuff(fn, '')
zf2.write(fn)
zf2.close()
self._makebigfile('bigfile_deflated.zip', zipfile.ZIP_DEFLATED)
self.cleanupUnzippedJunk()
python类ZIP_STORED的实例源码
def write_empty_dir(self, arcname, time_spec=TimeSpec.NOW, comment=None):
"""Explicitly add an empty directory entry to the archive"""
self._checkzfile()
arcname = self.cleanup_name(arcname, True)
arcname_enc, arcname_is_utf8 = self.encode_name(arcname)
comment_is_utf8 = False
zinfo = zipfile.ZipInfo(arcname_enc)
#zinfo.filename = arcname_enc
zinfo.date_time = self.timespec_to_zipinfo(time_spec)
zinfo.compress_type = zipfile.ZIP_STORED
zinfo.external_attr = 0o40775 << 16 # unix attributes drwxr-xr-x
zinfo.external_attr |= 0x10 # MS-DOS directory flag
if comment is not None:
zinfo.comment, comment_is_utf8 = self.encode_name(comment)
if arcname_is_utf8 or comment_is_utf8:
zinfo.flag_bits |= 1 << 11
self.zfile.writestr(zinfo, b'')
def DoZip(inputs, output, base_dir=None, compress_fn=None):
"""Creates a zip file from a list of files.
Args:
inputs: A list of paths to zip, or a list of (zip_path, fs_path) tuples.
output: Destination .zip file.
base_dir: Prefix to strip from inputs.
compress_fn: Applied to each input to determine whether or not to compress.
By default, items will be |zipfile.ZIP_STORED|.
"""
input_tuples = []
for tup in inputs:
if isinstance(tup, basestring):
tup = (os.path.relpath(tup, base_dir), tup)
input_tuples.append(tup)
# Sort by zip path to ensure stable zip ordering.
input_tuples.sort(key=lambda tup: tup[0])
with zipfile.ZipFile(output, 'w') as outfile:
for zip_path, fs_path in input_tuples:
compress = compress_fn(zip_path) if compress_fn else None
AddToZipHermetic(outfile, zip_path, src_path=fs_path, compress=compress)
def DoMain(argv):
usage = 'usage: %prog [options] [output_dir] input_file(s)...'
parser = optparse.OptionParser(usage=usage)
build_utils.AddDepfileOption(parser)
parser.add_option('--srcjar',
help='When specified, a .srcjar at the given path is '
'created instead of individual .java files.')
options, args = parser.parse_args(argv)
if not args:
parser.error('Need to specify at least one input file')
input_paths = args
with zipfile.ZipFile(options.srcjar, 'w', zipfile.ZIP_STORED) as srcjar:
for output_path, data in DoGenerate(input_paths):
build_utils.AddToZipHermetic(srcjar, output_path, data=data)
if options.depfile:
build_utils.WriteDepfile(options.depfile, options.srcjar)
def assemble_my_parts(self):
"""Assemble the `self.parts` dictionary. Extend in subclasses.
"""
writers.Writer.assemble_parts(self)
f = tempfile.NamedTemporaryFile()
zfile = zipfile.ZipFile(f, 'w', zipfile.ZIP_DEFLATED)
self.write_zip_str(zfile, 'mimetype', self.MIME_TYPE,
compress_type=zipfile.ZIP_STORED)
content = self.visitor.content_astext()
self.write_zip_str(zfile, 'content.xml', content)
s1 = self.create_manifest()
self.write_zip_str(zfile, 'META-INF/manifest.xml', s1)
s1 = self.create_meta()
self.write_zip_str(zfile, 'meta.xml', s1)
s1 = self.get_stylesheet()
self.write_zip_str(zfile, 'styles.xml', s1)
self.store_embedded_files(zfile)
self.copy_from_stylesheet(zfile)
zfile.close()
f.seek(0)
whole = f.read()
f.close()
self.parts['whole'] = whole
self.parts['encoding'] = self.document.settings.output_encoding
self.parts['version'] = docutils.__version__
def _create_function(self, name='python_test.py'):
python_file_path = pkg_resources.resource_filename(
'qinling_tempest_plugin',
"functions/%s" % name
)
base_name, extention = os.path.splitext(python_file_path)
module_name = os.path.basename(base_name)
self.python_zip_file = os.path.join(
tempfile.gettempdir(),
'%s.zip' % module_name
)
if not os.path.isfile(self.python_zip_file):
zf = zipfile.ZipFile(self.python_zip_file, mode='w')
try:
# Use default compression mode, may change in future.
zf.write(
python_file_path,
'%s%s' % (module_name, extention),
compress_type=zipfile.ZIP_STORED
)
finally:
zf.close()
self.function_id = self.create_function(self.python_zip_file)
def DoMain(argv):
usage = 'usage: %prog [options] [output_dir] input_file(s)...'
parser = optparse.OptionParser(usage=usage)
build_utils.AddDepfileOption(parser)
parser.add_option('--srcjar',
help='When specified, a .srcjar at the given path is '
'created instead of individual .java files.')
options, args = parser.parse_args(argv)
if not args:
parser.error('Need to specify at least one input file')
input_paths = args
with zipfile.ZipFile(options.srcjar, 'w', zipfile.ZIP_STORED) as srcjar:
for output_path, data in DoGenerate(input_paths):
build_utils.AddToZipHermetic(srcjar, output_path, data=data)
if options.depfile:
build_utils.WriteDepfile(options.depfile, options.srcjar)
def test_open_via_zip_info(self):
# Create the ZIP archive
with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
zipfp.writestr("name", "foo")
zipfp.writestr("name", "bar")
with zipfile.ZipFile(TESTFN2, "r") as zipfp:
infos = zipfp.infolist()
data = b""
for info in infos:
with zipfp.open(info) as zipopen:
data += zipopen.read()
self.assertTrue(data == b"foobar" or data == b"barfoo")
data = b""
for info in infos:
data += zipfp.read(info)
self.assertTrue(data == b"foobar" or data == b"barfoo")
def test_univeral_readaheads(self):
f = io.BytesIO()
data = b'a\r\n' * 16 * 1024
zipfp = zipfile.ZipFile(f, 'w', zipfile.ZIP_STORED)
zipfp.writestr(TESTFN, data)
zipfp.close()
data2 = b''
zipfp = zipfile.ZipFile(f, 'r')
with zipfp.open(TESTFN, 'rU') as zipopen:
for line in zipopen:
data2 += line
zipfp.close()
self.assertEqual(data, data2.replace(b'\n', b'\r\n'))
def test_extract_all(self):
with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
for fpath, fdata in SMALL_TEST_DATA:
zipfp.writestr(fpath, fdata)
with zipfile.ZipFile(TESTFN2, "r") as zipfp:
zipfp.extractall()
for fpath, fdata in SMALL_TEST_DATA:
if os.path.isabs(fpath):
outfile = os.path.join(os.getcwd(), fpath[1:])
else:
outfile = os.path.join(os.getcwd(), fpath)
with open(outfile, "rb") as f:
self.assertEqual(fdata.encode(), f.read())
os.remove(outfile)
# remove the test file subdirectories
shutil.rmtree(os.path.join(os.getcwd(), 'ziptest2dir'))
def save(self, filename):
"""Save book to a file."""
if pathlib.Path(filename).exists():
raise FileExistsError
self._write_spine()
self._write('container.xml', 'META-INF/container.xml')
self._write_toc()
with open(str(self.path / 'mimetype'), 'w') as file:
file.write('application/epub+zip')
with zipfile.ZipFile(filename, 'w') as archive:
archive.write(
str(self.path / 'mimetype'), 'mimetype',
compress_type=zipfile.ZIP_STORED)
for file in self.path.rglob('*.*'):
archive.write(
str(file), str(file.relative_to(self.path)),
compress_type=zipfile.ZIP_DEFLATED)
###########################################################################
# Private Methods
###########################################################################
def DoMain(argv):
usage = 'usage: %prog [options] [output_dir] input_file(s)...'
parser = optparse.OptionParser(usage=usage)
build_utils.AddDepfileOption(parser)
parser.add_option('--srcjar',
help='When specified, a .srcjar at the given path is '
'created instead of individual .java files.')
options, args = parser.parse_args(argv)
if not args:
parser.error('Need to specify at least one input file')
input_paths = args
with zipfile.ZipFile(options.srcjar, 'w', zipfile.ZIP_STORED) as srcjar:
for output_path, data in DoGenerate(input_paths):
build_utils.AddToZipHermetic(srcjar, output_path, data=data)
if options.depfile:
build_utils.WriteDepfile(options.depfile, options.srcjar)
def test_open_via_zip_info(self):
# Create the ZIP archive
with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
zipfp.writestr("name", "foo")
with check_warnings(('', UserWarning)):
zipfp.writestr("name", "bar")
self.assertEqual(zipfp.namelist(), ["name"] * 2)
with zipfile.ZipFile(TESTFN2, "r") as zipfp:
infos = zipfp.infolist()
data = ""
for info in infos:
with zipfp.open(info) as f:
data += f.read()
self.assertTrue(data == "foobar" or data == "barfoo")
data = ""
for info in infos:
data += zipfp.read(info)
self.assertTrue(data == "foobar" or data == "barfoo")
def test_append_to_non_zip_file(self):
"""Test appending to an existing file that is not a zipfile."""
# NOTE: this test fails if len(d) < 22 because of the first
# line "fpin.seek(-22, 2)" in _EndRecData
data = 'I am not a ZipFile!'*10
with open(TESTFN2, 'wb') as f:
f.write(data)
with zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED) as zipfp:
zipfp.write(TESTFN, TESTFN)
with open(TESTFN2, 'rb') as f:
f.seek(len(data))
with zipfile.ZipFile(f, "r") as zipfp:
self.assertEqual(zipfp.namelist(), [TESTFN])
self.assertEqual(zipfp.read(TESTFN), self.data)
with open(TESTFN2, 'rb') as f:
self.assertEqual(f.read(len(data)), data)
zipfiledata = f.read()
with io.BytesIO(zipfiledata) as bio, zipfile.ZipFile(bio) as zipfp:
self.assertEqual(zipfp.namelist(), [TESTFN])
self.assertEqual(zipfp.read(TESTFN), self.data)
def test_append_to_concatenated_zip_file(self):
with io.BytesIO() as bio:
with zipfile.ZipFile(bio, 'w', zipfile.ZIP_STORED) as zipfp:
zipfp.write(TESTFN, TESTFN)
zipfiledata = bio.getvalue()
data = b'I am not a ZipFile!'*1000000
with open(TESTFN2, 'wb') as f:
f.write(data)
f.write(zipfiledata)
with zipfile.ZipFile(TESTFN2, 'a') as zipfp:
self.assertEqual(zipfp.namelist(), [TESTFN])
zipfp.writestr('strfile', self.data)
with open(TESTFN2, 'rb') as f:
self.assertEqual(f.read(len(data)), data)
zipfiledata = f.read()
with io.BytesIO(zipfiledata) as bio, zipfile.ZipFile(bio) as zipfp:
self.assertEqual(zipfp.namelist(), [TESTFN, 'strfile'])
self.assertEqual(zipfp.read(TESTFN), self.data)
self.assertEqual(zipfp.read('strfile'), self.data)
def test_extract(self):
with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
for fpath, fdata in SMALL_TEST_DATA:
zipfp.writestr(fpath, fdata)
with zipfile.ZipFile(TESTFN2, "r") as zipfp:
for fpath, fdata in SMALL_TEST_DATA:
writtenfile = zipfp.extract(fpath)
# make sure it was written to the right place
correctfile = os.path.join(os.getcwd(), fpath)
correctfile = os.path.normpath(correctfile)
self.assertEqual(writtenfile, correctfile)
# make sure correct data is in correct file
with open(writtenfile, "rb") as fid:
self.assertEqual(fdata, fid.read())
os.remove(writtenfile)
# remove the test file subdirectories
rmtree(os.path.join(os.getcwd(), 'ziptest2dir'))
def test_extract_all(self):
with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
for fpath, fdata in SMALL_TEST_DATA:
zipfp.writestr(fpath, fdata)
with zipfile.ZipFile(TESTFN2, "r") as zipfp:
zipfp.extractall()
for fpath, fdata in SMALL_TEST_DATA:
outfile = os.path.join(os.getcwd(), fpath)
with open(outfile, "rb") as fid:
self.assertEqual(fdata, fid.read())
os.remove(outfile)
# remove the test file subdirectories
rmtree(os.path.join(os.getcwd(), 'ziptest2dir'))
def test_extract_unicode_filenames(self):
fnames = [u'foo.txt', os.path.basename(TESTFN_UNICODE)]
content = 'Test for unicode filename'
with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
for fname in fnames:
zipfp.writestr(fname, content)
with zipfile.ZipFile(TESTFN2, "r") as zipfp:
for fname in fnames:
writtenfile = zipfp.extract(fname)
# make sure it was written to the right place
correctfile = os.path.join(os.getcwd(), fname)
correctfile = os.path.normpath(correctfile)
self.assertEqual(writtenfile, correctfile)
self.check_file(writtenfile, content)
os.remove(writtenfile)
def test_open_via_zip_info(self):
# Create the ZIP archive
with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
zipfp.writestr("name", "foo")
with check_warnings(('', UserWarning)):
zipfp.writestr("name", "bar")
self.assertEqual(zipfp.namelist(), ["name"] * 2)
with zipfile.ZipFile(TESTFN2, "r") as zipfp:
infos = zipfp.infolist()
data = ""
for info in infos:
with zipfp.open(info) as f:
data += f.read()
self.assertTrue(data == "foobar" or data == "barfoo")
data = ""
for info in infos:
data += zipfp.read(info)
self.assertTrue(data == "foobar" or data == "barfoo")
def test_append_to_non_zip_file(self):
"""Test appending to an existing file that is not a zipfile."""
# NOTE: this test fails if len(d) < 22 because of the first
# line "fpin.seek(-22, 2)" in _EndRecData
data = 'I am not a ZipFile!'*10
with open(TESTFN2, 'wb') as f:
f.write(data)
with zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED) as zipfp:
zipfp.write(TESTFN, TESTFN)
with open(TESTFN2, 'rb') as f:
f.seek(len(data))
with zipfile.ZipFile(f, "r") as zipfp:
self.assertEqual(zipfp.namelist(), [TESTFN])
self.assertEqual(zipfp.read(TESTFN), self.data)
with open(TESTFN2, 'rb') as f:
self.assertEqual(f.read(len(data)), data)
zipfiledata = f.read()
with io.BytesIO(zipfiledata) as bio, zipfile.ZipFile(bio) as zipfp:
self.assertEqual(zipfp.namelist(), [TESTFN])
self.assertEqual(zipfp.read(TESTFN), self.data)
def test_append_to_concatenated_zip_file(self):
with io.BytesIO() as bio:
with zipfile.ZipFile(bio, 'w', zipfile.ZIP_STORED) as zipfp:
zipfp.write(TESTFN, TESTFN)
zipfiledata = bio.getvalue()
data = b'I am not a ZipFile!'*1000000
with open(TESTFN2, 'wb') as f:
f.write(data)
f.write(zipfiledata)
with zipfile.ZipFile(TESTFN2, 'a') as zipfp:
self.assertEqual(zipfp.namelist(), [TESTFN])
zipfp.writestr('strfile', self.data)
with open(TESTFN2, 'rb') as f:
self.assertEqual(f.read(len(data)), data)
zipfiledata = f.read()
with io.BytesIO(zipfiledata) as bio, zipfile.ZipFile(bio) as zipfp:
self.assertEqual(zipfp.namelist(), [TESTFN, 'strfile'])
self.assertEqual(zipfp.read(TESTFN), self.data)
self.assertEqual(zipfp.read('strfile'), self.data)
def test_extract(self):
with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
for fpath, fdata in SMALL_TEST_DATA:
zipfp.writestr(fpath, fdata)
with zipfile.ZipFile(TESTFN2, "r") as zipfp:
for fpath, fdata in SMALL_TEST_DATA:
writtenfile = zipfp.extract(fpath)
# make sure it was written to the right place
correctfile = os.path.join(os.getcwd(), fpath)
correctfile = os.path.normpath(correctfile)
self.assertEqual(writtenfile, correctfile)
# make sure correct data is in correct file
with open(writtenfile, "rb") as fid:
self.assertEqual(fdata, fid.read())
os.remove(writtenfile)
# remove the test file subdirectories
rmtree(os.path.join(os.getcwd(), 'ziptest2dir'))
def test_extract_all(self):
with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
for fpath, fdata in SMALL_TEST_DATA:
zipfp.writestr(fpath, fdata)
with zipfile.ZipFile(TESTFN2, "r") as zipfp:
zipfp.extractall()
for fpath, fdata in SMALL_TEST_DATA:
outfile = os.path.join(os.getcwd(), fpath)
with open(outfile, "rb") as fid:
self.assertEqual(fdata, fid.read())
os.remove(outfile)
# remove the test file subdirectories
rmtree(os.path.join(os.getcwd(), 'ziptest2dir'))
def test_extract_unicode_filenames(self):
fnames = [u'foo.txt', os.path.basename(TESTFN_UNICODE)]
content = 'Test for unicode filename'
with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
for fname in fnames:
zipfp.writestr(fname, content)
with zipfile.ZipFile(TESTFN2, "r") as zipfp:
for fname in fnames:
writtenfile = zipfp.extract(fname)
# make sure it was written to the right place
correctfile = os.path.join(os.getcwd(), fname)
correctfile = os.path.normpath(correctfile)
self.assertEqual(writtenfile, correctfile)
self.check_file(writtenfile, content)
os.remove(writtenfile)
def __init__(self, file, mode="r",
compression=zipfile.ZIP_STORED,
allowZip64=False):
zipfile.ZipFile.__init__(self, file, mode, compression, allowZip64)
self.strict = False
self._expected_hashes = {}
self._hash_algorithm = hashlib.sha256
def make_zipfile(zip_filename, base_dir, verbose=0, dry_run=0, compress=True,
mode='w'):
"""Create a zip file from all the files under 'base_dir'. The output
zip file will be named 'base_dir' + ".zip". Uses either the "zipfile"
Python module (if available) or the InfoZIP "zip" utility (if installed
and found on the default search path). If neither tool is available,
raises DistutilsExecError. Returns the name of the output zip file.
"""
import zipfile
mkpath(os.path.dirname(zip_filename), dry_run=dry_run)
log.info("creating '%s' and adding '%s' to it", zip_filename, base_dir)
def visit(z, dirname, names):
for name in names:
path = os.path.normpath(os.path.join(dirname, name))
if os.path.isfile(path):
p = path[len(base_dir) + 1:]
if not dry_run:
z.write(path, p)
log.debug("adding '%s'", p)
compression = zipfile.ZIP_DEFLATED if compress else zipfile.ZIP_STORED
if not dry_run:
z = zipfile.ZipFile(zip_filename, mode, compression=compression)
for dirname, dirs, files in os.walk(base_dir):
visit(z, dirname, files)
z.close()
else:
for dirname, dirs, files in os.walk(base_dir):
visit(None, dirname, files)
return zip_filename
def __init__(self, file, mode="r",
compression=zipfile.ZIP_STORED,
allowZip64=False):
zipfile.ZipFile.__init__(self, file, mode, compression, allowZip64)
self.strict = False
self._expected_hashes = {}
self._hash_algorithm = hashlib.sha256
def make_zipfile(zip_filename, base_dir, verbose=0, dry_run=0, compress=True,
mode='w'):
"""Create a zip file from all the files under 'base_dir'. The output
zip file will be named 'base_dir' + ".zip". Uses either the "zipfile"
Python module (if available) or the InfoZIP "zip" utility (if installed
and found on the default search path). If neither tool is available,
raises DistutilsExecError. Returns the name of the output zip file.
"""
import zipfile
mkpath(os.path.dirname(zip_filename), dry_run=dry_run)
log.info("creating '%s' and adding '%s' to it", zip_filename, base_dir)
def visit(z, dirname, names):
for name in names:
path = os.path.normpath(os.path.join(dirname, name))
if os.path.isfile(path):
p = path[len(base_dir) + 1:]
if not dry_run:
z.write(path, p)
log.debug("adding '%s'", p)
compression = zipfile.ZIP_DEFLATED if compress else zipfile.ZIP_STORED
if not dry_run:
z = zipfile.ZipFile(zip_filename, mode, compression=compression)
for dirname, dirs, files in sorted_walk(base_dir):
visit(z, dirname, files)
z.close()
else:
for dirname, dirs, files in sorted_walk(base_dir):
visit(None, dirname, files)
return zip_filename
def make_zipfile(zip_filename, base_dir, verbose=0, dry_run=0, compress=None,
mode='w'
):
"""Create a zip file from all the files under 'base_dir'. The output
zip file will be named 'base_dir' + ".zip". Uses either the "zipfile"
Python module (if available) or the InfoZIP "zip" utility (if installed
and found on the default search path). If neither tool is available,
raises DistutilsExecError. Returns the name of the output zip file.
"""
import zipfile
mkpath(os.path.dirname(zip_filename), dry_run=dry_run)
log.info("creating '%s' and adding '%s' to it", zip_filename, base_dir)
def visit(z, dirname, names):
for name in names:
path = os.path.normpath(os.path.join(dirname, name))
if os.path.isfile(path):
p = path[len(base_dir)+1:]
if not dry_run:
z.write(path, p)
log.debug("adding '%s'" % p)
if compress is None:
compress = (sys.version>="2.4") # avoid 2.3 zipimport bug when 64 bits
compression = [zipfile.ZIP_STORED, zipfile.ZIP_DEFLATED][bool(compress)]
if not dry_run:
z = zipfile.ZipFile(zip_filename, mode, compression=compression)
for dirname, dirs, files in os.walk(base_dir):
visit(z, dirname, files)
z.close()
else:
for dirname, dirs, files in os.walk(base_dir):
visit(None, dirname, files)
return zip_filename
#
def _zip_factory(self, filename):
"""create a ZipFile Object with compression and allow big ZIP files (allowZip64)"""
try:
import zlib
assert zlib
zip_compression= zipfile.ZIP_DEFLATED
except Exception as ex:
zip_compression= zipfile.ZIP_STORED
_zip = zipfile.ZipFile(file=filename, mode='w', compression=zip_compression, allowZip64=True)
return _zip