def findPaths(Graph, source, des, pathLen):
    """Return the paths from ``source`` to ``des`` as a list of node lists.

    ``pathLen == 0`` denotes a query of the form ``V1//V2`` and returns all
    shortest paths. Any other value denotes a query of the form ``V1/./V2``
    with a specific length; it is passed to ``nx.all_simple_paths`` as the
    fourth positional argument.
    """
    if pathLen != 0:
        # V1/./V2: simple paths limited by the requested length
        return list(nx.all_simple_paths(Graph, source, des, pathLen))

    # V1//V2: an earlier revision probed all_simple_paths with a growing
    # length (stopping at 3); it was replaced by this shortest-path lookup.
    return list(nx.all_shortest_paths(Graph, source, des))
python类all_shortest_paths()的实例源码
def findPaths(Graph, source, des, pathLen):
    """Collect the paths between ``source`` and ``des`` into a list.

    A ``pathLen`` of 0 (query ``V1//V2``) selects all shortest paths; any
    other value (query ``V1/./V2``) selects simple paths and is forwarded to
    ``nx.all_simple_paths`` as its fourth positional argument.
    """
    if pathLen == 0:
        # Replaces the old approach that retried all_simple_paths with an
        # increasing length up to 3.
        found = nx.all_shortest_paths(Graph, source, des)
    else:
        found = nx.all_simple_paths(Graph, source, des, pathLen)
    return list(found)
def findPaths(Graph, source, des, pathLen):
    """List the node paths leading from ``source`` to ``des``.

    * ``pathLen == 0`` -- path expression ``V1//V2``: every shortest path.
    * otherwise -- path expression ``V1/./V2``: simple paths, with
      ``pathLen`` given to ``nx.all_simple_paths`` as fourth argument.
    """
    unconstrained = (pathLen == 0)
    if unconstrained:
        # The former incremental search (lengths 1..3 through
        # all_simple_paths) is no longer used.
        return list(nx.all_shortest_paths(Graph, source, des))
    return list(nx.all_simple_paths(Graph, source, des, pathLen))
def findshortestpath(self, obj0SRotmat4, obj1SRotmat4):
    """Compute the shortest paths between the two end nodes and trim them.

    After registering the ends through ``__addEnds``, the first end node id
    of ``regghalf[0]`` is used as source and the first end node id of
    ``regghalf[1]`` as target of ``nx.all_shortest_paths`` on ``self.regg``.
    Each path is stored in ``self.directshortestpaths`` after trimming:

    * leading nodes whose name starts with ``'endrgt'`` are skipped, the
      stored path starting one node before the first node that ends the run;
    * the stored path is cut right after the first node (past index 0)
      whose name starts with ``'endlft'``.

    Nothing is computed when either half has no end node ids.
    """
    self.__addEnds(obj0SRotmat4, obj1SRotmat4)
    # startrgt goalrgt
    if len(self.regghalf[0].endnodeids) <= 0 or len(self.regghalf[1].endnodeids) <= 0:
        return

    startgrip = self.regghalf[0].endnodeids[0]
    goalgrip = self.regghalf[1].endnodeids[0]
    shortestpaths = nx.all_shortest_paths(self.regg, source=startgrip, target=goalgrip)
    self.directshortestpaths = []
    print(shortestpaths)
    for nodeseq in shortestpaths:
        lastidx = len(nodeseq) - 1
        # drop the repeated leading 'endrgt' nodes
        for idx, nodename in enumerate(nodeseq):
            if not nodename.startswith('endrgt') or idx >= lastidx:
                self.directshortestpaths.append(nodeseq[idx-1:])
                break
        # cut everything after the first 'endlft' node
        trimmed = self.directshortestpaths[-1]
        for idx, nodename in enumerate(trimmed):
            if idx > 0 and nodename.startswith('endlft'):
                self.directshortestpaths[-1] = trimmed[:idx+1]
                break
def pre_compute_paths(self, G):
    """
    Pre-populates the path lookup table with the shortest paths between
    every ordered pair of hosts in ``self.hosts``.

    For each pair, the node paths produced by ``nx.all_shortest_paths``
    between the two host dpids are wrapped in ``Path`` objects and stored
    through ``self.path_table.put_path``.

    NOTE: the counter increment is currently commented out, so the
    ``PATH_LIMIT`` check always compares 0 against ``PATH_LIMIT``.

    Args:
        G: networkx graph containing the topology of the network.
    """
    for src, dst in itertools.permutations(self.hosts, 2):
        counter = 0
        for node_path in nx.all_shortest_paths(G, src.dpid, dst.dpid):
            if counter > PATH_LIMIT:
                break
            # counter += 1  # TODO de-comment for big topologies
            entry = Path(src.dpid, src.port, dst.dpid, dst.port, node_path)
            self.path_table.put_path(path=entry, src=src.dpid, dst=dst.dpid)
