def _get_pipeline(self, **options):
    """Assemble the runnable task object for the requested tags.

    Two entries are popped from ``options``; any other keys are not
    read by this method:

    * ``tagged_as`` -- iterable of tag names whose registered tasks are
      layered, in order, on top of the default-tag tasks. Defaults to
      ``[]``.
    * ``required_kwargs`` -- forwarded to ``add_signatures_to_graph``
      and ``get_end_task``. Defaults to ``{}``.

    Returns:
        The result of ``get_task_to_run`` for the built tree, combined
        with the end task via ``|=`` when ``get_end_task`` returns
        something other than ``None``.

    Raises:
        ValueError: if a requested tag is not present in
            ``self.registry``.
    """
    tags = options.pop('tagged_as', [])
    signature_kwargs = options.pop('required_kwargs', {})

    # Work on a copy of the default-tag tasks so the ``update`` calls
    # below do not modify the registry entry itself.
    selected = dict(self.registry[_sentinel])

    # Later tags win: each tag's tasks overwrite same-keyed entries
    # contributed by the default tag or by earlier tags.
    for name in tags:
        if name not in self.registry:
            raise ValueError('No pipelines for a tag {}'.format(name))
        selected.update(self.registry[name])

    # Work out the ordering of the selected tasks, then attach the
    # signatures needed to invoke them.
    graph = self.build_tree(selected)
    self.add_signatures_to_graph(graph, signature_kwargs)

    # Collapse the dependency tree into the task chain(s) to run.
    runnable = self.get_task_to_run(graph)

    # Append the final task only when one is provided.
    end_task = self.get_end_task(selected, signature_kwargs)
    if end_task is None:
        return runnable
    runnable |= end_task
    return runnable
评论列表
文章目录