tasks.py 文件源码

python
阅读 21 收藏 0 点赞 0 评论 0

项目:capillary 作者: celery-capillary 项目源码 文件源码
def lazy_async_apply_map(self, items, d, runner):
    """Make new instances of runner for each item in items, and inject that
    group into the chord that executed this task.

    NB This task does not work reliably with eager results
    NB This task does not work with celery==3.1, only on master

    :param items: iterable of arguments for the runner
    :param d: data to operate on (probably returned by a previous task)
    :param runner: task signature to execute on each item. def runner(item, data, *a, **kw)
    :returns: only in eager mode - the results of applying the group locally.
    :raises Ignore: on the celery == 3.1 fallback path, after the replacement
        group has been dispatched. On newer Celery whatever ``self.replace``
        returns/raises is raised instead.
    """
    subtasks = []
    for item in items:
        r = runner.clone()
        # Prepend (item, d) so the runner is called as runner(item, d, *a);
        # tuple() keeps the concatenation valid if args is stored as a list.
        r.args = (item, d) + tuple(r.args)
        subtasks.append(r)
    g = group(*subtasks)

    if self.request.is_eager:
        # Maybe this works - sometimes, if the argument count is right
        return g.apply().get()

    # Celery master (>= 3.2) provides Task.replace.  Detect it by attribute
    # lookup rather than wrapping the call in ``except AttributeError``: an
    # AttributeError raised *inside* replace() must propagate instead of
    # silently dropping into the fallback below (which could dispatch the
    # group a second time).
    replace = getattr(self, 'replace', None)
    if replace is not None:
        raise replace(g)

    # Try to do it ourselves for celery == 3.1
    # FIXME - not quite working

    # TODO - a bit hacky, reducer should be parameterized
    # Variants tried and left disabled: passing task_id=self.request.id to the
    # callback, chaining a dict_reducer callback instead, and forwarding
    # chord=/reply_to= from self.request to freeze().
    g = group(*subtasks) | generator.s().set(
        chord=self.request.chord,
    )

    # Replace running task with the group
    # inspired by task.replace from Celery master (3.2)
    g.freeze(
        self.request.id,
        group_id=self.request.group,
    )
    g.delay()
    raise Ignore('Chord member replaced by new task')
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号