python类shortest_path()的实例源码

rule_type_checking.py 文件源码 项目:ReGraph 作者: eugeniashurko 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _check_instance(hierarchy, graph_id, pattern, instance, pattern_typing):
    """Validate an instance of a rule's left-hand side (LHS) in a graph.

    First checks, via ``check_homomorphism(..., total=True)``, that
    ``instance`` maps ``pattern`` into ``hierarchy.node[graph_id].graph``.
    Then, for every typing graph listed in ``pattern_typing``, checks that
    each typed pattern node and its image under ``instance`` carry the
    same type.

    :param hierarchy: hierarchy object; passed to ``nx.shortest_path`` as
        the graph and must provide ``compose_path_typing``
    :param graph_id: id of the graph in which the instance lives
    :param pattern: the LHS pattern graph
    :param instance: mapping from pattern nodes to nodes of the graph
    :param pattern_typing: mapping ``typing graph id -> {pattern node: type}``;
        a falsy value disables the typing check
    :raises RewritingError: if the graph is not typed by one of the typing
        graphs, or if a pattern node and its instance are typed differently
    """
    check_homomorphism(
        pattern,
        hierarchy.node[graph_id].graph,
        instance,
        total=True
    )
    if not pattern_typing:
        return

    # check that instance typing and lhs typing coincide
    for node in pattern.nodes():
        for typing_graph, typing in pattern_typing.items():
            try:
                instance_typing = hierarchy.compose_path_typing(
                    nx.shortest_path(hierarchy, graph_id, typing_graph))
            except NetworkXNoPath:
                raise RewritingError(
                    "Graph '%s' is not typed by '%s' specified "
                    "as a typing graph of the lhs of the rule." %
                    (graph_id, typing_graph))
            # Membership must be tested against the per-graph typing (keyed
            # by pattern node), not against pattern_typing (keyed by typing
            # graph id); otherwise the comparison below is never reached.
            if node in typing and\
               instance[node] in instance_typing.keys():
                if typing[node] != instance_typing[instance[node]]:
                    raise RewritingError(
                        "Typing of the instance of LHS does not " +
                        " coincide with typing of LHS!")
channelgraph.py 文件源码 项目:raiden 作者: raiden-network 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def get_paths_of_length(self, source, num_hops=1):
        """ Searches for all nodes that are `num_hops` away from `source`.

        Returns:
            list of paths: every shortest path starting at `source` that
            contains exactly `num_hops + 1` nodes.
        """
        wanted_nodes = num_hops + 1

        # Without a target, shortest_path yields a dictionary keyed by
        # target whose values are the node lists of a shortest path from
        # the source to that target.
        paths_by_target = networkx.shortest_path(self.graph, source)

        selected = []
        for hops in paths_by_target.values():
            if len(hops) == wanted_nodes:
                selected.append(hops)
        return selected
aborescene_synthesis.py 文件源码 项目:NetPower_TestBed 作者: Vignesh2208 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def compute_shortest_path_tree(self, dst_sw):
        """Build a directed graph from the shortest paths rooted at `dst_sw`.

        Shortest paths are computed in the graph returned by
        `self.network_graph.get_mdg()` with `dst_sw.node_id` as the source;
        every consecutive node pair on every path becomes an edge.

        :param dst_sw: object exposing the root's `node_id`
        :return: `nx.DiGraph` holding the collected edges
        """
        root_id = dst_sw.node_id
        tree = nx.DiGraph()

        paths_from_root = nx.shortest_path(
            self.network_graph.get_mdg(), source=root_id)

        for node_id, hops in paths_from_root.items():
            # The root's own path is a single node and contributes no edge.
            if node_id == root_id:
                continue
            for tail, head in zip(hops[:-1], hops[1:]):
                tree.add_edge(tail, head)

        return tree
