def make_zipfile(zip_filename, base_dir, verbose=0, dry_run=0, compress=True,
                 mode='w'):
    """Create the zip archive 'zip_filename' from the files under 'base_dir'.

    Every regular file found while walking 'base_dir' (via sorted_walk) is
    added under its path relative to 'base_dir'.  Entries are deflated when
    'compress' is true and stored uncompressed otherwise; 'mode' is handed
    to zipfile.ZipFile unchanged.  When 'dry_run' is true the archive is
    not opened or written, but the tree is still walked and each file is
    logged.  'verbose' is accepted but not used by this function.

    Returns 'zip_filename'.
    """
    import zipfile

    # presumably creates the parent directory of the archive -- confirm
    # against mkpath's definition
    mkpath(os.path.dirname(zip_filename), dry_run=dry_run)
    log.info("creating '%s' and adding '%s' to it", zip_filename, base_dir)

    def visit(z, dirname, names):
        for name in names:
            path = os.path.normpath(os.path.join(dirname, name))
            if os.path.isfile(path):
                # relpath instead of slicing by len(base_dir): slicing cuts
                # into the file name when base_dir is '.', has a trailing
                # separator, or is otherwise not normalized.
                p = os.path.relpath(path, base_dir)
                if not dry_run:
                    z.write(path, p)
                log.debug("adding '%s'", p)

    compression = zipfile.ZIP_DEFLATED if compress else zipfile.ZIP_STORED
    if not dry_run:
        z = zipfile.ZipFile(zip_filename, mode, compression=compression)
        try:
            for dirname, dirs, files in sorted_walk(base_dir):
                visit(z, dirname, files)
        finally:
            # Close even when a write fails so the handle is not leaked.
            z.close()
    else:
        for dirname, dirs, files in sorted_walk(base_dir):
            visit(None, dirname, files)
    return zip_filename
# python类ZIP_DEFLATED的实例源码 (example source code using Python's zipfile.ZIP_DEFLATED)
def make_archive(self, path):
    """Create archive of directory and write to ``path``.

    Walks ``self.path`` and adds every directory and file whose path
    relative to ``self.path`` is not rejected by ``self._ignore``.  Each
    file is passed to ``self._check_type`` first; symlinks are additionally
    passed to ``self._check_link`` and stored through
    ``self._write_symlink`` with their link target instead of being
    followed.

    :param path: Path to archive
    :returns: ``path``

    Ignored::

        * build/\\* - This is used for packing the charm itself and any
                     similar tasks.
        * \\*/.\\* - Hidden files are all ignored for now.  This will most
                   likely be changed into a specific ignore list
                   (.bzr, etc)
    """
    # The context manager closes the archive even if one of the checks
    # below raises part-way through the walk.
    with zipfile.ZipFile(path, 'w', zipfile.ZIP_DEFLATED) as zf:
        for dirpath, dirnames, filenames in os.walk(self.path):
            relative_path = dirpath[len(self.path) + 1:]
            # The root itself has an empty relative path and gets no entry.
            if relative_path and not self._ignore(relative_path):
                zf.write(dirpath, relative_path)
            for name in filenames:
                archive_name = os.path.join(relative_path, name)
                if self._ignore(archive_name):
                    continue
                real_path = os.path.join(dirpath, name)
                self._check_type(real_path)
                if os.path.islink(real_path):
                    self._check_link(real_path)
                    self._write_symlink(
                        zf, os.readlink(real_path), archive_name)
                else:
                    zf.write(real_path, archive_name)
    return path
def load(path, num_cpu=16):
    """Rebuild an act wrapper from the file written at ``path``.

    The file is read with ``dill`` and must contain a
    ``(model_data, act_params)`` pair.  ``act_params`` is forwarded to
    ``deepq.build_act``; ``model_data`` is the bytes of a zip archive that
    is unpacked into a temporary directory, after which ``U.load_state`` is
    called on the ``model`` entry inside it.

    A session is created with ``U.make_session(num_cpu=num_cpu)`` and
    entered; this function never exits it.

    Returns ``ActWrapper(act, act_params)``.
    """
    # NOTE(review): dill.load executes arbitrary code from the file -- only
    # use on trusted checkpoints.
    with open(path, "rb") as f:
        model_data, act_params = dill.load(f)
    act = deepq.build_act(**act_params)
    sess = U.make_session(num_cpu=num_cpu)
    sess.__enter__()
    with tempfile.TemporaryDirectory() as td:
        arc_path = os.path.join(td, "packed.zip")
        with open(arc_path, "wb") as f:
            f.write(model_data)

        # Close the archive explicitly: an open handle inside the temporary
        # directory can prevent its removal on some platforms.
        with zipfile.ZipFile(arc_path, 'r', zipfile.ZIP_DEFLATED) as zf:
            zf.extractall(td)
        U.load_state(os.path.join(td, "model"))

    return ActWrapper(act, act_params)
