def generate(self):
"""
    Main method of this class: walks the subdirectories of the configured folder and gathers the .txt files to process.
:return: string name of created arff file.
"""
folders = [f.path for f in os.scandir(self.folderpath) if f.is_dir()]
for folder in folders:
# get txt files from folder path
files = [f.path for f in os.scandir(folder) if f.name.endswith(".txt")]
self.empty_counter()
for f_name in files:
self.totalCounter = self.count_words(f_name)
self.counterList[f_name] = self.totalCounter
arff_file = self.create_arff() # generate file
return arff_file
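The method above depends on class-specific helpers (empty_counter, count_words, create_arff), so the snippet below is only a rough standalone sketch of the same scandir-based walk: one level of subfolders, collecting the .txt file paths in each. The folder layout and function name are hypothetical.

import os

def collect_txt_files(folderpath):
    # One level of subfolders under folderpath, .txt files inside each.
    txt_by_folder = {}
    for folder in (d.path for d in os.scandir(folderpath) if d.is_dir()):
        txt_by_folder[folder] = [f.path for f in os.scandir(folder)
                                 if f.is_file() and f.name.endswith(".txt")]
    return txt_by_folder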
def __init__(self, dirName):
"""
Args:
        dirName (string): directory from which to load the corpus
"""
self.MAX_NUMBER_SUBDIR = 10
self.conversations = []
__dir = os.path.join(dirName, "dialogs")
number_subdir = 0
for sub in tqdm(os.scandir(__dir), desc="Ubuntu dialogs subfolders", total=len(os.listdir(__dir))):
if number_subdir == self.MAX_NUMBER_SUBDIR:
            print("WARNING: Early stopping, only extracting {} directories".format(self.MAX_NUMBER_SUBDIR))
return
if sub.is_dir():
number_subdir += 1
for f in os.scandir(sub.path):
if f.name.endswith(".tsv"):
self.conversations.append({"lines": self.loadLines(f.path)})
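A note on the loop above: os.scandir() returns a lazy iterator with no length, which is why os.listdir() is called once just to give tqdm a total. A minimal self-contained sketch of the same progress-bar pattern, assuming tqdm is installed and the root path exists:

import os
from tqdm import tqdm

def count_tsv_conversations(root):
    # scandir() yields entries lazily, so listdir() supplies the total for the bar.
    count = 0
    for sub in tqdm(os.scandir(root), desc="subfolders", total=len(os.listdir(root))):
        if sub.is_dir():
            count += sum(1 for f in os.scandir(sub.path) if f.name.endswith(".tsv"))
    return count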
def _do_tree(root_src, root_dest, tmpl_dict, tmpl_ext, tag_delim, level=0):
if level == 0:
_mkdir(root_dest)
for entry in os.scandir(root_src):
src_path = os.path.join(root_src, entry.name)
dest_path = os.path.join(root_dest,
do_text(entry.name, tmpl_dict, tag_delim))
if entry.is_dir():
_mkdir(dest_path, copy_stats_from=src_path)
_do_tree(src_path, dest_path, tmpl_dict, tmpl_ext, tag_delim,
level + 1)
elif entry.is_file():
was_tmpl = False
for ext in tmpl_ext:
ext = ext.lower()
if entry.name.lower().endswith(ext):
was_tmpl = True
dest_path = dest_path[0:-len(ext)]
do_file(src_path, dest_path, tmpl_dict, tag_delim)
break
if not was_tmpl:
shutil.copy2(src_path, dest_path, follow_symlinks=False)
def extract_features(feature_extraction, save_dir, data_dir=DATA_DIR, extension=".cell", model_name=""):
"""
    For all files in `data_dir` with the given `extension`, extracts their features using the `feature_extraction` function.
@param feature_extraction is a function that takes a trace as an input and returns a list of features (1D)
@param save_dir is the directory where you save the features for the traces.
        Every file in this dir is called `{website}-{id}.cellf` with both `website` and `id` being integers
@param data_dir is the absolute path to the data directory
@param extension is the extension of the files that contain the raw traces
    @param model_name is used when printing to indicate which model we are extracting features for
"""
paths = []
for i, f in enumerate(scandir(data_dir)):
if f.is_file() and f.name[-len(extension):] == extension:
paths.append(f.path)
extract_features_from_files(feature_extraction, paths, save_dir, extension=extension, model_name=model_name)
def __load_mergers(self):
mergers = {}
for f in os.scandir(os.path.join(
os.path.dirname(os.path.realpath(__file__)),
"mergers")):
if f.is_file() and f.name.endswith(".py"):
name = f.name[:-3]
mod = __import__("mergers.%s" % name, fromlist=["Merger"])
try:
m = getattr(mod, "Merger")
if issubclass(m, BaseMerger):
merger = m()
logging.debug("Found merger for %s", \
merger.get_supported_software().keys())
for software in merger.get_supported_software():
if software in mergers:
mergers[software].append(merger)
else:
mergers[software] = [ merger ]
except AttributeError:
logging.warning("Merger %s found but doesn't implement a Merger class inheriting from BaseMerger", name)
return mergers
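The loader above discovers merger plugins by scanning its own mergers package for .py files and importing each one with __import__. As a hedged sketch only, the same discovery step can be written with importlib; the package name and directory below are illustrative, not part of the original code:

import importlib
import os

def discover_modules(package_dir, package_name):
    # Import every top-level .py module found in package_dir (hypothetical package).
    modules = []
    for entry in os.scandir(package_dir):
        if entry.is_file() and entry.name.endswith(".py") and entry.name != "__init__.py":
            modules.append(importlib.import_module("{}.{}".format(package_name, entry.name[:-3])))
    return modules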
def read_files_worker(self, directory, queue):
""" Read all files in a directory and output to the queue. First line
of every file should contain the index. Worker separates first line
and parses to dict. Tuple of index and text is added to queue.
:directory: Source directory containing files
:queue: Queue to add the tuples to
"""
for file in os.scandir(directory):
if file.is_file():
with open(file.path, 'r', errors='replace') as f:
text = f.readlines()
try:
index = literal_eval(text.pop(0).strip())
queue.put((index, '\n'.join(text)), block=True)
except IndexError:
                    LOGGER.error('File {0} is not classifiable'
.format(file.path))
LOGGER.info('File reading worker done.')
def rcollect(path, depth, filter=None):
filter = filter or (lambda n: not n.startswith('.'))
path = os.path.expanduser(path)
if os.path.exists(path):
for f in os.scandir(path):
if filter(f.name):
t = 'undefined'
try:
t = 'file' if f.is_file() else 'dir' if f.is_dir() else 'undefined'
except OSError:
pass
if t == 'file':
yield f
elif t == 'dir' and depth > 0:
for e in rcollect(f.path, depth - 1, filter):
yield e
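A hypothetical use of rcollect() as defined above; the path is illustrative. rcollect() yields os.DirEntry objects for files only, recursing into directories down to the given depth:

# Recurse two levels deep, skipping hidden entries (the default filter).
for entry in rcollect("~/projects", 2):
    print(entry.path)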
def get_new_venv_name(count=1):
if not os.path.exists(get_venv_dir()): # no cov
if count == 1:
return get_random_venv_name()
else:
return sorted(get_random_venv_name() for _ in range(count))
current_venvs = set(p.name for p in os.scandir(get_venv_dir()))
new_venvs = set()
while len(new_venvs) < count:
name = get_random_venv_name()
while name in current_venvs or name in new_venvs: # no cov
name = get_random_venv_name()
new_venvs.add(name)
return new_venvs.pop() if count == 1 else sorted(new_venvs)
def organize_junk():
for entry in os.scandir():
if entry.is_dir():
continue
file_path = Path(entry.name)
file_format = file_path.suffix.lower()
if file_format in FILE_FORMATS:
directory_path = Path(FILE_FORMATS[file_format])
directory_path.mkdir(exist_ok=True)
file_path.rename(directory_path.joinpath(file_path))
    # Collect anything that was not matched above into a catch-all directory.
    try:
        os.mkdir("OTHER-FILES")
    except OSError:
        pass
    for entry in os.scandir():
        try:
            if entry.is_dir():
                # Remove the directory only if it is empty; non-empty ones raise and are kept.
                os.rmdir(entry.path)
            else:
                # Move any remaining top-level file into OTHER-FILES.
                os.rename(entry.path, os.path.join("OTHER-FILES", entry.name))
        except OSError:
            pass
async def _iter_items(self, path: str) -> AsyncIterable:
with os.scandir(path) as directory:
for item in directory:
current_level = self._get_depth(item.path)
            # Happy path: we are at the exact requested level,
            # so yield whatever it is: a folder, a file, or a symlink.
if current_level == self._level:
yield item
continue
            # We haven't reached the requested sub-level yet,
            # so recurse into this entry if it is a folder.
elif current_level < self._level and item.is_dir():
async for e in self._iter_items(item.path):
yield e
            # Ignore any other scenario, including files and symlinks,
            # when the requested level has not been reached.
continue
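_iter_items() above is an async generator tied to its class (self._level, self._get_depth), so the standalone approximation below only sketches the same depth-limited scan; the function name, the depth convention, and the asyncio driver are assumptions:

import asyncio
import os

async def iter_at_depth(path, level, _depth=1):
    # Yield entries exactly `level` directories below `path`; recurse into
    # folders while the requested depth has not been reached yet.
    for item in os.scandir(path):
        if _depth == level:
            yield item
        elif _depth < level and item.is_dir():
            async for entry in iter_at_depth(item.path, level, _depth + 1):
                yield entry

async def main():
    # Print everything two levels below the current directory.
    async for entry in iter_at_depth(".", 2):
        print(entry.path)

asyncio.run(main())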
def data_reader(input_dir, shuffle=True):
"""Read images from input_dir then shuffle them
Args:
input_dir: string, path of input dir, e.g., /path/to/dir
Returns:
file_paths: list of strings
"""
file_paths = []
for img_file in scandir(input_dir):
if img_file.name.endswith('.jpg') and img_file.is_file():
file_paths.append(img_file.path)
if shuffle:
# Shuffle the ordering of all image files in order to guarantee
# random ordering of the images with respect to label in the
# saved TFRecord files. Make the randomization repeatable.
shuffled_index = list(range(len(file_paths)))
random.seed(12345)
random.shuffle(shuffled_index)
file_paths = [file_paths[i] for i in shuffled_index]
return file_paths
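A hypothetical call to data_reader() as defined above; the directory is a placeholder:

# Collect all .jpg paths in a reproducibly shuffled order.
file_paths = data_reader("/path/to/images", shuffle=True)
print(len(file_paths), "images found")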
def traverse(path, ignore_files=None):
if not os.path.exists(path):
return
if ignore_files is None:
ignore_files = []
for item in scandir(path):
if any(fnmatch.fnmatch(item.name, pattern) for pattern in ignore_files):
logger.debug('Ignoring %s', item)
continue
if item.is_dir():
for result in traverse(item.path, ignore_files):
yield os.path.join(item.name, result)
else:
yield item.name
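traverse() yields file paths relative to the starting directory, skipping anything that matches an ignore pattern. A hedged usage sketch (the patterns are illustrative):

# Walk the current directory, skipping compiled Python files and the .git folder.
for relative_path in traverse(".", ignore_files=["*.pyc", ".git"]):
    print(relative_path)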
def get_sources_from_files(path):
def order(fname):
if 'bf' in fname: return 0
if 'lamin' in fname: return 1
if 'fibrillarin' in fname: return 2
if 'tom' in fname: return 3
if 'all' in fname: return 10
return 0
files = [i.path for i in os.scandir(path) if i.is_file()]
paths_sources = [i for i in files if (('rgb.tif' in i) or ('bf.tif' in i))]
paths_sources.sort(key=order)
sources = []
for path in paths_sources:
source = tifffile.imread(path)
sources.append(source)
return sources
def save_backup(path_file, n_backups=5):
if not os.path.exists(path_file):
return
path_dir, path_base = os.path.split(path_file)
path_backup_dir = os.path.join(path_dir, 'backups')
if not os.path.exists(path_backup_dir):
os.makedirs(path_backup_dir)
paths_existing_backups = [i.path for i in os.scandir(path_backup_dir)
if (path_base in i.path and i.path.split('.')[-1].isdigit())]
paths_existing_backups.sort(key=lambda x: os.path.getmtime(x))
tag = 0
if len(paths_existing_backups) > 0:
tag = (int(paths_existing_backups[-1].split('.')[-1]) + 1) % 100
paths_delete = paths_existing_backups[:-(n_backups - 1)] if n_backups > 1 else paths_existing_backups
for path in paths_delete:
os.remove(path)
path_backup = os.path.join(path_backup_dir, path_base + '.{:02}'.format(tag))
shutil.copyfile(path_file, path_backup)
print('wrote to:', path_backup)
def find_source_dirs(path_root_dir):
"""Find source directories to make layouts, going at most 1 layer deep.
    Returns: list of source directories
"""
def is_source_dir(path):
if not os.path.isdir(path):
return False
has_signal, has_target, has_prediction = False, False, False
for entry in [i.path for i in os.scandir(path) if i.is_file()]:
if any(tag in entry for tag in TAGS_SIGNAL):
has_signal = True
if any(tag in entry for tag in TAGS_TARGET):
has_target = True
if any(tag in entry for tag in TAGS_PREDICTION):
has_prediction = True
return has_signal and has_target and has_prediction
if is_source_dir(path_root_dir):
return [path_root_dir]
results = []
for entry in os.scandir(path_root_dir):
if is_source_dir(entry.path):
results.append(entry.path)
return results
def scan_dir(self, path):
for entry in os.scandir(path):
if entry.name.startswith('.'):
continue
if entry.is_dir():
self.scan_dir(entry.path)
continue
name, ext = os.path.splitext(entry.name)
if ext.lower() in IMAGE_EXTS:
self.add_choice(path)
break
if ext.lower() in ARCHIVE_EXTS:
self.add_choice(entry.path)
def make_custom_check_bins_package(source_dir, package_filename):
with gen.util.pkgpanda_package_tmpdir() as tmpdir:
tmp_source_dir = os.path.join(tmpdir, 'check_bins')
shutil.copytree(source_dir, tmp_source_dir)
# Apply permissions
for entry in os.scandir(tmp_source_dir):
# source_dir should have no subdirs.
assert entry.is_file()
os.chmod(entry.path, 0o755)
# Add an empty pkginfo.json.
pkginfo_filename = os.path.join(tmp_source_dir, 'pkginfo.json')
assert not os.path.isfile(pkginfo_filename)
with open(pkginfo_filename, 'w') as f:
f.write('{}')
os.chmod(pkginfo_filename, 0o644)
gen.util.make_pkgpanda_package(tmp_source_dir, package_filename)
def scantree(path_name, skip_list=None):
"""This function returns the files present in path_name, including the
files present in subfolders.
Implementation uses scandir, if available, as it is faster than
os.walk"""
if skip_list is None:
skip_list = DEFAULT_SKIP_LIST
try:
for entry in (e for e in scandir(path_name)
if not is_ignored(e.path, skip_list)):
if entry.is_dir(follow_symlinks=False):
yield from scantree(entry.path, skip_list)
else:
yield entry.path
except PermissionError:
yield 'PermissionError reading {}'.format(path_name)
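scantree() relies on a module-level is_ignored() helper and DEFAULT_SKIP_LIST that are not shown here, so the self-contained sketch below reproduces only the recursion itself, without any skip list:

import os

def scantree_all(path_name):
    # Same recursion as scantree() above, minus the skip-list filtering:
    # yield every file path below path_name, descending into directories
    # without following symlinks.
    try:
        for entry in os.scandir(path_name):
            if entry.is_dir(follow_symlinks=False):
                yield from scantree_all(entry.path)
            else:
                yield entry.path
    except PermissionError:
        yield 'PermissionError reading {}'.format(path_name)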
def process_poetry(self, data_dir='/media/pony/DLdigest/data/languageModel/chinese-poetry/json'):
save_dir = os.path.join(self.save_dir, 'poem')
check_path_exists(save_dir)
count = 0
for entry in os.scandir(data_dir):
if entry.name.startswith('poet'):
with open(entry.path, 'r') as json_file:
poems = json.load(json_file)
for p in poems:
paras = HanziConv.toSimplified(''.join(p['paragraphs']).replace('\n', ''))
paras = filter_punctuation(paras)
for para in paras.split(' '):
if len(para.strip())>1:
pys = ' '.join(np.array(pinyin(para)).flatten())
with open(os.path.join(save_dir, str(count//400000+1)+'.txt'), 'a') as f:
f.write(para+','+pys+'\n')
count += 1
def __init__(self, path, memmap_frames=False, verbose=False):
self.path = _ospath.abspath(path)
self.dir = _ospath.dirname(self.path)
base, ext = _ospath.splitext(_ospath.splitext(self.path)[0]) # split two extensions as in .ome.tif
base = _re.escape(base)
    pattern = _re.compile(base + r'_(\d*)\.ome\.tif')  # matches the basename plus a numeric file-number suffix
entries = [_.path for _ in _os.scandir(self.dir) if _.is_file()]
matches = [_re.match(pattern, _) for _ in entries]
matches = [_ for _ in matches if _ is not None]
paths_indices = [(int(_.group(1)), _.group(0)) for _ in matches]
self.paths = [self.path] + [path for index, path in sorted(paths_indices)]
self.maps = [TiffMap(path, verbose=verbose) for path in self.paths]
self.n_maps = len(self.maps)
self.n_frames_per_map = [_.n_frames for _ in self.maps]
self.n_frames = sum(self.n_frames_per_map)
self.cum_n_frames = _np.insert(_np.cumsum(self.n_frames_per_map), 0, 0)
self.dtype = self.maps[0].dtype
self.height = self.maps[0].height
self.width = self.maps[0].width
self.shape = (self.n_frames, self.height, self.width)
def hashsite(sitepath):
hash = hashlib.sha256()
def hashdir(dirpath, is_home):
for entry in os.scandir(dirpath):
if entry.is_file():
if entry.name.endswith('~'):
continue
mtime = os.path.getmtime(entry.path)
hash.update(str(mtime).encode())
hash.update(entry.name.encode())
if entry.is_dir():
if is_home and entry.name == 'out':
continue
hashdir(entry.path, False)
hashdir(sitepath, True)
return hash.digest()
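hashsite() returns raw digest bytes, so a hypothetical caller might hex-encode them; the site path below is a placeholder:

# Hash a site tree, ignoring editor backups (*~) and the top-level "out" directory.
digest = hashsite("/path/to/site")
print(digest.hex())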
def profiles_last_modified(self, slicer):
"""
Retrieves the last modification date of ``slicer``'s profiles.
Args:
slicer (str): the slicer for which to retrieve the last modification date
Returns:
(float) the time stamp of the last modification of the slicer's profiles
"""
if not slicer in self.registered_slicers:
raise UnknownSlicer(slicer)
slicer_profile_path = self.get_slicer_profile_path(slicer)
lms = [os.stat(slicer_profile_path).st_mtime]
lms += [os.stat(entry.path).st_mtime for entry in scandir(slicer_profile_path) if entry.name.endswith(".profile")]
return max(lms)
def _analysis_backlog_generator(self, path=None):
if path is None:
path = self.basefolder
metadata = self._get_metadata(path)
if not metadata:
metadata = dict()
for entry in scandir(path):
if is_hidden_path(entry.name) or not octoprint.filemanager.valid_file_type(entry.name):
continue
if entry.is_file():
if not entry.name in metadata or not isinstance(metadata[entry.name], dict) or not "analysis" in metadata[entry.name]:
printer_profile_rels = self.get_link(entry.path, "printerprofile")
if printer_profile_rels:
printer_profile_id = printer_profile_rels[0]["id"]
else:
printer_profile_id = None
yield entry.name, entry.path, printer_profile_id
elif os.path.isdir(entry.path):
for sub_entry in self._analysis_backlog_generator(entry.path):
yield self.join_path(entry.name, sub_entry[0]), sub_entry[1], sub_entry[2]
def remove_folder(self, path, recursive=True):
path, name = self.sanitize(path)
folder_path = os.path.join(path, name)
if not os.path.exists(folder_path):
return
empty = True
for entry in scandir(folder_path):
if entry.name == ".metadata.yaml":
continue
empty = False
break
if not empty and not recursive:
raise StorageError("{name} in {path} is not empty".format(**locals()), code=StorageError.NOT_EMPTY)
import shutil
shutil.rmtree(folder_path)
self._delete_metadata(folder_path)
def my_dir_walker_with_size_counting(topdir=None):
if topdir is None:
topdir = os.getcwd()
    sizes = {topdir: 0}  # path: size in bytes
stack = []
def inner_walker(new_topdir):
stack.append(new_topdir)
        new_topdir_path = os.path.join(*stack)
        entries = os.scandir(new_topdir_path)
size = 0
for entry in entries:
if entry.is_dir(follow_symlinks=False):
entry_size = inner_walker(entry.name)
sizes[os.path.join(*stack)] = entry_size
size += entry_size
stack.pop()
else:
fpath = os.path.join(*stack, entry.name)
sizes[fpath] = os.path.getsize(fpath)
size += os.path.getsize(fpath)
return size
inner_walker(topdir)
return sizes
def my_directory_walker_with_size_counting(topdir=None):
if topdir is None:
topdir = os.getcwd()
sizes = {topdir: 0}
root_stack = []
current_root_size = 0
def inner_walker(new_topdir):
root_stack.append(new_topdir)
new_topdir_path = os.path.join(*root_stack)
        # TODO (homework): add error handling
entries = os.scandir(new_topdir_path)
size = 0
for entry in entries:
if entry.is_dir(follow_symlinks=False):
entry_size = inner_walker(entry.name)
sizes[os.path.join(*root_stack)] = entry_size
size += entry_size
root_stack.pop()
elif entry.is_file(follow_symlinks=False):
sizes[os.path.join(*root_stack, entry.name)] = entry.stat().st_size
size += entry.stat().st_size # os.path.getsize
return size
inner_walker(topdir)
return sizes
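A hypothetical run of the walker above, printing the ten largest recorded entries; the starting directory is illustrative:

sizes = my_directory_walker_with_size_counting(".")
for path, size in sorted(sizes.items(), key=lambda kv: kv[1], reverse=True)[:10]:
    print(size, path)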
def clean_unused():
"""
A function to clear unreferenced media files.
"""
if not hasattr(cache, 'delete_pattern'):
# Abort if cache backend is not redis
warnings.warn(
'Unused files clearing aborted due to bad cache backend settings.')
return
_resolve_referenced_files(_fields_to_search())
with os.scandir(settings.MEDIA_ROOT) as iterator:
for entry in iterator:
name = entry.name
if not entry.is_file() or\
cache.get(_make_key(name)) is not None:
continue
default_storage.delete(name)