def get_sccs_topo(graph):
    """
    Return the strongly connected components of a graph in topological order.

    Parameters
    ----------
    graph : networkx.DiGraph
        Directed graph of Systems.

    Returns
    -------
    list of sets of str
        The strongly connected components, ordered topologically.
    """
    # The components are expected to come back in reverse topological order
    # (a property of Tarjan's algorithm), so flipping the sequence gives the
    # topological order.
    return list(nx.strongly_connected_components(graph))[::-1]
python类strongly_connected_components()的实例源码
def bag_of_connected_components(graph):
    """Build the bag of strongly connected components of ``graph``.

    The components reported by ``nx.strongly_connected_components`` are
    handed, together with the graph itself, to ``__bag_of_components``,
    whose result is returned unchanged.
    """
    return __bag_of_components(graph, nx.strongly_connected_components(graph))
prepare_geojson_to_agentpolisdemo.py 文件源码
项目:roadmap-processing
作者: aicenter
项目源码
文件源码
阅读 30
收藏 0
点赞 0
评论 0
def get_biggest_component(graph):
    """Return the largest strongly connected component of ``graph``.

    When the graph is already strongly connected, the graph object itself is
    returned. Otherwise the result is the first component of maximal size as
    produced by ``nx.strongly_connected_components`` (a collection of nodes,
    not a graph), so callers should only rely on iterating the result.
    """
    if nx.is_strongly_connected(graph):
        return graph
    # ``max`` keeps the first maximal element, matching a strict ``>`` scan;
    # ``default`` covers the case of no components at all.
    return max(nx.strongly_connected_components(graph), key=len, default=graph)
def from_cobol_graph(cls, cobol_graph):
    """Build an acyclic structure graph from a CobolStructureGraph.

    Loops found in the source graph are broken by ``_break_component_loop``
    (which, per the original description, adds Loop and ContinueLoop nodes).
    Returns the newly created instance of ``cls``.
    """
    result = cls()

    # Populate via the edge list so the node objects are shared with the
    # source graph rather than copied.
    result.graph.add_edges_from(
        cobol_graph.graph.edges(keys=True, data=True))

    def multi_node_components():
        # A loop is a strongly connected component: every node in it can
        # reach every other one. Single-node components are skipped here
        # because they are reported even without a self-looping edge.
        return [scc
                for scc in nx.strongly_connected_components(result.graph)
                if len(scc) > 1]

    # Loops may be nested, so keep recomputing the components until no
    # multi-node loop is left.
    pending = multi_node_components()
    while pending:
        for scc in pending:
            result._break_component_loop(scc)
        pending = multi_node_components()

    # What can remain now are nodes with an edge to themselves. Iterate over
    # a snapshot because breaking a loop modifies the graph.
    for node in list(result.graph):
        self_edge = result.graph[node].get(node)
        if self_edge is not None:
            result._break_component_loop({node})

    return result
def compute_potentials(pa):
    '''Computes the potential function for each state of the product automaton.

    The potential function represents the minimum distance to a self-reachable
    final state in the product automaton.

    Parameters
    ----------
    pa : product automaton
        Object exposing the graph ``pa.g`` and the initial states ``pa.init``.
        ``pa.srfs`` is overwritten with the self-reachable final states, and
        every node of ``pa.g`` gets a ``'potential'`` attribute on success.

    Returns
    -------
    bool
        False if there are no self-reachable final states (no potentials are
        written in that case), True otherwise.

    Note
    ----
    A temporary node named ``'v'`` is inserted into ``pa.g`` while computing;
    it is always removed again, even if an intermediate step raises.
    '''
    assert 'v' not in pa.g
    # add virtual node which connects to all initial states in the product
    pa.g.add_node('v')
    try:
        pa.g.add_edges_from([('v', p) for p in pa.init])
        # create strongly connected components of the product automaton
        # w/ 'v'
        scc = list(nx.strongly_connected_components(pa.g))
        dag = nx.condensation(pa.g, scc)
        # get strongly connected component which contains 'v'; there must be
        # exactly one. A list is used instead of ``map(...).count(True)``,
        # which fails on Python 3 where ``map`` returns an iterator.
        holders = [k for k, sc in enumerate(scc) if 'v' in sc]
        assert len(holders) == 1
        start = holders[-1]
        # get self-reachable final states
        pa.srfs = self_reachable_final_states_dag(pa, dag, scc, start)
    finally:
        # remove virtual node from product automaton
        pa.g.remove_node('v')
    assert 'v' not in pa.g
    if not pa.srfs:
        return False
    # add artificial node 'v' and edges from the set of self reachable
    # states (pa.srfs) to 'v'
    pa.g.add_node('v')
    try:
        for p in pa.srfs:
            pa.g.add_edge(p, 'v', weight=0)
        # compute the potentials for each state of the product automaton
        lengths = nx.shortest_path_length(pa.g, target='v', weight='weight')
        # NOTE(review): ``pa.g.node`` is kept as in the original; newer
        # networkx releases spell this ``pa.g.nodes`` - confirm the version
        # this project targets.
        for p in pa.g:
            pa.g.node[p]['potential'] = lengths[p]
    finally:
        # remove virtual state 'v'
        pa.g.remove_node('v')
    return True
def has_empty_language(model, trivial=False):
    '''
    Tell whether the language associated with the model is empty.

    The language is considered empty when the model has no self-reachable
    final states that are also reachable from some initial state, i.e. when
    ``self_reachable_final_states`` yields nothing.

    Parameters
    ----------
    model : Model
        A finite system model.
    trivial: boolean
        Specify if self-loops are allowed in the definition of
        self-reachability. By default, self-loops are not allowed.

    Returns
    -------
    empty : boolean
        True if the language is empty.

    See Also
    --------
    networkx.networkx.algorithms.components.strongly_connected.strongly_connected_components
    self_reachable_final_states
    product

    Note
    ----
    This function is intended to be used on product automata.
    '''
    accepting_states = self_reachable_final_states(model, trivial)
    return len(accepting_states) == 0
