def TraverseGraph(transitions, states, args=None):
"""Does that actual graph traversal, going through all transitions."""
transition_graph, initial_vertex = stl.graph.BuildTransitionGraph(
transitions, states)
graph_file = None
if args:
graph_file = args.graph
visualizer = Visualizer(transition_graph, graph_file)
circuit_stack = stl.traverse.MinEdgeCoverCircuit(transition_graph,
initial_vertex)
circuit_stack.reverse()
success = True
while circuit_stack:
edge = circuit_stack.pop()
source, target, edge_i = edge
attr = transition_graph[source][target][edge_i]
transition = attr['transition']
visualizer.TransitionRunning(edge)
if attr['weight'] != float('inf'):
logging.info('\033[93m[ RUNNING ]\033[0m: %s', transition.name)
if transition.Run():
logging.info('\033[92m[ PASSED ]\033[0m: %s', transition.name)
visualizer.TransitionPassed(edge)
continue
else:
logging.error('\033[91m[ FAILED ]\033[0m: %s', transition.name)
success = False
attr['weight'] = float('inf')
error_vertex_id = attr['error_vertex_id']
visualizer.TransitionFailed(edge, error_vertex_id)
new_path = nx.shortest_path(
transition_graph, error_vertex_id, target, weight='weight')
path_stack = []
for i in range(len(new_path) - 1):
s = new_path[i]
t = new_path[i + 1]
# Get the edge-index with the smallest weight
multi_edge = transition_graph[s][t]
min_weight = float('inf')
min_edge_i = 0
for key, value in multi_edge.iteritems():
if value['weight'] < min_weight:
min_weight = value['weight']
min_edge_i = key
if transition_graph[s][t][min_edge_i]['weight'] == float('inf'):
return success
path_stack.append((s, t, min_edge_i))
# TODO(seantopping): Implement a better error recovery algorithm.
circuit_stack.extend(reversed(path_stack))
return success
评论列表
文章目录