def lazy_async_apply_map(self, items, d, runner):
    """Fan ``runner`` out over ``items`` and replace this task with that group.

    One clone of ``runner`` is created per item, with ``(item, d)`` prepended
    to the clone's existing positional arguments.  The clones are wrapped in
    a group which then takes the place of the currently running task, so the
    chord that executed this task receives the group's results instead.

    NB This task does not work with eager results
    NB This task does not work with celery==3.1, only on master

    :param items: iterable of arguments for the runner
    :param d: data to operate on (probably returned by a previous task)
    :param runner: task signature to execute on each item.
        def runner(item, data, *a, **kw)
    :returns: in eager mode only, the result of applying the group locally.
    :raises Ignore: on the celery == 3.1 fallback path, once the replacement
        has been dispatched.  When ``self.replace`` is available, whatever
        it returns is raised instead.
    """
    subtasks = []
    for item in items:
        sig = runner.clone()
        # The per-item arguments go first; anything already bound on the
        # runner signature is kept after them.
        sig.args = (item, d) + sig.args
        subtasks.append(sig)
    g = group(*subtasks)

    if self.request.is_eager:
        # Maybe this works - sometimes, if the argument count is right
        return g.apply().get()

    # Celery master (>= 3.2) provides Task.replace.  Only the *lookup* is
    # used for feature detection: an AttributeError raised from inside
    # replace() is a real failure and must not silently fall through to the
    # fallback below (which would dispatch the group a second time).
    replace = getattr(self, 'replace', None)
    if replace is not None:
        raise replace(g)

    # Try to do it ourselves for celery == 3.1
    # FIXME - not quite working
    # TODO - a bit hacky, reducer should be parameterized
    # NOTE: forwarding task_id / reply_to to the reducer, and using
    # dict_reducer instead of generator, were tried and are disabled.
    g = group(*subtasks) | generator.s().set(
        chord=self.request.chord,
    )

    # Replace running task with the group
    # inspired by task.replace from Celery master (3.2)
    # NOTE: passing chord / reply_to to freeze() was tried and is disabled.
    g.freeze(
        self.request.id,
        group_id=self.request.group,
    )
    g.delay()
    raise Ignore('Chord member replaced by new task')
评论列表
文章目录