test_osmnx.py 文件源码 项目:osmnx 作者: gboeing 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_routing_folium():
    """Exercise routing plus static and folium plotting on a mocked graph.

    The street network is loaded through a mocked HTTP response, a route
    between two coordinates is computed, and the route is passed to the
    edge-attribute, matplotlib and folium helpers.
    """
    import networkx as nx

    mocked_response = get_mock_response_content('overpass-response-3.json.gz')
    with httmock.HTTMock(mocked_response):
        G = ox.graph_from_address('398 N. Sicily Pl., Chandler, Arizona', distance=800, network_type='drive')

    origin = (33.307792, -111.894940)
    destination = (33.312994, -111.894998)
    route = nx.shortest_path(
        G,
        ox.get_nearest_node(G, origin),
        ox.get_nearest_node(G, destination),
    )

    # Return values below are not inspected; the calls just must not raise.
    ox.get_route_edge_attributes(G, route, 'length')

    # Unpacking also verifies that a (fig, ax) pair is returned.
    fig, ax = ox.plot_graph_route(G, route, save=True, filename='route', file_format='png')
    fig, ax = ox.plot_graph_route(
        G, route,
        origin_point=origin, destination_point=destination,
        save=True, filename='route', file_format='png')

    ox.plot_graph_folium(G, popup_attribute='name')
    ox.plot_route_folium(G, route)
shortest_path_switch.py 文件源码 项目:sdn 作者: ray6 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def ShortestPathInstall(self, ev, src, dst):
            """Install forwarding flows along the shortest path from src to dst.

            The path is computed in `self.directed_Topo`, recorded in
            `self.path_db`, and for every known switch lying on the path a
            flow matching (eth_src=src, eth_dst=dst) is added that outputs
            on the port towards the next hop.

            :param ev: event whose `msg.buffer_id` is passed to `add_flow`
            :param src: source node, also used as the `eth_src` match value
            :param dst: destination node, also used as the `eth_dst` match value
            """
            #Compute shortest path
            path = nx.shortest_path(self.directed_Topo, src, dst)

            #Backup path
            self.path_db.append(path)

            # Successor of every node on the path. The final node has no
            # successor and is therefore absent, so a switch that happens
            # to be the last hop is skipped instead of raising IndexError.
            next_hop = dict(zip(path, path[1:]))

            #Add Flow along with the path
            for k, sw in enumerate(self.switches):
                if sw not in next_hop:
                    continue

                out_port = self.directed_Topo[sw][next_hop[sw]]['port']

                datapath = self.switches_dp[k]
                parser = datapath.ofproto_parser
                actions = [parser.OFPActionOutput(out_port)]
                match = parser.OFPMatch(eth_src=src, eth_dst=dst)
                self.add_flow(datapath, 1, match, actions, ev.msg.buffer_id)

    #Add host and adjacent switch to directed_Topo
rule_type_checking.py 文件源码 项目:ReGraph 作者: Kappa-Dev 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def _check_instance(hierarchy, graph_id, pattern, instance, pattern_typing):
    """Validate an instance of a rule's left-hand side (LHS) in a graph.

    First checks, via ``check_homomorphism(..., total=True)``, that
    ``instance`` maps ``pattern`` into ``hierarchy.node[graph_id].graph``.
    Then, for every typing graph listed in ``pattern_typing``, checks that
    each typed pattern node and its image under ``instance`` carry the
    same type.

    :param hierarchy: hierarchy object; passed to ``nx.shortest_path`` as
        the graph and must provide ``compose_path_typing``
    :param graph_id: id of the graph in which the instance lives
    :param pattern: the LHS pattern graph
    :param instance: mapping from pattern nodes to nodes of the graph
    :param pattern_typing: mapping ``typing graph id -> {pattern node: type}``;
        a falsy value disables the typing check
    :raises RewritingError: if the graph is not typed by one of the typing
        graphs, or if a pattern node and its instance are typed differently
    """
    check_homomorphism(
        pattern,
        hierarchy.node[graph_id].graph,
        instance,
        total=True
    )
    if not pattern_typing:
        return

    # check that instance typing and lhs typing coincide
    for node in pattern.nodes():
        for typing_graph, typing in pattern_typing.items():
            try:
                instance_typing = hierarchy.compose_path_typing(
                    nx.shortest_path(hierarchy, graph_id, typing_graph))
            except NetworkXNoPath:
                raise RewritingError(
                    "Graph '%s' is not typed by '%s' specified "
                    "as a typing graph of the lhs of the rule." %
                    (graph_id, typing_graph))
            # Membership must be tested against the per-graph typing (keyed
            # by pattern node), not against pattern_typing (keyed by typing
            # graph id); otherwise the comparison below is never reached.
            if node in typing and\
               instance[node] in instance_typing.keys():
                if typing[node] != instance_typing[instance[node]]:
                    raise RewritingError(
                        "Typing of the instance of LHS does not " +
                        " coincide with typing of LHS!")
