def local_path_index(G, ebunch=None):
if ebunch is None:
ebunch = nx.non_edges(G)
def predict(u, v):
#NeighborSet = nx.all_neighbors(G, u)
#len( sorted(nx.common_neighbors(G, u, v) ))
paths = list( nx.all_simple_paths(G, source=u, target=v, cutoff=3))
print paths
A2 = 0.0
A3 = 0.0
for path in paths:
if len(path) == 3:
A2 = A2 + 1.0
elif len(path) == 4:
A3 = A3 + 1.0
return A2 + 0.001 * A3 #Coefficient = 0.001
Rank_List = []
for u, v in ebunch:
Rank_List.append((u, v, predict(u, v)))
return Rank_List #((u, v, predict(u, v)) for u, v in ebunch)
#return ((u, v, predict(u, v)) for u, v in ebunch)
python类all_simple_paths()的实例源码
def findPaths(Graph, source, des, pathLen):
#for path like V1//V2
if pathLen == 0:
#Old version find paths:
#while True:
#pathLen += 1
#print source, des
#paths = nx.all_simple_paths(Graph, source, des, pathLen)
#if (len(pathList) != 0) or (pathLen == 3) :
#if (pathLen == 3) :
#pathLen = 0
#break
#New version find paths:
paths = nx.all_shortest_paths(Graph, source, des)
pathList = list(paths)
return pathList
#for path like V1/./V2 with specific length
else:
paths = nx.all_simple_paths(Graph, source, des, pathLen)
pathList = list(paths)
return pathList
def findPaths(Graph, source, des, pathLen):
#for path like V1//V2
if pathLen == 0:
#Old version find paths:
#while True:
#pathLen += 1
#print source, des
#paths = nx.all_simple_paths(Graph, source, des, pathLen)
#if (len(pathList) != 0) or (pathLen == 3) :
#if (pathLen == 3) :
#pathLen = 0
#break
#New version find paths:
paths = nx.all_shortest_paths(Graph, source, des)
pathList = list(paths)
return pathList
#for path like V1/./V2 with specific length
else:
paths = nx.all_simple_paths(Graph, source, des, pathLen)
pathList = list(paths)
return pathList
def findPaths(Graph, source, des, pathLen):
#for path like V1//V2
if pathLen == 0:
#Old version find paths:
#while True:
#pathLen += 1
#print source, des
#paths = nx.all_simple_paths(Graph, source, des, pathLen)
#if (len(pathList) != 0) or (pathLen == 3) :
#if (pathLen == 3) :
#pathLen = 0
#break
#New version find paths:
paths = nx.all_shortest_paths(Graph, source, des)
pathList = list(paths)
return pathList
#for path like V1/./V2 with specific length
else:
paths = nx.all_simple_paths(Graph, source, des, pathLen)
pathList = list(paths)
return pathList
def daisy_chains(self, kih, max_path_length=None):
""" Generator for daisy chains (complementary kihs) associated with a knob.
Notes
-----
Daisy chain graph is the directed graph with edges from knob residue to each hole residue for each KnobIntoHole
in self.
Given a KnobIntoHole, the daisy chains are non-trivial paths in this graph (walks along the directed edges)
that begin and end at the knob.
These paths must be of length <= max_path_length
Parameters
----------
kih : KnobIntoHole interaction.
max_path_length : int or None
Maximum length of a daisy chain.
Defaults to number of chains in self.ampal_parent.
This is the maximum sensible value. Larger values than this will cause slow running of this function.
"""
if max_path_length is None:
max_path_length = len(self.ampal_parent)
g = self.daisy_chain_graph
paths = networkx.all_simple_paths(g, source=kih.knob, target=kih.knob, cutoff=max_path_length)
return paths
def findAllPaths(self, max_paths=10, sort=True, debug=False):
""" Find all paths through DAG to End
Params:
max_paths (int :default:=10): Number of paths to find
If this is > 1000, all paths will be found
sort (bool) : If True (default), sort paths
in ascending order of length
"""
if self.roots:
self.lockStart()
# shortest_simple_paths is slow for >1000 paths
if max_paths <= 1000:
return list(six.moves.map(lambda x: x[1:-1], \
islice(nx.shortest_simple_paths(self.G,
self.start,
self.end),
max_paths)))
else: # Fall back to all_simple_paths
ps = list(six.moves.map(lambda x: x[1:-1], \
nx.all_simple_paths(self.G, self.start, self.end)))
# If we do not intend to display paths, no need to sort them
if sort:
ps.sort(key=lambda x: len(x))
return ps
def weighted_local_path_index(G, ebunch=None):
if ebunch is None:
ebunch = nx.non_edges(G)
def predict(u, v):
#NeighborSet = nx.all_neighbors(G, u)
#len( sorted(nx.common_neighbors(G, u, v) ))
paths = list( nx.all_simple_paths(G, source=u, target=v, cutoff=3))
#print paths
A2_weight = 0.0
A3_weight = 0.0
for path in paths:
if len(path) == 3:
for node in range(0, len(path)-1):
A2_weight = A2_weight + G[path[node]][path[node+1]]['weight']
elif len(path) == 4:
for node in range(0, len(path)-1):
A3_weight = A3_weight + G[path[node]][path[node+1]]['weight']
#value = sum( G[w][u]['weight'] + G[w][v]['weight'] for w in nx.common_neighbors(G, u, v) )
return A2_weight + 0.001 * A3_weight #+value #Coefficient = 0.001
Rank_List = []
for u, v in ebunch:
Rank_List.append((u, v, predict(u, v)))
return Rank_List #((u, v, predict(u, v)) for u, v in ebunch)
#return ((u, v, predict(u, v)) for u, v in ebunch)
def calc_path(dg, start, end, cutoff): #,count_conn
count_paths = 0
paths = nx.all_simple_paths(dg, start, end, cutoff)
DEBUG_PRINT((start, end, cutoff))
for x in paths:
count_paths = count_paths + 1
# count_conn.put(count_paths)
def prune_edges(tree):
tree = tree.copy()
# Remove redundent edges
# Given: edges((a, b), (b, c), (a, c))
# Then : Edge (a, c) is not required
# As it is coverd by the path (a, b, c)
for name, data in tree.nodes(data=True):
for prereq in data['info']['after']:
paths = list(nx.all_simple_paths(tree, prereq, name))
if len(paths) > 1:
tree.remove_edge(prereq, name)
return tree
def _all_simple_paths(G, source, target, cutoff=None):
"""
Adaptation of nx.all_simple_paths for mutligraphs
"""
if source not in G:
raise nx.NetworkXError('source node %s not in graph'%source)
if target not in G:
raise nx.NetworkXError('target node %s not in graph'%target)
if cutoff is None:
cutoff = len(G)-1
if G.is_multigraph():
return _all_simple_paths_multigraph(G, source, target, cutoff=cutoff)
else:
return 1 #_all_simple_paths_graph(G, source, target, cutoff=cutoff)
def get_paths(self):
# If there's only one node
if nx.number_of_nodes(self.graph) == 1:
self.shortest_path = self.longest_path = [self.function_start]
return [[self.function_start]]
# If there aren't any obvious exit blocks
if len(self.exit_blocks) == 0:
return
# We need to go through all the possible paths from
# function start to each of exit blocks
all_paths = []
longest_path_len = 0
shortest_path_len = None
for ret in self.exit_blocks:
paths = (nx.all_simple_paths(self.graph, source = self.function_start, target = ret))
for path in paths:
if len(path) > longest_path_len:
longest_path_len = len(path)
self.longest_path = path
if not shortest_path_len or len(path) < shortest_path_len:
shortest_path_len = len(path)
self.shortest_path = path
all_paths.extend(paths)
return all_paths
def structure_dependent_index(G, ebunch=None):
if ebunch is None:
ebunch = nx.non_edges(G)
#C = nx.average_clustering(G)
#d = nx.average_shortest_path_length(G)
path_range = max(2, math.ceil(nx.average_shortest_path_length(G)))
#print path_range
def predict(u, v):
#NeighborSet = nx.all_neighbors(G, u)
#len( sorted(nx.common_neighbors(G, u, v) ))
SD_Index = {}
#Generate all simple paths in the graph G from source to target, length <= cutoff .
paths = list( nx.all_simple_paths(G, source=u, target=v, cutoff = path_range))
print paths
for path in paths:
if SD_Index.has_key( len(path) ):
SD_Index[len(path)] = SD_Index[len(path)] + 1.0
else:
SD_Index[len(path)] = 1.0
#end for
print SD_Index
#Sum up the num of paths with different length
Coefficient = 0.6
SD_Value = 0.0
key_Sequence = list(sorted(SD_Index.keys()))
for key in key_Sequence:
if key != 2:
SD_Value = SD_Value + math.pow(Coefficient, key-2.0) * SD_Index[key]
#end for
return SD_Value #Coefficient = 0.6
Rank_List = []
for u, v in ebunch:
Rank_List.append((u, v, predict(u, v)))
return Rank_List #((u, v, predict(u, v)) for u, v in ebunch)
##======================================================================##
def __validate_coloring(self):
log.debug("validating red-blue coloring; this could take a while (forever) if your graph is too big")
# All red paths to a vertex should be disjoint from all blue paths
# to the same vertex, except for red-blue links and their incident nodes
red_dag = self.get_red_graph()
blue_dag = self.get_blue_graph()
source = self.root
for dst in self.graph.nodes():
if dst == source:
continue
red_paths = list(nx.all_simple_paths(red_dag, source, dst))
red_nodes = set(n for p in red_paths for n in p)
red_edges = set(e for p in red_paths for e in zip(p, p[1:]))
blue_paths = list(nx.all_simple_paths(blue_dag, source, dst))
blue_nodes = set(n for p in blue_paths for n in p)
blue_edges = set(e for p in blue_paths for e in zip(p, p[1:]))
redblue_edges = red_edges.intersection(blue_edges)
redblue_nodes = red_nodes.intersection(blue_nodes)
redblue_nodes.remove(source)
redblue_nodes.remove(dst)
assert all(self._get_edge_color(e) == 'red-blue' for e in redblue_edges),\
"invalid coloring: non cut link shared by red and blue paths!"
# every shared node has at least one incident cut link
# TODO: finish this? unclear it's necessary as it just validates consistency of coloring not actual correctness of properties
# assert all(any(self._get_edge_color(e) == 'red-blue' for e in
# list(self.graph.successors(n)) + list(self.graph.predecessors(n)))
# for n in redblue_nodes), "invalid coloring of nodes: shares a non-cut node!"
# verify each red-blue edge or node is a cut edge/node
for cut_node in redblue_nodes:
g = self.graph.subgraph(n for n in self.graph.nodes() if n != cut_node)
# could induce an empty (or near-empty) graph
if source not in g or dst not in g:
continue
assert not nx.has_path(g, source, dst), "invalid coloring: non cut node shared by red and blue paths!"
for cut_link in redblue_edges:
g = self.graph.edge_subgraph(e for e in self.graph.edges() if e != cut_link)
# could induce an empty (or near-empty) graph
if source not in g or dst not in g:
continue
assert not nx.has_path(g, source, dst), "invalid coloring: non cut link shared by red and blue paths!"
# draw_overlaid_graphs(self.graph, [red_dag, blue_dag])
return True