def _generateDAG(self):
    '''
    Build the workflow DAG and assign every job to an execution bucket.

    A networkx directed graph is created with one node per key of
    ``self.jobs`` and one edge ``dependency -> job`` for every entry in a
    job's optional ``'dependsOn'`` list (both ends are identified by their
    ``'jobKey'`` values).

    Side effects:
        self.dag_graph -- the ``nx.DiGraph`` itself (kept for printing).
        self.graph     -- list of ``(bucket, job)`` tuples in topological
                          order. Jobs without dependencies get bucket 0;
                          every other job gets one more than the highest
                          bucket among its direct dependencies, so a job
                          never shares a bucket with something it depends
                          on. An empty job set yields an empty list.

    Raises:
        nx.NetworkXUnfeasible -- propagated from ``nx.topological_sort``
                                 when the dependencies contain a cycle.
    '''
    # Instantiate directed graph, add job dependency edges.
    # NOTE(review): nodes are added under the self.jobs key while edges use
    # the 'jobKey' values -- this assumes both are the same identifier;
    # confirm against the code that populates self.jobs.
    G = nx.DiGraph()
    for job in self.jobs:
        spec = self.jobs[job]
        G.add_node(job)
        if 'dependsOn' in spec:
            for dependency in spec['dependsOn']:
                G.add_edge(dependency['jobKey'], spec['jobKey'])
    self.dag_graph = G  # For printing purposes.

    # Walk the jobs in topological order, assigning job buckets.
    # Jobs within the same bucket will be kicked off simultaneously, so a
    # bucket must be the *longest* dependency chain leading to the job.
    # Iterating the result directly works whether topological_sort returns
    # a list (networkx 1.x) or a generator (networkx 2.x and later).
    bucket_of = {}
    ordered = []
    for node in nx.topological_sort(G):
        # Topological order guarantees every predecessor already has a bucket.
        parent_buckets = [bucket_of[parent] for parent in G.predecessors(node)]
        bucket_of[node] = max(parent_buckets) + 1 if parent_buckets else 0
        ordered.append((bucket_of[node], node))
    self.graph = ordered
评论列表
文章目录