def post_parse(grammar):
    """Inline nonterminals of ``grammar`` where possible and report the rest.

    A dependency graph ``G`` is built with one node per key of
    ``grammar.prods`` and an edge ``lhs -> tok`` for every token of a
    production that is itself a key of ``grammar.prods``. Edges are then
    removed one at a time by splicing the target's production into the
    source's production; ``grammar.prods`` is modified in place.

    Returns the set of nodes that are still the target of some edge in ``G``
    once no more edges can be inlined (the unexpanded nonterminals).

    NOTE(review): ``nodes_iter``/``edges_iter`` look like the networkx 1.x
    graph API, and ``sccs`` is presumably an alias of
    ``strongly_connected_components`` - confirm against the file's imports.
    """
    # Create `G`
    G = DiGraph()
    G.add_nodes_from(grammar.prods)
    for lhs, rhs in grammar.prods.items():
        for tok in rhs:
            if tok in grammar.prods:
                G.add_edge(lhs, tok)
    # DEBUG: from ._debug import Drawer # DEBUG
    # DEBUG: drawer = Drawer(G, grammar.start) # DEBUG
    # Inlining
    # The BFS edge list is materialized up front because `G` is mutated
    # inside the loop; `root` is the source end of each BFS edge.
    for root, _ in list(bfs_edges(G, grammar.start)):
        while True:
            # Collect the out-edges of `root` and of every node reached from
            # it by BFS, in BFS order.
            nodes = [d for _, d in bfs_edges(G, root)]
            nodes = [root] + nodes
            edges = []
            for n in nodes:
                edges.extend(G.edges([n]))
            # Scan the collected edges last-to-first.
            for ns, nd in reversed(edges):
                # DEBUG: drawer.draw(G, (ns, nd)) # DEBUG
                # Skip if `nd` has a self-loop
                if G.has_edge(nd, nd):
                    continue
                # Skip if `C` consists of multiple nodes, where `C` is the
                # component containing `nd` in `G` with `ns` left out
                g = G.subgraph(n for n in G.nodes_iter() if n != ns)
                if len(next((c for c in sccs(g) if nd in c))) != 1:
                    continue
                # Update grammar: after each occurrence of `nd` in the
                # production of `ns`, splice in the production of `nd`
                # followed by one shared `Token.alter()` token
                expr = []
                alter = Token.alter()
                for tok in grammar.prods[ns]:
                    expr.append(tok)
                    if tok == nd:
                        expr.extend(list(grammar.prods[nd]) + [alter])
                grammar.prods[ns] = Expr(expr)
                # Update G: `ns` no longer points at `nd` but at everything
                # `nd` points at
                G.remove_edge(ns, nd)
                for _, dst in G.edges_iter(nd):
                    G.add_edge(ns, dst)
                # DEBUG: drawer.draw(G) # DEBUG
                # `G` changed: leave the scan so the `while` loop rebuilds
                # the edge list for the same `root`
                break # Back to `for ns, nd in ...`
            else:
                # DEBUG: drawer.draw(G) # DEBUG
                # No edge could be inlined in a full scan: next `root`
                break # Back to `for root, _ in ...`
    return {nd for _, nd in G.edges_iter()} # Unexpanded nonterminals
def partition_graph(g: AssemblyGraph) -> Iterator[Tuple[AssemblyGraph, bool]]:
    """Split a directed graph into subgraphs that preserve its super bubbles.

    The partition is chosen so that the set of super bubbles of `g` equals
    the union of the super bubble sets of all yielded subgraphs.

    Each item is a ``(subgraph, is_acyclic)`` pair: one subgraph flagged
    ``False`` per strongly connected component with more than one node,
    followed by a single subgraph flagged ``True`` built from all the
    single-node components. Edges crossing a subgraph boundary are
    redirected to the artificial nodes ``'r_'`` (source side) and ``'re_'``
    (sink side).
    """
    lone_nodes = []

    for component in networkx.strongly_connected_components(g):
        if len(component) == 1:
            # Handled together once all components have been visited.
            lone_nodes.extend(component)
        else:
            cyclic = g.subgraph(component)

            # Edges entering the component now start at the artificial root.
            for tail, head in g.in_edges_iter(component):
                if tail not in cyclic:
                    cyclic.add_edge('r_', head)

            # Edges leaving the component now end at the artificial sink.
            for tail, head in g.out_edges_iter(component):
                if head not in cyclic:
                    cyclic.add_edge(tail, "re_")

            yield cyclic, False

    # Build subgraph with only singleton strongly connected components
    acyclic = g.subgraph(lone_nodes)

    for tail, head in g.in_edges_iter(lone_nodes):
        if tail not in acyclic:
            acyclic.add_edge('r_', head)

    # Every node without a predecessor hangs off the artificial root.
    roots = [node for node in acyclic.nodes_iter()
             if acyclic.in_degree(node) == 0 and node != 'r_']
    for node in roots:
        acyclic.add_edge('r_', node)

    for tail, head in g.out_edges(lone_nodes):
        if head not in acyclic:
            acyclic.add_edge(tail, 're_')

    # Every node without a successor feeds the artificial sink.
    leaves = [node for node in acyclic.nodes_iter()
              if acyclic.out_degree(node) == 0 and node != 're_']
    for node in leaves:
        acyclic.add_edge(node, 're_')

    yield acyclic, True