ride_d.py 文件源码 项目:ride 作者: KyleBenson 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def notify_alert_response(self, responder, alert_ctx, mdmt_used):
        """
        Handle a subscriber's response to an alert.

        The route the response is taken to have travelled (shortest path in
        mdmt_used from the server to the responder) is reported to the
        topology manager, and the responder is recorded as reached so that
        later re-tries need not target it.  Once no subscriber remains
        unreached, the alert is cancelled as successful.

        :param responder: the subscriber that responded
        :param alert_ctx: context of the alert being answered
        :type alert_ctx: RideD.AlertContext
        :param mdmt_used: passed explicitly instead of being read from
        alert_ctx because the response may have been sent before the most
        recent alert attempt (the route could otherwise be computed wrong)
        :return:
        """

        # The path used by this response is evidently functional right now.
        response_route = nx.shortest_path(mdmt_used, self.get_server_id(), responder)
        log.debug("processing alert response via route: %s" % response_route)

        # NOTE: probably a small effect, since this MDMT was likely picked
        # precisely because the route was already believed functional.
        self.stt_mgr.route_update(response_route)

        alert_ctx.record_subscriber_reached(responder)

        if alert_ctx.has_unreached_subscribers():
            return

        log.info("alert %s successfully reached all subscribers! closing..." % alert_ctx)
        self.cancel_alert(alert_ctx, success=True)
paths.py 文件源码 项目:pybel-tools 作者: pybel 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def _get_shortest__path_between_subgraphs_helper(graph, a, b):
    """Find the overall shortest paths linking two disconnected subgraphs A and B through the source graph.

    For every node pair (na, nb) from A x B, the shortest path is computed
    in both directions; only the paths of minimal length are kept.

    :param nx.MultiGraph graph: A graph
    :param nx.MultiGraph a: A subgraph of :code:`graph`, disjoint from :code:`b`
    :param nx.MultiGraph b: A subgraph of :code:`graph`, disjoint from :code:`a`
    :return: A list of the shortest paths between the two subgraphs
    :rtype: list
    """
    candidates = []

    for na, nb in itt.product(a, b):
        # Forward direction first, then the reverse one.
        candidates.extend((
            nx.shortest_path(graph, na, nb),
            nx.shortest_path(graph, nb, na),
        ))

    best = min(len(path) for path in candidates)
    return [path for path in candidates if len(path) == best]
mwpm.py 文件源码 项目:QTop 作者: jacobmarks 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def InternalRecovery(code, terminal1, terminal2, type, charge_type):
    """Flip charges along the shortest dual-graph chain between two terminals.

    A shortest path from `terminal1` to `terminal2` is found in
    `code.Dual[type]`.  For every pair of consecutive vertices on it, each
    data qubit listed under both vertices' ``'data'`` entries in
    `code.Stabilizers[type]` has its `charge_type` charge in `code.Primal`
    incremented modulo 2.

    :param code: code object, modified in place
    :param terminal1: first end of the chain
    :param terminal2: second end of the chain
    :param type: key selecting the dual graph and stabilizer set
    :param charge_type: key of the charge entry to flip
    :return: the same `code` object
    """
    measure_chain = nx.shortest_path(code.Dual[type], terminal1, terminal2)
    # Walking consecutive pairs covers exactly len(measure_chain) - 1 links,
    # i.e. the unweighted path length, so no second search is needed.
    for vertex1, vertex2 in zip(measure_chain, measure_chain[1:]):
        for data_qubit in code.Stabilizers[type][vertex1]['data']:
            if data_qubit in code.Stabilizers[type][vertex2]['data']:
                charges = code.Primal.node[data_qubit]['charge']
                charges[charge_type] = (charges[charge_type] + 1) % 2

    return code