def relatively_safe_pickle_dump(obj, path, compression=False):
    """This is just like regular pickle dump, except that the failure cases are
    different:

        - It's never possible that we end up with a pickle in a corrupted state.
        - If there was a different file at the path, that file will remain
          unchanged in the event of failure (provided that filesystem rename
          is atomic).
        - It is sometimes possible that we end up with a useless temp file which
          needs to be deleted manually (it will be removed automatically on the
          next function call).

    The intended use case is periodic checkpoints of experiment state, such that
    we never corrupt previous checkpoints if the current one fails.

    Parameters
    ----------
    obj: object
        object to pickle
    path: str
        path to the output file
    compression: bool
        if true the pickle is stored as the entry "data" of a deflated zip
        archive written at ``path``
    """
    temp_storage = path + ".relatively_safe"
    if compression:
        # Using gzip here would be simpler, but the size is limited to 2GB
        with tempfile.NamedTemporaryFile() as uncompressed_file:
            pickle.dump(obj, uncompressed_file)
            # The zip writer re-opens the file by name, so everything still
            # sitting in this handle's buffer must reach disk first;
            # otherwise the archive can contain a truncated pickle.
            uncompressed_file.flush()
            with zipfile.ZipFile(temp_storage, "w",
                                 compression=zipfile.ZIP_DEFLATED) as myzip:
                myzip.write(uncompressed_file.name, "data")
    else:
        with open(temp_storage, "wb") as f:
            pickle.dump(obj, f)
    # Publish the finished file only after it has been written completely.
    os.rename(temp_storage, path)
def backup_mysql():
    """Dump the configured database, zip the dump and tag it with its checksum.

    Runs the dump command built from the module-level settings
    (``mysql_dump``, ``mysql_host``, ``mysql_port``, ``mysql_user``,
    ``mysql_pass``, ``database_name``) through the shell, redirecting its
    output to ``<backup_dir>/<database_name><timestamp>``.  On success the
    dump is zipped, the zip is renamed to ``<zip path>_<md5sum result>`` and
    the uncompressed dump is removed.

    Returns a dict: ``{'status': 'back_success', 'file_path': ..., 'md5': ...}``
    on success, ``{'status': 'back_failed'}`` if the command exits non-zero
    or any exception is raised (the exception is printed).
    """
    try:
        back_file_name = database_name + time.strftime("%Y%m%d_%H%M%S", time.localtime())
        back_file_path = os.path.join(backup_dir, back_file_name)
        # NOTE(review): the command is assembled by string formatting and run
        # through the shell with the password on the command line; the
        # settings must be trusted values.
        backup_command = '%s -h%s -P%d -u%s -p%s --default-character-set=utf8 %s > %s'\
            % (mysql_dump, mysql_host, int(mysql_port), mysql_user, mysql_pass,
               database_name, back_file_path)
        if os.system(backup_command) != 0:
            return {'status': 'back_failed'}

        zip_file_path = back_file_path + '.zip'
        f = zipfile.ZipFile(zip_file_path, 'w', zipfile.ZIP_DEFLATED, allowZip64=True)
        try:
            f.write(back_file_path, back_file_name)
        finally:
            # Always release the archive handle, even if the write fails.
            f.close()

        md5_num = str(md5sum(zip_file_path))
        # zip_file_path already lives under backup_dir; joining backup_dir
        # onto it again would double the directory for a relative backup_dir.
        back_file_new_path = zip_file_path + '_' + md5_num
        os.rename(zip_file_path, back_file_new_path)
        os.remove(back_file_path)
        return {'status': 'back_success', 'file_path': back_file_new_path, 'md5': md5_num}
    except Exception as e:
        print(e)
        return {'status': 'back_failed'}
def refresh_lambda_zip(lambda_files, lambda_dir):
    """Rewrite the deploy zip with the given files from ``lambda_dir``.

    The archive at ``temp_deploy_zip`` is recreated (mode "w", deflated) and
    each entry of ``lambda_files`` is read from ``lambda_dir + '/' + entry``
    and stored under its base name only.
    """
    archive = zipfile.ZipFile(temp_deploy_zip, "w", zipfile.ZIP_DEFLATED)
    with archive:
        for relative_name in lambda_files:
            source_path = lambda_dir + '/' + relative_name
            archive.write(source_path, basename(relative_name))
