def get_execution_order(cls, G):
    """Return a list of nodes to execute.

    This method returns the minimal list of nodes that need to be executed in
    graph G in order to return the requested outputs.

    The ordering of the nodes is fixed: the result keeps the order produced by
    ``nx_constant_topological_sort(G)``.

    Parameters
    ----------
    G : nx.DiGraph
        Graph to resolve. ``G.graph['outputs']`` lists the requested output
        nodes. Every node's attribute dict must contain exactly one of the keys
        ``'operation'`` and ``'output'``. If ``G.graph['_executor_cache']``
        exists, it is read and updated as a cache for the sort order and for the
        resolved execution lists.

    Returns
    -------
    nodes : list
        nodes that require execution. When a cache dict is present on the graph
        this is the cached list object itself, not a copy.

    Raises
    ------
    ValueError
        If a node has both ``'operation'`` and ``'output'``, or neither of them.

    """
    # Use the cache dict stored on the graph if there is one. Without it the
    # fallback dict lives only for this call, so nothing is kept between calls.
    cache = G.graph.get('_executor_cache', {})

    output_nodes = G.graph['outputs']
    # Only the output nodes that have an operation need to be run. The sorted tuple
    # gives a hashable cache key that is independent of the order of the outputs.
    needed = tuple(sorted(node for node in output_nodes if 'operation' in G.node[node]))

    if needed in cache:
        return cache[needed]

    # Resolve the nodes that need to be executed in the graph
    nodes_to_execute = set(needed)

    if 'sort_order' not in cache:
        cache['sort_order'] = nx_constant_topological_sort(G)
    sort_order = cache['sort_order']

    # Build the dependency graph. An edge list alone silently drops nodes that have
    # no edges, so add every node explicitly; otherwise removing or querying such an
    # isolated node below would make networkx raise.
    dep_graph = nx.DiGraph(G.edges())
    dep_graph.add_nodes_from(G.nodes())

    for node in sort_order:
        attr = G.node[node]
        if attr.keys() >= {'operation', 'output'}:
            raise ValueError('Generative graph has both op and output present')

        # Nodes whose output is already present need no execution; removing them
        # also cuts the paths to their own ancestors.
        if 'output' in attr:
            dep_graph.remove_node(node)
        elif 'operation' not in attr:
            raise ValueError('Generative graph has no op or output present')

    # Add the remaining dependencies of the needed nodes
    for needed_node in needed:
        nodes_to_execute.update(nx.ancestors(dep_graph, needed_node))

    # Turn in to a list ordered by the fixed sort order and cache it
    cache[needed] = [n for n in sort_order if n in nodes_to_execute]
    return cache[needed]
评论列表
文章目录