owner_graph_metrics.py 文件源码 项目:community-networks-monitoring-tools 作者: netCommonsEU 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def getOwnerToOwnerCentrality(self, graph):
        """ compute the network centrality of all the nodes owned by a person
        with respect to all shortest paths between any owner

        For every unordered pair of owners, the cheapest path between any
        node of the first and any node of the second is selected; every
        owner with a node on that path gets its counter incremented.  A
        table of the counters normalised by the number of owner pairs is
        printed.

        :param graph: graph handed to `self.get_owner_distribution` and to
            `nx.single_source_dijkstra`
        :return: tuple (ownerCentrality, counter) with the raw per-owner
            counts and the number of owner pairs considered
        """

        ownerCentrality = defaultdict(int)
        # ownerNodes: owner -> [nodes], nodeOwner: node -> owner
        ownerNodes, nodeOwner = self.get_owner_distribution(graph)
        counter = 0
        shortestPathAndDistance = {}
        for n in graph.nodes():
            shortestPathAndDistance[n] = nx.single_source_dijkstra(graph, n)
            # returns a couple (distance, path)

        # Materialise the items once: indexing `.items()` inside the loops
        # rebuilds the list on every access in Python 2 and is not
        # supported at all by Python 3 dict views.
        owner_items = list(ownerNodes.items())
        for i, (_from_owner, from_nodes) in enumerate(owner_items):
            for _to_owner, to_nodes in owner_items[i + 1:]:
                shortest_path = []
                shortest_cost = 0
                for s in from_nodes:
                    distances, paths = shortestPathAndDistance[s]
                    for d in to_nodes:
                        if not shortest_path or distances[d] < shortest_cost:
                            shortest_path = paths[d]
                            shortest_cost = distances[d]
                counter += 1
                for o in set(nodeOwner[node] for node in shortest_path):
                    ownerCentrality[o] += 1

        # Each line is printed as one pre-joined string so that the output
        # is the same under Python 2 and Python 3.
        print("# owner".rjust(long_align_space) + " , " +
              "owner-to-owner cent.".rjust(long_align_space))
        for (p, c) in sorted(ownerCentrality.items(), key=lambda x: -x[1]):
            print(p.rjust(long_align_space) + " , " +
                  str(c*1.0/counter).rjust(long_align_space))
        print("")
        print("")
        return ownerCentrality, counter
simple_mac_synthesis.py 文件源码 项目:NetPower_TestBed 作者: Vignesh2208 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def compute_path_intents(self, fs):
        """Compute the weighted shortest path for `fs` and one intent per switch on it.

        The path between the source and destination hosts is stored on
        `fs.path`.  For each intermediate node an `Intent` of kind
        "primary" is created, matching on the hosts' MAC addresses and
        forwarding from the ingress port to the port facing the next node.

        :param fs: flow specification providing `ng_src_host`,
            `ng_dst_host` and `flow_match`; its `path` attribute is set
        :return: list of intents in path order
        """
        src_host = fs.ng_src_host
        dst_host = fs.ng_dst_host

        intents = []

        fs.path = nx.shortest_path(self.network_graph.graph,
                                   source=src_host.node_id,
                                   target=dst_host.node_id,
                                   weight='weight')

        # Port on the first switch where the source host is attached.
        first_sw_id = src_host.sw.node_id
        ingress_port = self.network_graph.get_link_ports_dict(
            src_host.node_id, first_sw_id)[first_sw_id]

        # Skip the source host: iteration always starts at a switch.
        for here, following in zip(fs.path[1:-1], fs.path[2:]):

            ports = self.network_graph.get_link_ports_dict(here, following)

            match = deepcopy(fs.flow_match)
            match["ethernet_source"] = int(src_host.mac_addr.replace(":", ""), 16)
            match["ethernet_destination"] = int(dst_host.mac_addr.replace(":", ""), 16)

            hop_intent = Intent("primary", match, ingress_port, ports[here], True)

            # Remember which switch this intent belongs to.
            hop_intent.switch_id = here

            intents.append(hop_intent)
            ingress_port = ports[following]

        return intents
fwd.py 文件源码 项目:Ryu-SDN-IP 作者: sdnds-tw 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_shortest_path(self, nx_graph, src_dpid, dst_dpid):
        """Return a shortest path between two datapaths, or None.

        :param nx_graph: networkx graph to search
        :param src_dpid: source node
        :param dst_dpid: destination node
        :return: list of nodes from `src_dpid` to `dst_dpid`, or None when
            no path connects them
        """
        # nx.has_path() is itself implemented as a shortest_path() call that
        # traps NetworkXNoPath, so testing first would search the graph
        # twice; attempt the search once and handle the failure instead.
        try:
            return nx.shortest_path(nx_graph, src_dpid, dst_dpid)
        except nx.NetworkXNoPath:
            return None