def archive(self, build_dir):
    """Zip ``self.source_dir`` into ``<build_dir>/<name>-<installed_version>.zip``.

    If the target already exists the user is asked (via ``ask_path_exists``)
    whether to (i)gnore it and skip archiving, (w)ipe it, or (b)ack it up
    first.  Inside the archive everything is placed under a top-level
    ``<self.name>/`` folder; ``pip-egg-info`` directories are not descended
    into and files named ``PIP_DELETE_MARKER_FILENAME`` are skipped.
    """
    assert self.source_dir
    create_archive = True
    archive_name = '%s-%s.zip' % (self.name, self.installed_version)
    archive_path = os.path.join(build_dir, archive_name)
    if os.path.exists(archive_path):
        response = ask_path_exists(
            'The file %s exists. (i)gnore, (w)ipe, (b)ackup ' %
            display_path(archive_path), ('i', 'w', 'b'))
        if response == 'i':
            create_archive = False
        elif response == 'w':
            logger.warn('Deleting %s' % display_path(archive_path))
            os.remove(archive_path)
        elif response == 'b':
            dest_file = backup_dir(archive_path)
            logger.warn('Backing up %s to %s'
                        % (display_path(archive_path), display_path(dest_file)))
            shutil.move(archive_path, dest_file)
    if create_archive:
        # Local names avoid shadowing the builtins ``zip`` and ``dir``.
        zip_file = zipfile.ZipFile(archive_path, 'w', zipfile.ZIP_DEFLATED)
        try:
            source_root = os.path.normcase(os.path.abspath(self.source_dir))
            for dirpath, dirnames, filenames in os.walk(source_root):
                # Pruning dirnames in place stops os.walk from descending.
                if 'pip-egg-info' in dirnames:
                    dirnames.remove('pip-egg-info')
                for dirname in dirnames:
                    dirname = os.path.join(dirpath, dirname)
                    name = self._clean_zip_name(dirname, source_root)
                    # Explicit directory entry with mode bits set.
                    zipdir = zipfile.ZipInfo(self.name + '/' + name + '/')
                    zipdir.external_attr = 0x1ED << 16  # 0o755
                    zip_file.writestr(zipdir, '')
                for filename in filenames:
                    if filename == PIP_DELETE_MARKER_FILENAME:
                        continue
                    filename = os.path.join(dirpath, filename)
                    name = self._clean_zip_name(filename, source_root)
                    zip_file.write(filename, self.name + '/' + name)
        finally:
            # Release the handle even when adding an entry fails.
            zip_file.close()
        logger.indent -= 2
        logger.notify('Saved %s' % display_path(archive_path))
def make_zipfile(zip_filename, base_dir, verbose=0, dry_run=0, compress=None,
                 mode='w'
                 ):
    """Create the zip archive 'zip_filename' from the files under 'base_dir'.

    Every regular file found by os.walk under 'base_dir' is added under the
    part of its path that follows 'base_dir'.  'compress' selects deflate
    (true) or store (false); when it is None, deflate is used on Python 2.4
    and later.  'mode' is handed to zipfile.ZipFile unchanged.  When
    'dry_run' is true the archive is not opened or written, but the tree is
    still walked and each file is logged.  'verbose' is accepted but not
    used by this function.

    Returns 'zip_filename'.
    """
    import zipfile

    # presumably creates the parent directory of the archive -- confirm
    # against mkpath's definition
    mkpath(os.path.dirname(zip_filename), dry_run=dry_run)
    log.info("creating '%s' and adding '%s' to it", zip_filename, base_dir)

    def visit(z, dirname, names):
        for name in names:
            path = os.path.normpath(os.path.join(dirname, name))
            if os.path.isfile(path):
                p = path[len(base_dir)+1:]
                if not dry_run:
                    z.write(path, p)
                log.debug("adding '%s'" % p)

    if compress is None:
        # avoid 2.3 zipimport bug when 64 bits.  Compare the version tuple:
        # comparing the sys.version string is lexicographic and misorders
        # multi-digit components.
        compress = (sys.version_info >= (2, 4))

    compression = [zipfile.ZIP_STORED, zipfile.ZIP_DEFLATED][bool(compress)]
    if not dry_run:
        z = zipfile.ZipFile(zip_filename, mode, compression=compression)
        try:
            for dirname, dirs, files in os.walk(base_dir):
                visit(z, dirname, files)
        finally:
            # Close even when a write fails so the handle is not leaked.
            z.close()
    else:
        for dirname, dirs, files in os.walk(base_dir):
            visit(None, dirname, files)
    return zip_filename
