__init__.py 文件源码

python
阅读 30 收藏 0 点赞 0 评论 0

项目:capillary 作者: celery-capillary 项目源码 文件源码
def build_tree(self, tasks):
        """Build the dependency graph for a set of pipeline tasks.

        Accepts any number of tasks as returned by _get_pipeline.  Every task
        whose ``after`` value is not ``ALL`` becomes a node (with the task's
        ``info`` stored as a node attribute), and an edge ``req -> name`` is
        added for each dependency ``req`` listed in ``info['after']`` that is
        itself part of ``tasks``.

        :param tasks: dictionary of str:info where str is the name of the task, info is from the registry

        :returns: Graph containing each node connected by dependencies, "after: ALL" nodes will be ignored
        :rtype: networkx.DiGraph

        :raises: :exc:`DependencyError` if a dependency is not found in any
            registry entry, or if the dependencies form a cycle
        """
        # Set (not list) so the membership test in the edge loop is O(1).
        all_task_names = {
            task
            for tasks_subset in self.registry.values()
            for task in tasks_subset
        }

        # "after: ALL" tasks are ignored entirely; filter them out once so the
        # node pass and the edge pass agree on what is in the graph.
        ordered_tasks = [
            (name, info) for name, info in tasks.items() if info['after'] is not ALL
        ]

        # Find dependencies - directed graph of node names
        tree = nx.DiGraph()

        # Add nodes first: the edge pass below relies on ``req in tree`` to
        # know whether a dependency belongs to the current set of tasks.
        for name, info in ordered_tasks:
            tree.add_node(name, info=info)

        # Add edges
        for name, info in ordered_tasks:
            for req in info['after']:
                if req not in all_task_names:
                    msg = '"{}" pipeline element was not found, but it is declared as dependency of the pipeline "{}" with arguments "{}"'
                    raise DependencyError(msg.format(req, name, info))
                if req in tree:  # don't add an edge if dependency is not part of the current set of tasks
                    tree.add_edge(req, name)

        # Not as useful as it originally seemed
        # tree = prune_edges(tree)

        # Check for circular dependencies.  Use the builtin next() with a
        # default rather than the Python 2-only generator ``.next()`` method;
        # only the first cycle is needed for the error message.
        cycle = next(nx.simple_cycles(tree), None)
        if cycle is not None:
            raise DependencyError('Circular dependencies detected: {}'.format(cycle))

        # Joins (merge multiple tasks) have more than one edge in
        # joins = [n for n, d in tree.in_degree_iter() if d > 1]

        # Don't support joins right now, one reducer at the end of the chain
        # if joins:
        #    raise DependencyError('Multiple after values not currently supported joins="{}"'.format(joins))

        # TODO - even with joins this could be a challenge
        # Can't handle "N" shapes, so check for those
        # Descendants of forks, cannot join from outside.
        return tree
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号