def get_nodes_in_all_shortest_paths(graph, nodes, weight=None):
    """Return the set of nodes found on any shortest path between the given nodes.

    The paths come from :func:`_get_nodes_in_all_shortest_paths_helper`, which is called with the graph, the nodes
    and the weight; every node of every path it yields is added to the result.

    :param pybel.BELGraph graph: A BEL graph
    :param iter[tuple] nodes: The nodes between which all shortest paths are searched
    :param str weight: Edge data key corresponding to the edge weight. If none, uses unweighted search.
    :return: A set of nodes appearing in the shortest paths between nodes in the BEL graph
    :rtype: set[tuple]

    .. note:: This can be trivially parallelized using :func:`networkx.single_source_shortest_path`
    """
    nodes_on_paths = set()
    for shortest_path in _get_nodes_in_all_shortest_paths_helper(graph, nodes, weight=weight):
        nodes_on_paths.update(shortest_path)
    return nodes_on_paths
# TODO consider all shortest paths?
def _check_rule_typing(self, rule_id, graph_id, lhs_mapping, rhs_mapping):
all_paths = nx.all_pairs_shortest_path(self)
paths_from_target = {}
for s in self.nodes():
if s == graph_id:
for key in all_paths[graph_id].keys():
paths_from_target[key] = all_paths[graph_id][key]
for t in paths_from_target.keys():
if t != graph_id:
new_lhs_h = compose(
lhs_mapping,
self.compose_path_typing(paths_from_target[t]))
new_rhs_h = compose(
rhs_mapping,
self.compose_path_typing(paths_from_target[t]))
try:
# find homomorphisms from s to t via other paths
s_t_paths = nx.all_shortest_paths(self, rule_id, t)
for path in s_t_paths:
lhs_h, rhs_h = self.compose_path_typing(path)
if lhs_h != new_lhs_h:
raise HierarchyError(
"Invalid lhs typing: homomorphism does not commute with an existing " +
"path from '%s' to '%s'!" % (s, t)
)
if rhs_h != new_rhs_h:
raise HierarchyError(
"Invalid rhs typing: homomorphism does not commute with an existing " +
"path from '%s' to '%s'!" % (s, t)
)
except(nx.NetworkXNoPath):
pass
return
def get_shortest_paths(self, source, target):
    """Return all shortest paths between two addresses of ``self.graph``.

    Args:
        source: address the paths start from.
        target: address the paths lead to.

    Returns:
        generator of lists: A generator of all paths between source and
            target.

    Raises:
        ValueError: if ``source`` or ``target`` fails the ``isaddress``
            check.
    """
    endpoints_valid = isaddress(source) and isaddress(target)
    if not endpoints_valid:
        raise ValueError('both source and target must be valid addresses')

    return networkx.all_shortest_paths(self.graph, source, target)
def test_closure_and_cclosure_against_networkx():
    """ Test 'closure' and 'cclosure' on 'metric' against the NetworkX shortest_path (Rion's testing) """
    # Chain 0-1-2-3-4 with light edges plus one heavy edge closing the loop
    chain_edges = [(0, 1), (1, 2), (2, 3), (3, 4)]
    G = nx.Graph()
    G.add_nodes_from([0, 1, 2, 3, 4])
    G.add_edges_from(chain_edges, weight=0.1)
    G.add_edges_from([(0, 4)], weight=0.8)

    # Extract Adjacency Matrix from G
    A = nx.adjacency_matrix(G)

    # Transform distance into a similarity
    distances = np.ravel(A[A > 0])
    A[A > 0] = (1.0 / (distances + 1.0))

    for n1, n2 in combinations(G.nodes(), 2):
        # Compare the three ways of getting the shortest path
        # ('closure', 'cclosure', and 'nx.all_shortest_paths')
        _, c_paths = clo.closure(A, source=n1, target=n2, kind='metric')
        c_paths = list(c_paths)

        _, cc_paths = clo.cclosure(A, source=n1, target=n2, retpath=1, kind='metric')
        cc_paths = '' if cc_paths is None else list(cc_paths)

        # Only the first shortest path reported by NetworkX is compared
        nx_paths = list(nx.all_shortest_paths(G, source=n1, target=n2, weight='weight'))[0]

        assert nx_paths == c_paths, "NetworkX and Python 'closure' differ"
        assert nx_paths == cc_paths, "NetworkX and Cython 'cclosure' differ"
        assert c_paths == cc_paths, "Python 'closure' and Cython 'cclosure' differ"