#
def build_zip(self, pathname, archive_paths):
    """Write a deflated zip at ``pathname`` from ``(archive_name, path)`` pairs.

    Each pair is logged at debug level and then the file at ``path`` is
    stored in the archive as ``archive_name``.
    """
    wheel_zip = ZipFile(pathname, 'w', zipfile.ZIP_DEFLATED)
    with wheel_zip:
        for archive_name, source_path in archive_paths:
            logger.debug('Wrote %s to %s in wheel', source_path, archive_name)
            wheel_zip.write(source_path, archive_name)
def GetZipWith(fname, content):
    """Return the bytes of an in-memory zip holding one entry.

    The archive contains a single deflated member named ``fname`` whose
    data is ``content``; Zip64 extensions are disabled.
    """
    # io.BytesIO exists on both Python 2.6+ and Python 3, unlike the
    # Python-2-only StringIO module, and zip data is binary anyway.
    import io

    fp = io.BytesIO()
    zf = zipfile.ZipFile(fp, "a", zipfile.ZIP_DEFLATED, False)
    try:
        zf.writestr(fname, content)
    finally:
        # close() is what writes the central directory into the buffer.
        zf.close()
    return fp.getvalue()
def save_files_in_zip():
    """saves created files in zip file

    When CREATED_FILES is empty only a notice is printed.  Otherwise every
    listed file (its name encoded with OUT_FILE_ENCODING) is added to
    ``SCHEMA_DIR + '.zip'``, the originals are removed, and clean_up() is
    called.
    """
    if not CREATED_FILES:
        output_line('-- nothing to zip')
    else:
        zip_name = os.path.join(SCHEMA_DIR + '.zip')
        output_line('creating zip %s ...' % (zip_name))
        encoded_names = [fn.encode(OUT_FILE_ENCODING) for fn in CREATED_FILES]
        zip_f = zipfile.ZipFile(zip_name, 'w', zipfile.ZIP_DEFLATED)
        try:
            for fne in encoded_names:
                #print('storing %s...' % (fne))
                zip_f.write(fne)
        finally:
            zip_f.close()
        # Delete the sources only once the archive has been finalized, so a
        # failure part-way through cannot lose files that were never safely
        # stored.
        for fne in encoded_names:
            os.remove(fne)
        # NOTE(review): clean_up() is kept on the "files were zipped" path;
        # confirm that matches the intended call site.
        clean_up()
def archive_dir(source, arcname, target, arc_time):
    """Zip every file under ``source`` into ``<target>/<arc_time>_<arcname>.zip``.

    Directories are visited breadth-first.  Files whose name equals the
    archive's own file name (case-insensitively) or starts with ``ini``
    (case-insensitively) are skipped.  A directory that raises while being
    listed or archived is abandoned and the walk continues with the next.

    Returns 1 on success, or the exception instance if the walk or the
    final close fails.  An error opening the archive itself propagates.
    """
    from collections import deque

    # Set the filename and destination.
    arcname = str(arc_time) + '_' + arcname + '.zip'  # Archive filename
    target = str(target + '/' + arcname)  # Path to storage...
    # Compare lowercase to lowercase; the original compared a lowercased
    # file name to the raw archive name, which never matched when the
    # archive name contained uppercase characters.
    arcname_lower = arcname.lower()
    dirs = deque([str(source)])  # Queue of directories still to visit.
    zipObj = zipfile.ZipFile(target, 'w', zipfile.ZIP_DEFLATED)
    try:
        try:
            while dirs:
                # Loop through and get all sub dirs and files.
                dir_list = dirs.popleft()
                try:
                    for items in os.listdir(dir_list + '/'):
                        item_path = dir_list + '/' + items
                        if os.path.isdir(item_path):
                            # Collect sub dirs for a later pass.
                            dirs.append(item_path)
                        elif os.path.isfile(item_path):
                            # Ignore the archive file if in the dir structure.
                            if items.lower() == arcname_lower:
                                continue
                            # Task directory filter.
                            if items.lower()[:3] == 'ini':
                                continue
                            # Write to the zip.
                            zipObj.write(str(item_path), None, None)
                except Exception:
                    # Best effort: skip directories that cannot be read,
                    # but let KeyboardInterrupt/SystemExit through.
                    pass
        finally:
            zipObj.close()
        return 1  # Success...
    except Exception as error:
        return error  # Backup failed...
