def build_tree(self, tasks):
    """Build the dependency graph for a set of tasks as returned by _get_pipeline.

    Each task whose ``after`` value is not the ``ALL`` sentinel becomes a node
    (with its registry info stored under the ``info`` node attribute). An edge
    ``dependency -> task`` is added for every entry of the task's ``after``
    value that is itself a node of the graph.

    :param tasks: dictionary of str:info where str is the name of the task, info is from the registry
    :returns: Graph containing each node connected by dependencies, "after: ALL" nodes will be ignored
    :rtype: networkx.DiGraph
    :raises: :exc:`DependencyError` if a declared dependency is not present
        anywhere in ``self.registry``, or if the dependencies form a cycle
    """
    # Every task name known to the registry; a set keeps the membership
    # test below O(1) per dependency.
    all_task_names = {
        task
        for tasks_subset in self.registry.values()
        for task in tasks_subset
    }

    # "after: ALL" tasks are not part of the graph. Filter them out once so
    # the node pass and the edge pass agree on what is included.
    included = [
        (name, info) for name, info in tasks.items()
        if info['after'] is not ALL
    ]

    # Find dependencies - directed graph of node names
    tree = nx.DiGraph()

    # All nodes must exist before any edge is considered, because an edge is
    # only added when the dependency is itself a node of this graph.
    for name, info in included:
        tree.add_node(name, info=info)

    for name, info in included:
        for req in info['after']:
            if req not in all_task_names:
                msg = '"{}" pipeline element was not found, but it is declared as dependency of the pipeline "{}" with arguments "{}"'
                raise DependencyError(msg.format(req, name, info))
            # don't add an edge if dependency is not part of the current set of tasks
            if req in tree:
                tree.add_edge(req, name)

    # NOTE: pruning redundant edges (prune_edges) was tried here and turned
    # out not to be as useful as it originally seemed, so it is not applied.

    # Check for circular dependencies. The built-in next() with a default is
    # used instead of the Python 2-only ``.next()`` method, so this works on
    # both Python 2 and Python 3 and needs no StopIteration handling.
    cycle = next(iter(nx.simple_cycles(tree)), None)
    if cycle is not None:
        raise DependencyError('Circular dependencies detected: {}'.format(cycle))

    # NOTE: joins (nodes with more than one incoming edge, i.e. multiple
    # "after" values) are not validated or rejected here.
    # TODO - even with joins this could be a challenge
    # Can't handle "N" shapes, so check for those
    # Descendants of forks, cannot join from outside.
    return tree
评论列表
文章目录