def create(self, flow, *tasks, deps=None):
    """Write a flow definition to ``<self.flow_dir>/<flow>.yml``.

    The file is a YAML mapping with two keys: ``tasks`` (the positional
    ``tasks`` as a list) and ``deps`` (one ``"key->v1,v2,..."`` string per
    entry of ``deps``).

    Args:
        flow: Flow name; also used as the stem of the output file name.
        *tasks: Task entries, written in the order given.
        deps: Optional mapping whose keys are strings and whose values are
            iterables of strings. ``None`` or any other falsy value is
            treated as an empty mapping.
            NOTE(review): presumably maps a task to the tasks it depends
            on -- confirm against ``Flow``.

    Returns:
        None. The result is the file written to disk.
    """
    deps = deps or {}

    # The instance is discarded; the call is kept for whatever checks the
    # constructor performs. NOTE(review): presumably validates the
    # definition before anything is written -- confirm against Flow.
    Flow(flow, tasks, deps)

    dependency_lines = [
        name + '->' + ','.join(targets) for name, targets in deps.items()
    ]

    class IndentDumper(yaml.Dumper):
        """Dumper that always passes ``indentless=False`` to its parent."""

        def increase_indent(self, flow=False, indentless=False):
            # The caller's ``indentless`` is ignored on purpose; this is
            # the usual PyYAML way to get list items indented under
            # their parent key.
            return super().increase_indent(flow, False)

    payload = {
        'tasks': list(tasks),
        'deps': dependency_lines,
    }

    # The directory is created on demand so the first flow can be saved.
    os.makedirs(self.flow_dir, exist_ok=True)
    target_path = os.path.join(self.flow_dir, flow + '.yml')
    with open(target_path, 'w') as stream:
        yaml.dump(
            payload,
            stream,
            Dumper=IndentDumper,
            default_flow_style=False,
        )
评论列表
文章目录