def findshortestpath(self, startrotmat4, goalrotmat4, base):
    """Compute the shortest paths from the start to the goal and trim them.

    After registering start and goal through ``__addstartgoal``, the first
    id in ``self.startnodeids`` is used as source and the first id in
    ``self.goalnodeids`` as target of ``nx.all_shortest_paths`` on
    ``self.regg``. The resulting generator is kept in
    ``self.shortestpaths``; each path is printed and stored in
    ``self.directshortestpaths`` with the repeated start and goal transit
    removed:

    * leading nodes whose name starts with ``'start'`` are skipped, the
      stored path starting one node before the first node that ends the run;
    * the stored path is cut right after the first node (past index 0)
      whose name starts with ``'goal'``.

    When the graph holds no path between the two nodes, "No path found!" is
    printed and ``self.directshortestpaths`` stays empty. Any other error is
    propagated instead of being reported as a missing path.
    """
    self.__addstartgoal(startrotmat4, goalrotmat4, base)
    # startgrip = random.select(self.startnodeids)
    # goalgrip = random.select(self.goalnodeids)
    startgrip = self.startnodeids[0]
    goalgrip = self.goalnodeids[0]
    self.shortestpaths = nx.all_shortest_paths(self.regg, source=startgrip, target=goalgrip)
    self.directshortestpaths = []
    # directshortestpaths removed the repeated start and goal transit
    try:
        # all_shortest_paths is lazy: a missing path only surfaces here,
        # while the generator is being consumed.
        for path in self.shortestpaths:
            print(path)
            for i, pathnode in enumerate(path):
                if pathnode.startswith('start') and i < len(path)-1:
                    continue
                else:
                    self.directshortestpaths.append(path[i-1:])
                    break
            for i, pathnode in enumerate(self.directshortestpaths[-1]):
                if i > 0 and pathnode.startswith('goal'):
                    self.directshortestpaths[-1] = self.directshortestpaths[-1][:i+1]
                    break
    except nx.NetworkXNoPath:
        print("No path found!")
def _check_rule_typing(self, rule_id, graph_id, lhs_mapping, rhs_mapping):
all_paths = nx.all_pairs_shortest_path(self)
paths_from_target = {}
for s in self.nodes():
if s == graph_id:
for key in all_paths[graph_id].keys():
paths_from_target[key] = all_paths[graph_id][key]
for t in paths_from_target.keys():
if t != graph_id:
new_lhs_h = compose(
lhs_mapping,
self.compose_path_typing(paths_from_target[t]))
new_rhs_h = compose(
rhs_mapping,
self.compose_path_typing(paths_from_target[t]))
try:
# find homomorphisms from s to t via other paths
s_t_paths = nx.all_shortest_paths(self, rule_id, t)
for path in s_t_paths:
lhs_h, rhs_h = self.compose_path_typing(path)
if lhs_h != new_lhs_h:
raise HierarchyError(
"Invalid lhs typing: homomorphism does not "
"commute with an existing "
"path from '%s' to '%s'!" % (s, t)
)
if rhs_h != new_rhs_h:
raise HierarchyError(
"Invalid rhs typing: homomorphism does not "
"commute with an existing " +
"path from '%s' to '%s'!" % (s, t)
)
except(nx.NetworkXNoPath):
pass
return
def all_paths(self, G, src=None, dst=None, src_dpid=None, dst_dpid=None):
    """
    For a given source src and destination dst, compute all shortest paths.

    When only the dpids are supplied, the shortest paths are computed
    directly on G and the path table is not consulted. Otherwise the dpids
    are read from ``src`` and ``dst``; paths missing from
    ``self.path_table`` are computed, wrapped with ``Path.of`` and stored
    before being returned.

    Args:
        G: graph.
        src: source obj. Defaults to None.
        dst: destination obj. Defaults to None.
        src_dpid: source dpid. Defaults to None.
        dst_dpid: dst dpid. Defaults to None.
    Returns:
        List of all paths between src and dst. If only src_dpid and dst_dpid
        were given it returns a list of paths defined as a list of nodes and not path objects.
        The list is always a new list, so callers can modify it without
        touching the path table.
    """
    # Compare against None so that a dpid of 0 is still a valid endpoint.
    dpids_only = (src_dpid is not None and dst_dpid is not None
                  and not src and not dst)
    if dpids_only:
        return list(nx.all_shortest_paths(G, src_dpid, dst_dpid))

    src_dpid = src.dpid
    dst_dpid = dst.dpid
    # print "Fetching all paths between {}-{}".format(src, dst)
    if not self.path_table.has_path(src_dpid, dst_dpid):
        # Compute and cache new path
        print("Inserting new path in cache!")
        for node_path in nx.all_shortest_paths(G, src_dpid, dst_dpid):
            pathObj = Path.of(src, dst, node_path)
            print(pathObj)
            self.path_table.put_path(pathObj, src_dpid, dst_dpid)

    # Cached and freshly computed paths are both returned as a copy.
    return list(self.path_table.get_path(src_dpid, dst_dpid))
