def loads(payload):
if payload.get('type') != 'normal':
raise Exception('celery task loader only support normal mode')
tasks = payload.get('tasks', [])
cts = []
for task in tasks:
ops = [load(id, task.get('args'), task.get('on_error')) if i == 0 else load(id, None, task.get('on_error')) for
i, id in enumerate(task['ids'])]
cts.append(chain(ops))
callback = payload.get('callback')
if callback:
return chord(header=group(cts), body=func.load(callback).s())
return group(cts)
python类chord()的实例源码
def test_create_chord_exclude_body(self):
"""If the body task of a chord is not a UserTask, it should be cleanly omitted from the status."""
chord([
sample_task.s(self.user.id, '1', user_task_name='Chord: 1 & 2'),
sample_task.s(self.user.id, '2', user_task_name='I should be ignored')
])(normal_task.s('3'))
assert UserTaskStatus.objects.count() == 4
chord_status = UserTaskStatus.objects.get(task_class='celery.chord')
assert chord_status.task_id
assert chord_status.parent is None
assert chord_status.is_container
assert chord_status.name == 'Chord: 1 & 2'
assert chord_status.total_steps == 2
verify_state(chord_status, False)
group_status = UserTaskStatus.objects.get(task_class='celery.group')
assert group_status.task_id
assert group_status.parent_id == chord_status.id
assert group_status.is_container
assert group_status.name == 'Chord: 1 & 2'
assert group_status.total_steps == 2
verify_state(group_status, False)
header_tasks = UserTaskStatus.objects.filter(parent=group_status)
assert len(header_tasks) == 2
for status in header_tasks:
assert status.task_id
assert status.parent_id == group_status.id
assert not status.is_container
assert status.name in ['SampleTask: 1', 'SampleTask: 2']
assert status.total_steps == 1
verify_state(status, False)
def produce_hot_repo_report(period, ref_date=None):
# 1. parse date
ref_date_str = strf_date(period, ref_date=ref_date)
# 2. fetch and join
fetch_jobs = group([
fetch_hot_repos.s(ref_date_str, 100, 1),
fetch_hot_repos.s(ref_date_str, 100, 2),
fetch_hot_repos.s(ref_date_str, 100, 3),
fetch_hot_repos.s(ref_date_str, 100, 4),
fetch_hot_repos.s(ref_date_str, 100, 5)
])
# 3. group by language and
# 4. create csv
return chord(fetch_jobs)(build_report_task.s(ref_date_str)).get()
def make_signature(self, info, required_kwargs):
"""Calculates the required signature to execute a step in the pipeline.
:param dict info: info dict generated by pipeline describing a task
:param dict required_kwargs: Keyword arguments that :func:`@pipeline`
elements might require.
:returns: celery.Signature that will run the task as described.
Will be celery.chord for map/reduce tasks
"""
# Avoid circular import - used for map/reduce tasks
from .tasks import lazy_async_apply_map
new_kwargs = {k: v for k, v in required_kwargs.items() if k in info.get('required_kwarg_names', [])}
missing_kwargs = list(set(info.get('required_kwarg_names', [])) - set(new_kwargs.keys()))
if missing_kwargs:
raise MissingArgument(
'{} requires {} keyword arguments specified'.format(
info['func'],
', '.join(missing_kwargs),
),
)
task = info['func'].s(
**new_kwargs
)
# Check for mapper
mapper_name = info.get('mapper')
reducer_name = info.get('reducer')
# If only one is defined, this is an error
if bool(mapper_name) != bool(reducer_name):
raise DependencyError(
'Both mapper and reducer are required together info="{}"'.format(info))
if mapper_name: # implies reducer_name as well
# This is a map/reduce task
try:
mapper = self.mappers[mapper_name]
except KeyError:
raise DependencyError('Missing mapper "{}"'.format(mapper_name))
try:
reducer = self.reducers[reducer_name]
except KeyError:
raise DependencyError('Missing reducer "{}"'.format(reducer_name))
# lazy_async_apply_map must always be called in a chord for now, see:
# https://github.com/celery/celery/issues/2722
task = (
mapper.s(*args, **new_kwargs) |
chord(lazy_async_apply_map.s(task), reducer.s(*args, **new_kwargs))
)
return task
def _create_chord(self, eager):
"""Create a celery chord and verify some assertions about the corresponding status records"""
chord([
sample_task.s(self.user.id, '1'),
sample_task.s(self.user.id, '2', user_task_name='Chord: 1 & 2, then 3')
])(sample_task.s(self.user.id, '3'))
assert UserTaskStatus.objects.count() == 5
chord_status = UserTaskStatus.objects.get(task_class='celery.chord')
assert chord_status.task_id
assert chord_status.parent is None
assert chord_status.is_container
assert chord_status.name == 'Chord: 1 & 2, then 3'
assert chord_status.total_steps == 3
verify_state(chord_status, eager)
group_status = UserTaskStatus.objects.get(task_class='celery.group')
assert group_status.task_id
assert group_status.parent_id == chord_status.id
assert group_status.is_container
assert group_status.name == 'Chord: 1 & 2, then 3'
assert group_status.total_steps == 2
verify_state(group_status, eager)
header_tasks = UserTaskStatus.objects.filter(parent=group_status)
assert len(header_tasks) == 2
for status in header_tasks:
assert status.task_id
assert status.parent_id == group_status.id
assert not status.is_container
assert status.name in ['SampleTask: 1', 'SampleTask: 2']
assert status.total_steps == 1
verify_state(status, eager)
body_status = UserTaskStatus.objects.get(parent=chord_status, is_container=False)
assert body_status.task_id
assert body_status.name == 'SampleTask: 3'
assert body_status.total_steps == 1
verify_state(body_status, eager)