fwd_util.py 文件源码 项目:SDN-IP-Ryu 作者: paradisecr 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_shortest_path(self, nx_graph, src_dpid, dst_dpid):
        """Return a shortest path between two datapaths, or None.

        :param nx_graph: networkx graph to search
        :param src_dpid: source node
        :param dst_dpid: destination node
        :return: list of nodes from `src_dpid` to `dst_dpid`, or None when
            no path connects them
        """
        # nx.has_path() is itself implemented as a shortest_path() call that
        # traps NetworkXNoPath, so testing first would search the graph
        # twice; attempt the search once and handle the failure instead.
        try:
            return nx.shortest_path(nx_graph, src_dpid, dst_dpid)
        except nx.NetworkXNoPath:
            return None
fwd_util.py 文件源码 项目:SDN-IP-Ryu 作者: paradisecr 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def get_shortest_path(self, nx_graph, src_dpid, dst_dpid):
        """Return a shortest path between two datapaths, or None.

        :param nx_graph: networkx graph to search
        :param src_dpid: source node
        :param dst_dpid: destination node
        :return: list of nodes from `src_dpid` to `dst_dpid`, or None when
            no path connects them
        """
        # nx.has_path() is itself implemented as a shortest_path() call that
        # traps NetworkXNoPath, so testing first would search the graph
        # twice; attempt the search once and handle the failure instead.
        try:
            return nx.shortest_path(nx_graph, src_dpid, dst_dpid)
        except nx.NetworkXNoPath:
            return None
color_conversions.py 文件源码 项目:MagicWand 作者: GianlucaSilvestri 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def get_conversion_path(self, start_type, target_type):
        """Return the chain of conversion functions from start_type to target_type.

        Both types are normalised first.  The functions are read from the
        'conversion_function' attribute of each edge on a shortest path in
        `self.conversion_graph`, in path order.

        :raises UndefinedConversionError: if no path links the two types
        """
        start_type = self._normalise_type(start_type)
        target_type = self._normalise_type(target_type)
        try:
            # Node sequence leading from start_type to target_type.
            node_path = networkx.shortest_path(self.conversion_graph, start_type, target_type)
            # One conversion function per consecutive node pair (edge).
            conversions = []
            for node_a, node_b in zip(node_path[:-1], node_path[1:]):
                edge_data = self.conversion_graph.get_edge_data(node_a, node_b)
                conversions.append(edge_data['conversion_function'])
            return conversions
        except networkx.NetworkXNoPath:
            raise UndefinedConversionError(
                start_type,
                target_type,
            )
pf.py 文件源码 项目:PyPSA 作者: PyPSA 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def find_tree(sub_network, weight='x_pu'):
    """Build a spanning tree of the sub-network and its branch/bus path matrix.

    The minimum spanning tree of the weighted graph is stored in
    ``sub_network.tree``; its highest-degree node serves as the central
    "tree slack".  ``sub_network.T`` is then filled so that entry
    (branch, bus) is non-zero when the tree path from that bus to the
    slack uses the branch: +1 if the branch's ``bus0`` is the path node
    nearer the bus, -1 otherwise.

    """

    bus0_of_branch = sub_network.branches()["bus0"]
    branch_index = bus0_of_branch.index
    bus_index = sub_network.buses_i()

    graph = sub_network.graph(weight=weight)
    sub_network.tree = nx.minimum_spanning_tree(graph)

    # the bus with the highest degree in the tree becomes the slack
    tree_slack_bus, slack_degree = max(degree(sub_network.tree), key=itemgetter(1))
    logger.info("Tree slack bus is %s with degree %d.", tree_slack_bus, slack_degree)

    # sparse matrix recording, per bus, the branches on its path to the slack
    sub_network.T = dok_matrix((len(branch_index), len(bus_index)))

    for col, bus in enumerate(bus_index):
        path = nx.shortest_path(sub_network.tree, bus, tree_slack_bus)
        for here, there in zip(path[:-1], path[1:]):
            # first key of the edge data between the two buses
            branch = next(iterkeys(graph[here][there]))
            row = branch_index.get_loc(branch)
            if bus0_of_branch.iat[row] == here:
                sub_network.T[row, col] = +1
            else:
                sub_network.T[row, col] = -1