def findSwitchRoute():
    """Record every shortest switch route between the two hosts' switches.

    The source and destination node numbers are parsed from the last
    (eighth) colon-separated hex field of ``switch[h2]`` and ``switch[h1]``.
    For each shortest path found in ``G``, an entry is added to the
    module-level ``path`` dict:

    * key: the node numbers as lower-case hex, at least two digits each,
      joined with ``"::"``;
    * value: the list of switch ids, each built as
      ``"00:00:00:00:00:00:00:"`` followed by the same hex number.

    The source, the destination and the resulting ``path`` dict are printed.
    """
    src = int(switch[h2].split(":", 7)[7], 16)
    dst = int(switch[h1].split(":", 7)[7], 16)
    print(src)
    print(dst)

    for currentPath in nx.all_shortest_paths(G, source=src, target=dst, weight=None):
        # "%02x" pads only single-digit values; the former "node < 17" test
        # also padded node 16 (0x10), producing the malformed "010".
        hexNodes = ["%02x" % node for node in currentPath]
        pathKey = "::".join(hexNodes)
        path[pathKey] = ["00:00:00:00:00:00:00:" + hexNode for hexNode in hexNodes]

    print(path)
# Computes Link TX
def get_shortest_paths(self, src, dst, with_weight=True):
    """Return all shortest paths from ``src`` to ``dst`` in ``self.graph``.

    With ``with_weight`` set, the ``'delay'`` edge attribute is used as the
    edge weight; otherwise the search is unweighted.
    """
    weight_key = 'delay' if with_weight else None
    return nx.all_shortest_paths(self.graph, source=src, target=dst, weight=weight_key)
def _get_nodes_in_all_shortest_paths_helper(graph, nodes, weight=None):
for u, v in product(nodes, repeat=2):
try:
paths = all_shortest_paths(graph, u, v, weight=weight)
for path in paths:
yield path
except nx.exception.NetworkXNoPath:
continue
def run_cna(graph, root, targets, relationship_dict=None):
    """Predict the effect of the root node on each target node.

    For every target, the effect of each shortest path from the root is computed with ``get_path_effect``. When all
    paths agree, that effect is reported. When both activation and inhibition occur, the effect is reported as
    ambiguous. Otherwise activation or inhibition is reported if present. Targets without a shortest path, or
    whose effects match none of these cases, are logged and left out of the result.

    :param pybel.BELGraph graph: A BEL graph
    :param tuple root: The root node
    :param iter targets: The targets nodes
    :param dict relationship_dict: dictionary with relationship effects; ``causal_effect_dict`` is used when omitted
    :return: One ``(root, target, effect)`` tuple per target for which an effect could be determined
    :rtype: list[tuple]
    """
    if relationship_dict is None:
        relationship_dict = causal_effect_dict

    causal_effects = []

    for target in targets:
        try:
            effects_in_path = {
                get_path_effect(graph, shortest_path, relationship_dict)
                for shortest_path in nx.all_shortest_paths(graph, source=root, target=target)
            }
        except nx.NetworkXNoPath:
            log.warning('No shortest path between: {} and {}.'.format(root, target))
        else:
            if len(effects_in_path) == 1:
                (predicted,) = effects_in_path  # the only predicted effect
            elif Effect.activation in effects_in_path and Effect.inhibition in effects_in_path:
                predicted = Effect.ambiguous
            elif Effect.activation in effects_in_path:
                # inhibition is known to be absent here
                predicted = Effect.activation
            elif Effect.inhibition in effects_in_path:
                predicted = Effect.inhibition
            else:
                log.warning('Exception in set: {}.'.format(effects_in_path))
                continue

            causal_effects.append((root, target, predicted))

    return causal_effects