def _create_chord(self, eager):
    """
    Launch a celery chord of sample tasks and check the resulting status records.

    The chord's header holds two ``sample_task`` signatures and its body a
    third one. After it is dispatched, the ``UserTaskStatus`` rows are
    inspected: one for the chord, one for the group, one per header task
    and one for the body task.

    Arguments:
        eager: Forwarded unchanged to ``verify_state`` for every status
            record checked here (presumably whether celery is running
            tasks eagerly -- confirm against ``verify_state``).
    """
    user_id = self.user.id
    custom_name = 'Chord: 1 & 2, then 3'

    # Only the second header task carries an explicit user_task_name; the
    # assertions below expect both container records to report that name.
    header = [
        sample_task.s(user_id, '1'),
        sample_task.s(user_id, '2', user_task_name=custom_name),
    ]
    workflow = chord(header)
    workflow(sample_task.s(user_id, '3'))

    # chord + group + two header tasks + body task
    records = UserTaskStatus.objects
    assert records.count() == 5

    # The chord itself: a top-level container
    chord_record = records.get(task_class='celery.chord')
    assert chord_record.task_id
    assert chord_record.parent is None
    assert chord_record.is_container
    assert chord_record.name == custom_name
    assert chord_record.total_steps == 3
    verify_state(chord_record, eager)

    # The header group: a container nested under the chord
    group_record = records.get(task_class='celery.group')
    assert group_record.task_id
    assert group_record.parent_id == chord_record.id
    assert group_record.is_container
    assert group_record.name == custom_name
    assert group_record.total_steps == 2
    verify_state(group_record, eager)

    # The individual header tasks: leaves nested under the group
    header_records = records.filter(parent=group_record)
    assert len(header_records) == 2
    allowed_names = ['SampleTask: 1', 'SampleTask: 2']
    for record in header_records:
        assert record.task_id
        assert record.parent_id == group_record.id
        assert not record.is_container
        assert record.name in allowed_names
        assert record.total_steps == 1
        verify_state(record, eager)

    # The body task: the only non-container direct child of the chord
    body_record = records.get(parent=chord_record, is_container=False)
    assert body_record.task_id
    assert body_record.name == 'SampleTask: 3'
    assert body_record.total_steps == 1
    verify_state(body_record, eager)
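# Comment list  (web-page navigation residue; not part of the test code)
# Table of contents  (web-page navigation residue; not part of the test code)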