compute_topological_features.py 文件源码 项目:tweegraph 作者: PGryllos 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_shortest_distance(pair_id):
    """Return the hop distance between the two node ids encoded in `pair_id`.

    :param pair_id: string of the form ``"<id_1>_<id_2>"`` with integer ids
    :return: number of edges on a shortest path from id_1 to id_2 in the
        module-level `graph`, or -1 if the lookup fails (for instance
        because no path exists)
    :raises ValueError: if either id is not an integer
    """
    global graph
    parts = pair_id.split('_')
    id_1 = int(parts[0])
    id_2 = int(parts[1])
    try:
        return len(nx.shortest_path(graph, source=id_1, target=id_2)) - 1
    except Exception:
        # Best-effort lookup: any failure maps to the -1 sentinel.  Unlike
        # a bare `except:`, this lets KeyboardInterrupt and SystemExit
        # propagate.
        return -1
whitening_util.py 文件源码 项目:stuff 作者: yaroslavvb 项目源码 文件源码 阅读 41 收藏 0 点赞 0 评论 0
def timeit_summarize():
  """Placeholder: performs no work and returns None."""
  global global_timeit_dict
  return None

# graph traversal
# computation flows from parents to children
# to find path from target to dependency, do
# nx.shortest_path(gg, dependency, target)
whitening_util.py 文件源码 项目:stuff 作者: yaroslavvb 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def shortest_path(dep, target):
  """Return a shortest path from dep to target in the graph given by nx_graph().

  Either argument may carry an `op` attribute, in which case that
  attribute is used as the graph node instead of the argument itself.
  """
  source_node = getattr(dep, "op", dep)
  target_node = getattr(target, "op", target)
  return nx.shortest_path(nx_graph(), source_node, target_node)
util.py 文件源码 项目:stuff 作者: yaroslavvb 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def timeit_summarize():
  """Placeholder: performs no work and returns None."""
  global global_timeit_dict
  return None

# graph traversal
# computation flows from parents to children
# to find path from target to dependency, do
# nx.shortest_path(gg, dependency, target)
util.py 文件源码 项目:stuff 作者: yaroslavvb 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def shortest_path(dep, target):
  """Return a shortest path from dep to target in the graph given by nx_graph().

  Either argument may carry an `op` attribute, in which case that
  attribute is used as the graph node instead of the argument itself.
  """
  source_node = getattr(dep, "op", dep)
  target_node = getattr(target, "op", target)
  return nx.shortest_path(nx_graph(), source_node, target_node)
util.py 文件源码 项目:stuff 作者: yaroslavvb 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def timeit_summarize():
  """Placeholder: performs no work and returns None."""
  global global_timeit_dict
  return None

# graph traversal
# computation flows from parents to children
# to find path from target to dependency, do
# nx.shortest_path(gg, dependency, target)
util.py 文件源码 项目:stuff 作者: yaroslavvb 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def timeit_summarize():
  """Placeholder: performs no work and returns None."""
  global global_timeit_dict
  return None

# graph traversal
# computation flows from parents to children
# to find path from target to dependency, do
# nx.shortest_path(gg, dependency, target)
util.py 文件源码 项目:stuff 作者: yaroslavvb 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def shortest_path(dep, target):
  """Return a shortest path from dep to target in the graph given by nx_graph().

  Either argument may carry an `op` attribute, in which case that
  attribute is used as the graph node instead of the argument itself.
  """
  source_node = getattr(dep, "op", dep)
  target_node = getattr(target, "op", target)
  return nx.shortest_path(nx_graph(), source_node, target_node)
util.py 文件源码 项目:stuff 作者: yaroslavvb 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def timeit_summarize():
  """Placeholder: performs no work and returns None."""
  global global_timeit_dict
  return None

# graph traversal
# computation flows from parents to children
# to find path from target to dependency, do
# nx.shortest_path(gg, dependency, target)
util.py 文件源码 项目:stuff 作者: yaroslavvb 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def shortest_path(dep, target):
  """Return a shortest path from dep to target in the graph given by nx_graph().

  Either argument may carry an `op` attribute, in which case that
  attribute is used as the graph node instead of the argument itself.
  """
  source_node = getattr(dep, "op", dep)
  target_node = getattr(target, "op", target)
  return nx.shortest_path(nx_graph(), source_node, target_node)
