def make_signature(self, info, required_kwargs):
    """Calculates the required signature to execute a step in the pipeline.

    Only the entries of ``required_kwargs`` whose names appear in
    ``info['required_kwarg_names']`` are forwarded to the task. When ``info``
    names both a mapper and a reducer, the step is wrapped so that the mapper
    signature is piped into a chord of ``lazy_async_apply_map`` (carrying the
    step's own signature) with the reducer as the chord body.

    :param dict info: info dict generated by pipeline describing a task.
        Keys read here: ``func`` (required), and the optional
        ``required_kwarg_names``, ``mapper`` and ``reducer``.
    :param dict required_kwargs: Keyword arguments that :func:`@pipeline`
        elements might require.
    :returns: celery.Signature that will run the task as described.
        For map/reduce tasks this is the mapper signature piped into a
        celery.chord.
    :raises MissingArgument: if a name listed in
        ``info['required_kwarg_names']`` is absent from ``required_kwargs``.
    :raises DependencyError: if only one of mapper/reducer is given, or if
        the named mapper/reducer is not registered on this instance.
    """
    # Avoid circular import - used for map/reduce tasks
    from .tasks import lazy_async_apply_map

    # Read the declared names once; tolerate an explicit None for the key.
    required_names = set(info.get('required_kwarg_names') or ())
    new_kwargs = {
        name: value
        for name, value in required_kwargs.items()
        if name in required_names
    }

    # Sorted so the error message is deterministic across runs.
    missing_kwargs = sorted(required_names - set(new_kwargs))
    if missing_kwargs:
        raise MissingArgument(
            '{} requires {} keyword arguments specified'.format(
                info['func'],
                ', '.join(missing_kwargs),
            ),
        )

    task = info['func'].s(**new_kwargs)

    # Check for mapper
    mapper_name = info.get('mapper')
    reducer_name = info.get('reducer')

    # If only one is defined, this is an error
    if bool(mapper_name) != bool(reducer_name):
        raise DependencyError(
            'Both mapper and reducer are required together info="{}"'.format(info))

    if not mapper_name:
        # Plain task: neither mapper nor reducer was requested.
        return task

    # This is a map/reduce task (mapper_name implies reducer_name as well)
    try:
        mapper = self.mappers[mapper_name]
    except KeyError:
        raise DependencyError('Missing mapper "{}"'.format(mapper_name))

    try:
        reducer = self.reducers[reducer_name]
    except KeyError:
        raise DependencyError('Missing reducer "{}"'.format(reducer_name))

    # NOTE: this method receives no positional arguments, so none are
    # forwarded to the mapper/reducer signatures (previously an undefined
    # ``args`` name was unpacked here, raising NameError).
    #
    # lazy_async_apply_map must always be called in a chord for now, see:
    # https://github.com/celery/celery/issues/2722
    return (
        mapper.s(**new_kwargs) |
        chord(lazy_async_apply_map.s(task), reducer.s(**new_kwargs))
    )
评论列表
文章目录