#//////////////////////////////////////////////////////>
# CONFIGURATION AND PATH SETUPS
#//////////////////////////////////////////////////////>
# SPECIFY SOURCE PATHS; add as many as you like.
# Don't forget to add these to the SOURCE_PATH list below.
def append(self, pathAndFileName):
    """ adding a file or a path (recursive) to the zip archive in memory

    ``pathAndFileName`` is added as-is when it is a regular file; otherwise
    it is walked with os.walk and every file found is added.  Entries are
    appended to the buffer held in ``self.inMemory``.
    """
    zf = zipfile.ZipFile(self.inMemory, "a", zipfile.ZIP_DEFLATED, False)
    try:
        if os.path.isfile(pathAndFileName):
            zf.write(pathAndFileName)
        else:
            path = pathAndFileName
            for root, folders, files in os.walk(path):
                for file in files:
                    fullName = os.path.join(root, file)
                    zf.write(fullName)
    finally:
        # The central directory is only written on close; do it explicitly
        # rather than relying on garbage collection.
        zf.close()
def create_update_archive(archive_path):
    """Write the files from ``create_app_update_list()`` to a deflated zip.

    Each file is stored under its own path with every occurrence of
    ``env_mode.get_current_path()`` removed from it.
    """
    zp = zipfile.ZipFile(archive_path, 'w', compression=zipfile.ZIP_DEFLATED)
    try:
        files_list = create_app_update_list()
        abs_path = env_mode.get_current_path()
        for fl in files_list:
            zp.write(fl, arcname=fl.replace(abs_path, ''))
    finally:
        # Release the archive handle even if listing or writing fails.
        zp.close()
def _zip_factory(self, filename):
    """Open ``filename`` as a new zip archive for writing, with Zip64 allowed.

    Entries are deflated when ``zlib`` can be imported; if that attempt
    raises, the archive falls back to storing entries uncompressed.
    """
    try:
        import zlib
        assert zlib
    except Exception:
        method = zipfile.ZIP_STORED
    else:
        method = zipfile.ZIP_DEFLATED
    return zipfile.ZipFile(file=filename, mode='w', compression=method,
                           allowZip64=True)
def build_zip(self, pathname, archive_paths):
    """Create the deflated archive ``pathname`` from the given entries.

    ``archive_paths`` yields ``(name_in_archive, path_on_disk)`` pairs; each
    one is logged at debug level and then added to the archive.
    """
    out = ZipFile(pathname, 'w', zipfile.ZIP_DEFLATED)
    with out:
        for entry in archive_paths:
            name_in_archive, path_on_disk = entry
            logger.debug('Wrote %s to %s in wheel', path_on_disk, name_in_archive)
            out.write(path_on_disk, name_in_archive)
def make_zipfile(zip_filename, base_dir, verbose=0, dry_run=0, compress=True,
                 mode='w'):
    """Create the zip archive 'zip_filename' from the files under 'base_dir'.

    Every regular file found by os.walk under 'base_dir' is added under its
    path relative to 'base_dir'.  Entries are deflated when 'compress' is
    true and stored uncompressed otherwise; 'mode' is handed to
    zipfile.ZipFile unchanged.  When 'dry_run' is true the archive is not
    opened or written, but the tree is still walked and each file is
    logged.  'verbose' is accepted but not used by this function.

    Returns 'zip_filename'.
    """
    import zipfile

    # presumably creates the parent directory of the archive -- confirm
    # against mkpath's definition
    mkpath(os.path.dirname(zip_filename), dry_run=dry_run)
    log.info("creating '%s' and adding '%s' to it", zip_filename, base_dir)

    def visit(z, dirname, names):
        for name in names:
            path = os.path.normpath(os.path.join(dirname, name))
            if os.path.isfile(path):
                # relpath instead of slicing by len(base_dir): slicing cuts
                # into the file name when base_dir is '.', has a trailing
                # separator, or is otherwise not normalized.
                p = os.path.relpath(path, base_dir)
                if not dry_run:
                    z.write(path, p)
                log.debug("adding '%s'", p)

    compression = zipfile.ZIP_DEFLATED if compress else zipfile.ZIP_STORED
    # A single walk serves both modes; z stays None for a dry run.
    z = None
    if not dry_run:
        z = zipfile.ZipFile(zip_filename, mode, compression=compression)
    try:
        for dirname, dirs, files in os.walk(base_dir):
            visit(z, dirname, files)
    finally:
        # Close even when a write fails so the handle is not leaked.
        if z is not None:
            z.close()
    return zip_filename