inverse_segfault.py 文件源码 项目:stuff 作者: yaroslavvb 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def timeit_summarize():
  """Placeholder: performs no work and returns None."""
  global global_timeit_dict
  return None

# graph traversal
# computation flows from parents to children
# to find path from target to dependency, do
# nx.shortest_path(gg, dependency, target)
maze.py 文件源码 项目:RasPiBot202 作者: DrGFreeman 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def getNextNodeInShortestPath(self, currentNode, goalNode):
        """Return the next node on the weighted shortest path to goalNode.

        When the path consists of a single node (the current node is
        already the goal), that node itself is returned.
        """
        route = nx.shortest_path(self.g, currentNode, goalNode, weight = 'weight')
        return route[0] if len(route) == 1 else route[1]

    # Finds the next node in the path to the nearest node with unvisited paths
maze.py 文件源码 项目:RasPiBot202 作者: DrGFreeman 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def getNextNodeToNearestUnvisited(self, currentNode):
        """Return the next node on the way to the nearest node with unvisited paths.

        The target is obtained from `self.getNearestUnvisited(currentNode)`
        and the weighted shortest path to it is computed in `self.g`.  The
        chosen node's `uid` is printed.

        :param currentNode: node to start from
        :return: the second node of the path, or the current node itself
            when the path has a single node
        """
        nearestUnvisited = self.getNearestUnvisited(currentNode)
        path = nx.shortest_path(self.g, currentNode, nearestUnvisited, weight = 'weight')
        # Messages are pre-formatted into a single string so that the
        # output is the same under Python 2 and Python 3; the doubled
        # spaces mirror what the comma-separated print statement emitted.
        if len(path) == 1:
            print("Next node with unvisited paths:  %s  (current node)" % (path[0].uid,))
            return path[0]
        else:
            print("Next node with unvisited paths:  %s" % (path[1].uid,))
            return path[1]
transforms.py 文件源码 项目:pyhiro 作者: wanweiwei07 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def add_edge(self, u, v, *args, **kwargs):
        """Add the edge (u, v), breaking any existing connection between u and v first.

        If the undirected shadow graph already has the edge, both directed
        variants are removed before re-adding.  Otherwise, if some path
        already joins u and v, that path is disconnected (or, in strict
        mode, an error is raised).  The edge is then added to both the
        undirected shadow graph and this graph.

        :param u: first node
        :param v: second node
        :param args: forwarded to the parent class' ``add_edge``
        :param kwargs: forwarded to the parent class' ``add_edge``
        :return: True if an existing path between u and v was disconnected,
            False otherwise (including when u == v and nothing is added)
        :raises ValueError: in strict mode, when u == v or when a path
            between u and v already exists
        """
        changed = False
        if u == v:
            if self.flags['strict']:
                raise ValueError('Edge must be between two unique nodes!')
            return changed
        if self._undirected.has_edge(u, v):
            self.remove_edges_from([[u, v], [v,u]])
        elif len(self.nodes()) > 0:
            try: 
                path = nx.shortest_path(self._undirected, u, v)
                if self.flags['strict']:
                    raise ValueError('Multiple edge path exists between nodes!')
                self.disconnect_path(path)
                changed = True
            except nx.NetworkXException:
                # No path, or a node not yet in the graph: nothing to
                # disconnect.  NetworkXException is the common base of
                # NetworkXError and NetworkXNoPath and also covers the
                # NodeNotFound raised by newer networkx for unknown nodes.
                pass
        self._undirected.add_edge(u,v)
        # NOTE(review): super(self.__class__, self) recurses endlessly if
        # this class is ever subclassed; the explicit class name should be
        # used, but the class header is not visible from here.
        super(self.__class__, self).add_edge(u, v, *args, **kwargs)

        if self.flags['assert_forest']:
            # this is quite slow but makes very sure structure is correct 
            # so is mainly used for testing
            assert nx.is_forest(nx.Graph(self))

        return changed


问题


面经


文章

微信
公众号

扫码关注公众号