def walk(self, filtr: Callable[[str], bool], processor: Callable[[Dict], bool]) -> None:
"""
Walk the directory testing the file against filtr and invoking processor with contents if true
:param filtr: file name tester
:param processor: content row processor
"""
for filedir, _, files in os.walk(self._indir):
for file in files:
if filtr(file):
print("Processing %s" % os.path.join(filedir, file))
with open(os.path.join(filedir, file)) as f:
reader = csv.DictReader(f, delimiter="\t", quoting=csv.QUOTE_NONE)
with self._create_writer(filedir, file, reader) as writer:
for row in reader:
if processor(row):
writer.writerow(row)
评论列表
文章目录