def unpack_directory(filename, extract_dir, progress_filter=default_filter):
""""Unpack" a directory, using the same interface as for archives
Raises ``UnrecognizedFormat`` if `filename` is not a directory
"""
if not os.path.isdir(filename):
raise UnrecognizedFormat("%s is not a directory" % (filename,))
paths = {filename:('',extract_dir)}
for base, dirs, files in os.walk(filename):
src,dst = paths[base]
for d in dirs:
paths[os.path.join(base,d)] = src+d+'/', os.path.join(dst,d)
for f in files:
            name = src + f
            target = os.path.join(dst, f)
            target = progress_filter(name, target)
if not target:
continue # skip non-files
ensure_directory(target)
f = os.path.join(base,f)
shutil.copyfile(f, target)
shutil.copystat(f, target)
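# Usage sketch for unpack_directory (hypothetical paths; assumes the function
# above and its helpers are in scope). The progress_filter callback receives
# the archive-relative source name and the proposed destination; returning a
# falsy value skips that entry, so it doubles as a file filter:
def _demo_unpack_skip_hidden(src_dir, dest_dir):
    def skip_hidden(src, dst):
        # Drop dotfiles; keep everything else at its default destination.
        return None if os.path.basename(src).startswith('.') else dst
    unpack_directory(src_dir, dest_dir, progress_filter=skip_hidden)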
def ensure_init(path):
    '''
    Ensure the directories leading up to path are importable by creating
    any missing __init__.py files, omitting the top-level parent directory.
    E.g. for path='hooks/helpers/foo/bar.py':

        hooks/
        hooks/helpers/__init__.py
        hooks/helpers/foo/__init__.py
    '''
for d, dirs, files in os.walk(os.path.join(*path.split('/')[:2])):
_i = os.path.join(d, '__init__.py')
if not os.path.exists(_i):
logging.info('Adding missing __init__.py: %s' % _i)
open(_i, 'wb').close()
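# Usage sketch for ensure_init (hypothetical layout). The walk starts at the
# first two components of the '/'-separated path, so the top-level directory
# itself never receives an __init__.py:
def _demo_ensure_init():
    os.makedirs(os.path.join('hooks', 'helpers', 'foo'), exist_ok=True)
    ensure_init('hooks/helpers/foo/bar.py')
    # Creates hooks/helpers/__init__.py and hooks/helpers/foo/__init__.py,
    # but not hooks/__init__.py.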
def chownr(path, owner, group, follow_links=True, chowntopdir=False):
"""Recursively change user and group ownership of files and directories
in given path. Doesn't chown path itself by default, only its children.
:param str path: The string path to start changing ownership.
:param str owner: The owner string to use when looking up the uid.
:param str group: The group string to use when looking up the gid.
    :param bool follow_links: Follow symlinks and chown their targets if
        True; otherwise chown the links themselves
:param bool chowntopdir: Also chown path itself if True
"""
uid = pwd.getpwnam(owner).pw_uid
gid = grp.getgrnam(group).gr_gid
if follow_links:
chown = os.chown
else:
chown = os.lchown
if chowntopdir:
broken_symlink = os.path.lexists(path) and not os.path.exists(path)
if not broken_symlink:
chown(path, uid, gid)
for root, dirs, files in os.walk(path):
for name in dirs + files:
full = os.path.join(root, name)
broken_symlink = os.path.lexists(full) and not os.path.exists(full)
if not broken_symlink:
chown(full, uid, gid)
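# Usage sketch for chownr (hypothetical path and names; needs root and an
# existing 'www-data' user/group). Chowns /srv/app itself plus all children,
# changing symlinks rather than their targets:
def _demo_chownr():
    chownr('/srv/app', 'www-data', 'www-data',
           follow_links=False, chowntopdir=True)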
def get_dir_size(start_path='.'):
total_size = 0
for dirpath, dirnames, filenames in os.walk(start_path):
for f in filenames:
fp = os.path.join(dirpath, f)
total_size += os.path.getsize(fp)
return total_size
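# Companion sketch for get_dir_size: a hypothetical helper (not part of the
# original snippet) that renders the byte count in human-readable units.
def format_dir_size(start_path='.'):
    size = float(get_dir_size(start_path))
    for unit in ('B', 'KiB', 'MiB', 'GiB', 'TiB'):
        if size < 1024.0:
            return '%.1f %s' % (size, unit)
        size /= 1024.0
    return '%.1f PiB' % size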
def get_apk_file_in_path(path):
    apk_files = []
    for parent, dir_names, filenames in os.walk(path):
        for f in filenames:
            # Res.pgk_suffix is assumed to hold the package extension
            # (presumably '.apk').
            if f.endswith(Res.pgk_suffix):
                apk_files.append(os.path.join(parent, f))
    return apk_files
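# Usage sketch (hypothetical path; assumes Res.pgk_suffix holds the package
# extension as noted above):
#
#     for apk in get_apk_file_in_path('/path/to/builds'):
#         print(apk)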
def list_images(dir):
r = []
for root, dirs, files in os.walk(dir):
for name in files:
if "jpg" in name or "mp4" in name:
r.append(os.path.join(root, name))
return r
def package_files(directory):
"""Recursively add subfolders and files."""
paths = []
for (path, directories, filenames) in os.walk(directory):
for filename in filenames:
paths.append(os.path.join('..', path, filename))
return paths
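# Usage sketch in a setup.py (hypothetical package name). The '..' prefix
# makes each path resolve relative to the package directory when the list is
# passed as package_data:
#
#     from setuptools import setup
#     setup(name='mypkg',
#           packages=['mypkg'],
#           package_data={'mypkg': package_files('extra_files')})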
def maybe_load_scriptlets(cfg):
    # The global declaration must precede any use of the name, otherwise
    # Python 3 raises a SyntaxError.
    global scriptlets
    if scriptlets is not None:
        return
    scriptlets = OrderedDict()
scriptlet_dir = os.path.join(cfg['assets'], 'scriptlets')
for root, dirs, files in os.walk(scriptlet_dir):
for filename in files:
            with open(os.path.join(root, filename)) as f:
                contents = f.read()
            m = re.search(r'_(.*)\.py', filename)
if m:
filename = m.group(1)
scriptlets[filename] = contents
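# Key-derivation note for maybe_load_scriptlets (inferred from the regex):
# a file named '01_bootstrap.py' is stored under the key 'bootstrap', while
# files that don't match r'_(.*)\.py' keep their full filename as the key.
#
#     maybe_load_scriptlets({'assets': '/src/assets'})  # hypothetical cfg
#     source = scriptlets['bootstrap']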
def maybe_load_policies(cfg, policies_dict):
# Update policies_dict with files found in /src/assets/policies
policy_dir = os.path.join(cfg['assets'], 'policies')
for root, dirs, files in os.walk(policy_dir):
for filename in files:
            with open(os.path.join(root, filename)) as f:
                contents = f.read()
policies_dict[filename] = contents
return policies_dict
def __init__(self, cfg):
static_dir = os.path.join(cfg['assets'], 'static')
self.files = {}
for root, dirs, files in os.walk(static_dir):
for filename in files:
            if not filename.startswith('!'):  # skip '!'-prefixed files
fullpath = os.path.join(root, filename)
self.files[filename] = File(fullpath)
def _get_project(self, name):
result = {'urls': {}, 'digests': {}}
for root, dirs, files in os.walk(self.base_dir):
for fn in files:
if self.should_include(fn, root):
fn = os.path.join(root, fn)
url = urlunparse(('file', '',
pathname2url(os.path.abspath(fn)),
'', '', ''))
info = self.convert_url_to_download_info(url, name)
if info:
self._update_version_data(result, info)
        if not self.recursive:
            break  # non-recursive: scan only the top-level directory
return result
def _iglob(path_glob):
rich_path_glob = RICH_GLOB.split(path_glob, 1)
if len(rich_path_glob) > 1:
assert len(rich_path_glob) == 3, rich_path_glob
        prefix, items, suffix = rich_path_glob  # avoid shadowing built-in set
        for item in items.split(','):
for path in _iglob(''.join((prefix, item, suffix))):
yield path
else:
if '**' not in path_glob:
for item in std_iglob(path_glob):
yield item
else:
prefix, radical = path_glob.split('**', 1)
if prefix == '':
prefix = '.'
if radical == '':
radical = '*'
            else:
                # support both '/' and '\' as separators after '**'
                radical = radical.lstrip('/')
                radical = radical.lstrip('\\')
            for path, dirs, files in os.walk(prefix):
path = os.path.normpath(path)
for fn in _iglob(os.path.join(path, radical)):
yield fn
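# Usage sketch for _iglob (assumes RICH_GLOB splits brace sets, e.g.
# re.compile(r'\{([^}]*)\}'), and std_iglob is glob.iglob, as in distlib).
# Brace sets expand like a shell, and '**' recurses via os.walk:
#
#     for path in _iglob('src/{pkg,tests}/**/*.py'):
#         print(path)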
def write_record(self, bdist_dir, distinfo_dir):
from wheel.util import urlsafe_b64encode
record_path = os.path.join(distinfo_dir, 'RECORD')
record_relpath = os.path.relpath(record_path, bdist_dir)
def walk():
for dir, dirs, files in os.walk(bdist_dir):
            dirs.sort()  # sort in place for a deterministic walk order
for f in sorted(files):
yield os.path.join(dir, f)
def skip(path):
"""Wheel hashes every possible file."""
return (path == record_relpath)
with open_for_csv(record_path, 'w+') as record_file:
writer = csv.writer(record_file)
for path in walk():
relpath = os.path.relpath(path, bdist_dir)
if skip(relpath):
hash = ''
size = ''
else:
with open(path, 'rb') as f:
data = f.read()
digest = hashlib.sha256(data).digest()
hash = 'sha256=' + native(urlsafe_b64encode(digest))
size = len(data)
            # RECORD entries always use '/' separators; use a fresh name so
            # the outer record_path is not shadowed.
            entry_path = os.path.relpath(
                path, bdist_dir).replace(os.path.sep, '/')
            writer.writerow((entry_path, hash, size))
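# The rows written above produce a RECORD file like this (illustrative
# digests, not real ones):
#
#     mypkg/__init__.py,sha256=AbCdEf...,1042
#     mypkg-1.0.dist-info/METADATA,sha256=GhIjKl...,523
#     mypkg-1.0.dist-info/RECORD,,
#
# RECORD itself gets empty hash and size fields (the skip() branch), since a
# file cannot contain its own digest.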
def test_no_scripts():
"""Make sure entry point scripts are not generated."""
dist = "complex-dist"
basedir = pkg_resources.resource_filename('wheel.test', dist)
for (dirname, subdirs, filenames) in os.walk(basedir):
for filename in filenames:
if filename.endswith('.whl'):
whl = ZipFile(os.path.join(dirname, filename))
for entry in whl.infolist():
                    assert '.data/scripts/' not in entry.filename
def get_ext_outputs(self):
"""Get a list of relative paths to C extensions in the output distro"""
all_outputs = []
ext_outputs = []
    paths = {self.bdist_dir: ''}  # maps each dir to its '/'-joined relative prefix
for base, dirs, files in os.walk(self.bdist_dir):
for filename in files:
if os.path.splitext(filename)[1].lower() in NATIVE_EXTENSIONS:
all_outputs.append(paths[base] + filename)
for filename in dirs:
paths[os.path.join(base, filename)] = (paths[base] +
filename + '/')
if self.distribution.has_ext_modules():
build_cmd = self.get_finalized_command('build_ext')
for ext in build_cmd.extensions:
if isinstance(ext, Library):
continue
fullname = build_cmd.get_ext_fullname(ext.name)
filename = build_cmd.get_ext_filename(fullname)
if not os.path.basename(filename).startswith('dl-'):
if os.path.exists(os.path.join(self.bdist_dir, filename)):
ext_outputs.append(filename)
return all_outputs, ext_outputs
def walk_egg(egg_dir):
"""Walk an unpacked egg's contents, skipping the metadata directory"""
walker = os.walk(egg_dir)
base, dirs, files = next(walker)
if 'EGG-INFO' in dirs:
dirs.remove('EGG-INFO')
yield base, dirs, files
for bdf in walker:
yield bdf
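# Usage sketch for walk_egg (hypothetical path): count payload files in an
# unpacked egg while ignoring the EGG-INFO metadata directory.
def _count_egg_payload(egg_dir):
    return sum(len(files) for base, dirs, files in walk_egg(egg_dir))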
def make_zipfile(zip_filename, base_dir, verbose=0, dry_run=0, compress=True,
mode='w'):
"""Create a zip file from all the files under 'base_dir'. The output
zip file will be named 'base_dir' + ".zip". Uses either the "zipfile"
Python module (if available) or the InfoZIP "zip" utility (if installed
and found on the default search path). If neither tool is available,
raises DistutilsExecError. Returns the name of the output zip file.
"""
import zipfile
mkpath(os.path.dirname(zip_filename), dry_run=dry_run)
log.info("creating '%s' and adding '%s' to it", zip_filename, base_dir)
def visit(z, dirname, names):
for name in names:
path = os.path.normpath(os.path.join(dirname, name))
if os.path.isfile(path):
p = path[len(base_dir) + 1:]
if not dry_run:
z.write(path, p)
log.debug("adding '%s'", p)
compression = zipfile.ZIP_DEFLATED if compress else zipfile.ZIP_STORED
if not dry_run:
z = zipfile.ZipFile(zip_filename, mode, compression=compression)
for dirname, dirs, files in os.walk(base_dir):
visit(z, dirname, files)
z.close()
else:
for dirname, dirs, files in os.walk(base_dir):
visit(None, dirname, files)
return zip_filename
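# Usage sketch for make_zipfile (hypothetical paths; assumes mkpath and log
# are imported from distutils as in the surrounding module):
#
#     make_zipfile('dist/mypkg.zip', 'build/mypkg', compress=True)
#
# Members are stored relative to base_dir thanks to the
# path[len(base_dir) + 1:] slice in visit().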
def add_output(self, path):
if os.path.isdir(path):
for base, dirs, files in os.walk(path):
for filename in files:
self.outputs.append(os.path.join(base, filename))
else:
self.outputs.append(path)
def create_zipfile(self, filename):
zip_file = zipfile.ZipFile(filename, "w")
try:
self.mkpath(self.target_dir) # just in case
for root, dirs, files in os.walk(self.target_dir):
if root == self.target_dir and not files:
tmpl = "no files found in upload directory '%s'"
raise DistutilsOptionError(tmpl % self.target_dir)
for name in files:
full = os.path.join(root, name)
relative = root[len(self.target_dir):].lstrip(os.path.sep)
dest = os.path.join(relative, name)
zip_file.write(full, dest)
finally:
zip_file.close()
def _find_packages_iter(cls, where, exclude, include):
"""
All the packages found in 'where' that pass the 'include' filter, but
not the 'exclude' filter.
"""
for root, dirs, files in os.walk(where, followlinks=True):
        # Copy dirs so we can iterate over the copy while pruning the
        # original in place (os.walk honors in-place edits to dirs).
        all_dirs = dirs[:]
        dirs[:] = []
for dir in all_dirs:
full_path = os.path.join(root, dir)
rel_path = os.path.relpath(full_path, where)
package = rel_path.replace(os.path.sep, '.')
# Skip directory trees that are not valid packages
if ('.' in dir or not cls._looks_like_package(full_path)):
continue
# Should this package be included?
if include(package) and not exclude(package):
yield package
# Keep searching subdirectories, as there may be more packages
# down there, even if the parent was excluded.
dirs.append(dir)
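# Driver sketch (hypothetical; loosely mirrors how setuptools' find_packages
# wires this up): turn fnmatch patterns into the include/exclude callables
# the generator expects. Assumes the method above is a classmethod on a
# class named PackageFinder.
import fnmatch

def _demo_find_packages(where='.', exclude=(), include=('*',)):
    def build_filter(*patterns):
        return lambda name: any(fnmatch.fnmatchcase(name, p)
                                for p in patterns)
    return list(PackageFinder._find_packages_iter(
        where, exclude=build_filter(*exclude),
        include=build_filter(*include)))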