__init__.py 文件源码

python
阅读 24 收藏 0 点赞 0 评论 0

项目:capillary 作者: celery-capillary 项目源码 文件源码
def make_signature(self, info, required_kwargs):
        """Calculates the required signature to execute a step in the pipeline.

        :param dict info: info dict generated by pipeline describing a task
        :param dict required_kwargs: Keyword arguments that :func:`@pipeline`
                                     elements might require.

        :returns: celery.Signature that will run the task as described.
                  Will be celery.chord for map/reduce tasks
        """
        # Avoid circular import - used for map/reduce tasks
        from .tasks import lazy_async_apply_map

        new_kwargs = {k: v for k, v in required_kwargs.items() if k in info.get('required_kwarg_names', [])}

        missing_kwargs = list(set(info.get('required_kwarg_names', [])) - set(new_kwargs.keys()))
        if missing_kwargs:
            raise MissingArgument(
                '{} requires {} keyword arguments specified'.format(
                    info['func'],
                    ', '.join(missing_kwargs),
                ),
            )

        task = info['func'].s(
            **new_kwargs
        )

        # Check for mapper
        mapper_name = info.get('mapper')
        reducer_name = info.get('reducer')
        # If only one is defined, this is an error
        if bool(mapper_name) != bool(reducer_name):
            raise DependencyError(
                'Both mapper and reducer are required together info="{}"'.format(info))

        if mapper_name:  # implies reducer_name as well
            # This is a map/reduce task
            try:
                mapper = self.mappers[mapper_name]
            except KeyError:
                raise DependencyError('Missing mapper "{}"'.format(mapper_name))

            try:
                reducer = self.reducers[reducer_name]
            except KeyError:
                raise DependencyError('Missing reducer "{}"'.format(reducer_name))

            # lazy_async_apply_map must always be called in a chord for now, see:
            # https://github.com/celery/celery/issues/2722
            task = (
                mapper.s(*args, **new_kwargs) |
                chord(lazy_async_apply_map.s(task), reducer.s(*args, **new_kwargs))
            )
        return task
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号