def _transform(self, source_type: Type[S], target_type: Type[T]) -> Tuple[Callable[[S], T], int]:
    """Build a converter from ``source_type`` to ``target_type``.

    Asks ``dijkstra_path`` for a path through ``self._type_graph`` (weighted
    by the ``"cost"`` edge attribute) and collects the transformer stored
    under ``_TRANSFORMER`` on every edge of that path.

    Returns:
        A ``(converter, cost)`` pair.  ``cost`` is the sum of the ``cost``
        attributes of the collected transformers.  When no transformer was
        collected the pair is ``(_identity, 0)``.

    Raises:
        NoConversionError: when the path search raises ``KeyError`` or
            ``NetworkXNoPath``.
    """
    # The log messages all use the type names; the error message below
    # deliberately formats the type objects themselves.
    names = {"source_type": source_type.__name__, "target_type": target_type.__name__}
    try:
        LOGGER.info('Searching type graph for shortest path from "{source_type}" to "{target_type}"'.format(**names))
        path = dijkstra_path(self._type_graph, source=source_type, target=target_type, weight="cost")
        LOGGER.info('Found a path from "{source_type}" to "{target_type}"'.format(**names))
    except (KeyError, NetworkXNoPath):
        raise NoConversionError("Pipeline can't convert \"{source_type}\" to \"{target_type}\"".format(source_type=source_type, target_type=target_type))

    LOGGER.info('Building transformer chain from "{source_type}" to "{target_type}"'.format(**names))
    # One (transformer, resulting type) entry per edge along the path.
    steps = [
        (self._type_graph.adj[src][dst][_TRANSFORMER], dst)
        for src, dst in _pairwise(path)
    ]
    cost = sum(transformer.cost for transformer, _ in steps)
    LOGGER.info('Built transformer chain from "{source_type}" to "{target_type}"'.format(**names))

    if steps:
        # Module-level ``_transform`` applies the chain; bind the chain now.
        return partial(_transform, transformer_chain=steps), cost
    return _identity, 0
# Example source code using Python's dijkstra_path() (python类dijkstra_path()的实例源码)
def shortest_path(graph, source, target):
    """Return the nodes of a shortest path between source and target.

    Graph is expected to be a dict {node: {successors}}.  Every
    (node, successor) pair becomes an edge of an undirected ``nx.Graph``,
    which is then searched with ``nx.dijkstra_path``.

    Return value is an empty tuple when source equals target (no move
    needed); otherwise a tuple holding the result of ``nx.dijkstra_path``.

    NOTE(review): this function has been described as returning 2-tuples
    (windows of size 2 over the path), but the code returns the plain node
    sequence -- confirm which form callers expect.
    """
    if source == target:
        return ()  # no move needed

    edges = ((node, succ) for node, succs in graph.items() for succ in succs)
    nxg = nx.Graph()
    nxg.add_edges_from(edges)
    return tuple(nx.dijkstra_path(nxg, source, target))
def get_path(self,q0,qf):
    """
    Searches for a path in the PRM between configurations q0 and qf.

    Both configurations are reshaped to 3-element arrays.  They are added to
    self.G as two temporary nodes, connected through self._connect, searched
    with Dijkstra on the 'weight' edge attribute, and removed again before
    returning -- including when connecting or searching raises, so that G
    is never left holding the temporary nodes.

    Returns a list of configurations (the 'cfg' node attributes) describing
    the path, [qf] when q0 and qf are equal, or [] if no path is found.
    """
    q0 = np.reshape(np.array(q0),3)
    qf = np.reshape(np.array(qf),3)
    if all(q0 == qf):
        return [qf]

    # Ids for the temporary start/goal nodes.
    # NOTE(review): assumes the existing node ids are 0..len(G)-1 so these
    # two ids are unused -- confirm against how nodes are added elsewhere.
    n0 = len(self.G)
    nf = n0 + 1

    # Add the start and end configs to G, so we can just search it
    self.G.add_node(n0,cfg=q0)
    self.G.add_node(nf,cfg=qf)
    try:
        for k, cfg in ((n0, q0), (nf, qf)):
            self._connect(k,DobotModel.forward_kinematics(cfg))
        if not nx.has_path(self.G,n0,nf):
            return []  # could not find a path
        nodes = nx.dijkstra_path(self.G,n0,nf,'weight')
        # get_node_attributes works on every NetworkX release, unlike the
        # G.node mapping which was removed in NetworkX 2.4.
        cfgs = nx.get_node_attributes(self.G,'cfg')
        return [cfgs[k] for k in nodes]
    finally:
        # Remove the start and end configs so G remains consistent with tree
        self.G.remove_node(n0)
        self.G.remove_node(nf)
def analysis_graph(self):
    """Build a weighted graph from self.list_node and report busy edges.

    Each entry of self.list_node is read as (node_a, node_b, weight).  The
    edge gets a ``capacity`` of ceil(60 / (24 // d + weight)), where d is
    1, 2 or 3 depending on the leading characters of node_a; capacity
    falls back to 1000 when that denominator is zero.

    For every move in self.schedule (move[0] -> move[1]) the shortest path
    is walked with a Time that starts at self.day_time ("<hour>.<minute>")
    and is advanced by each edge weight.  Every step adds 1/capacity to a
    tally keyed by "<time> <node>/<previous node>".  A move for which no
    path exists, or which hits a KeyError, is printed and abandoned; steps
    tallied before the error are kept.

    Returns a list of "<int(load)> - <key>" strings for every tally above
    2, sorted by the integer load in descending order.
    """
    graph = nx.Graph()
    for array in self.list_node:
        name = array[0]
        # Order matters: the '?3' prefix must be tested before the
        # generic "contains '?'" case.
        if name[0] == '3':
            d = 3
        elif name[0] == '5':
            d = 2
        elif name[0] == '?' and name[1] == '3':
            d = 2
        elif '?' in name:
            d = 3
        else:
            d = 1
        weight = array[2]
        try:
            capacity = math.ceil(60 / (24 // d + weight))
        except ZeroDivisionError:
            capacity = 1000
        graph.add_edge(name, array[1], weight=weight, capacity=capacity)

    time = Time()
    load = {}
    for move in self.schedule:
        try:
            path = nx.dijkstra_path(graph, move[0], move[1])
            clock = self.day_time.split('.')
            time.hour = int(clock[0])
            time.minute = int(clock[1])
            time.second = 0
            for prev, node in zip(path, path[1:]):
                # graph[u][v] is the edge-attribute dict on every NetworkX
                # release; the graph.edge mapping was removed in 2.0.
                attrs = graph[prev][node]
                time.add(attrs['weight'])
                key = time.string() + " " + node + '/' + prev
                load[key] = load.get(key, 0) + 1/attrs['capacity']
        except (nx.exception.NetworkXNoPath, KeyError):
            print(move)

    busy = [(int(value), key) for key, value in load.items() if value > 2]
    # Stable sort: entries with equal load keep their tally order.
    busy.sort(key=lambda item: item[0], reverse=True)
    return [str(count) + ' - ' + str(key) for count, key in busy]