def get_end_task(self, tasks, required_kwargs):
"""Accepts any number of tasks as returned by _get_pipeline.
:param tasks: dictionary of str:info where str is the name of the task, info is from the registry
:param dict required_kwargs: Keyword arguments that some tasks require
:returns: celery.Signature, or celery.group, or None
"""
sigs = [
self.make_signature(info, required_kwargs)
for name, info in tasks.items()
if info['after'] is ALL
]
if not sigs:
return None
return sigs[0] if len(sigs) == 1 else group(sigs)
评论列表
文章目录