def build_zip(self, pathname, archive_paths):
    """Pack ``archive_paths`` into a deflated zip file at ``pathname``.

    ``archive_paths`` is iterated as ``(arcname, filename)`` pairs.  For
    each pair a debug message is logged, then ``filename`` is written to
    the archive under ``arcname``.
    """
    zf = ZipFile(pathname, 'w', zipfile.ZIP_DEFLATED)
    try:
        for arcname, filename in archive_paths:
            logger.debug('Wrote %s to %s in wheel', filename, arcname)
            zf.write(filename, arcname)
    finally:
        zf.close()
def make_wheelfile_inner(base_name, base_dir='.'):
    """Create a whl file from all the files under 'base_dir'.

    The archive is written to ``base_name + ".whl"`` with deflate
    compression, and each file is stored under the normalized path it was
    found at.  Files in directories whose path ends with ``.dist-info`` are
    written last, sorted so that WHEEL, METADATA and RECORD come after any
    other such files, in that order.

    Returns the name of the created file.
    """
    zip_filename = base_name + ".whl"

    log.info("creating '%s' and adding '%s' to it", zip_filename, base_dir)

    # Sort key for .dist-info members; unlisted names default to 0.
    score = {'WHEEL': 1, 'METADATA': 2, 'RECORD': 3}
    deferred = []

    # ZipFile does not close a file object that was passed in, so the
    # underlying file is managed here.
    with open(zip_filename, "wb+") as fileobj:
        # XXX support bz2, xz when available
        archive = zipfile.ZipFile(fileobj, "w",
                                  compression=zipfile.ZIP_DEFLATED)
        try:
            def writefile(path):
                archive.write(path, path)
                log.info("adding '%s'" % path)

            for dirpath, dirnames, filenames in os.walk(base_dir):
                for name in filenames:
                    path = os.path.normpath(os.path.join(dirpath, name))

                    if os.path.isfile(path):
                        if dirpath.endswith('.dist-info'):
                            deferred.append((score.get(name, 0), path))
                        else:
                            writefile(path)

            deferred.sort()
            # The rank is unused here; do not rebind the ``score`` dict.
            for _rank, path in deferred:
                writefile(path)
        finally:
            archive.close()

    return zip_filename
def make_zipfile(zip_filename, base_dir, verbose=0, dry_run=0, compress=True,
                 mode='w'):
    """Create the zip archive 'zip_filename' from the files under 'base_dir'.

    Every regular file found by os.walk under 'base_dir' is added under its
    path relative to 'base_dir'.  Entries are deflated when 'compress' is
    true and stored uncompressed otherwise; 'mode' is handed to
    zipfile.ZipFile unchanged.  When 'dry_run' is true the archive is not
    opened or written, but the tree is still walked and each file is
    logged.  'verbose' is accepted but not used by this function.

    Returns 'zip_filename'.
    """
    import zipfile

    # presumably creates the parent directory of the archive -- confirm
    # against mkpath's definition
    mkpath(os.path.dirname(zip_filename), dry_run=dry_run)
    log.info("creating '%s' and adding '%s' to it", zip_filename, base_dir)

    def visit(z, dirname, names):
        for name in names:
            path = os.path.normpath(os.path.join(dirname, name))
            if not os.path.isfile(path):
                continue
            # relpath instead of slicing by len(base_dir): slicing cuts
            # into the file name when base_dir is '.', has a trailing
            # separator, or is otherwise not normalized.
            p = os.path.relpath(path, base_dir)
            if not dry_run:
                z.write(path, p)
            log.debug("adding '%s'", p)

    compression = zipfile.ZIP_DEFLATED if compress else zipfile.ZIP_STORED
    if dry_run:
        for dirname, dirs, files in os.walk(base_dir):
            visit(None, dirname, files)
        return zip_filename

    z = zipfile.ZipFile(zip_filename, mode, compression=compression)
    try:
        for dirname, dirs, files in os.walk(base_dir):
            visit(z, dirname, files)
    finally:
        # Close even when a write fails so the handle is not leaked.
        z.close()
    return zip_filename