python类MultiDiGraph()的实例源码

compute_edge_parameters.py 文件源码 项目:roadmap-processing 作者: aicenter 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def graph_multi_test(graph: nx.MultiDiGraph):
    for edge in graph.edges():
        if len(graph[edge[0]][edge[1]]) > 1:
            a=1
simplify_graph.py 文件源码 项目:roadmap-processing 作者: aicenter 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _load_graph(json_dict: dict) -> nx.MultiDiGraph:
    g = nx.MultiDiGraph()
    for item in json_dict['features']:
        coord = item['geometry']['coordinates']
        coord_u = get_node(coord[0])
        coord_v = get_node(coord[-1])
        if coord_u != coord_v or len(coord) != 2:  # prune loops without any purpose, save loops like traffic roundabout
            lanes = item['properties']['lanes']
            g.add_edge(coord_u, coord_v, id=item['properties']['id'], others=[[]], lanes=lanes)
    return g
simplify_graph.py 文件源码 项目:roadmap-processing 作者: aicenter 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def simplify_graph(g: nx.MultiDiGraph, check_lanes):
    for n, _ in list(g.adjacency()):
        if g.out_degree(n) == 1 and g.in_degree(n) == 1:  # oneways
            simplify_oneways(n, g, check_lanes)

    for n, _ in list(g.adjacency()):
        if g.out_degree(n) == 2 and g.in_degree(n) == 2:  # both directions in highway
            simplify_twoways(n, g, check_lanes)
prepare_geojson_to_agentpolisdemo.py 文件源码 项目:roadmap-processing 作者: aicenter 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def load_graph(json_dict):
    g = nx.MultiDiGraph()
    for item in json_dict['features']:
        coord = item['geometry']['coordinates']
        coord_u = get_node(coord[0])
        coord_v = get_node(coord[-1])
        if coord_u != coord_v or len(coord) != 2:  # prune loops without any purpose, save loops like traffic roundabout
            lanes = item['properties']['lanes']
            data = item['geometry']['coordinates'][1:-1]
            if len(data) == 0:
                data = []
            g.add_edge(coord_u, coord_v, id=item['properties']['id'], others=data, lanes=lanes)
    return g
io.py 文件源码 项目:roadmap-processing 作者: aicenter 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def load_graph(data: geojson.feature.FeatureCollection) -> nx.MultiDiGraph:
    g = nx.MultiDiGraph()
    print_info("Creating networkx graph from geojson")
    for item in tqdm(data['features'], desc="processing features"):
        coord = item['geometry']['coordinates']
        coord_u = _get_node(coord[0])
        coord_v = _get_node(coord[-1])
        g.add_edge(coord_u, coord_v, id=item['properties']['id'])
    return g
export_nodes_and_id_maker.py 文件源码 项目:roadmap-processing 作者: aicenter 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def load_graph(json_dict):
    g = nx.MultiDiGraph()
    for item in json_dict['features']:
        coord = item['geometry']['coordinates']
        coord_u = get_node(coord[0])
        coord_v = get_node(coord[-1])
        g.add_edge(coord_u, coord_v, id=item['properties']['id'])
    return g
workflow.py 文件源码 项目:juicer 作者: eubr-bigsea 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def __init__(self, workflow_data, config):

        self.config = config
        # Initialize
        self.graph = nx.MultiDiGraph()

        # Workflow dictionary
        self.workflow = workflow_data

        # Construct graph
        self._build_initial_workflow_graph()
        self._build_privacy_restrictions()

        # Topological sorted tasks according to their dependencies
        self.sorted_tasks = []

        # Spark or COMPSs
        self.platform = workflow_data.get('platform', {}).get('slug', 'spark')

        # Verify null edges to topological_sorted_tasks
        if self.is_there_null_target_id_tasks() \
                and self.is_there_null_source_id_tasks():
            self.sorted_tasks = self.get_topological_sorted_tasks()
        else:
            raise AttributeError(
                _("Port '{}/{}' must be informed for operation{}").format(
                    self.WORKFLOW_GRAPH_SOURCE_ID_PARAM,
                    self.WORKFLOW_GRAPH_TARGET_ID_PARAM,
                    self.__class__))
graph.py 文件源码 项目:cobol-sharp 作者: petli 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def __init__(self, debug=False):
        self.graph = nx.MultiDiGraph()
        self._debug = debug
network_graph.py 文件源码 项目:NetPower_TestBed 作者: Vignesh2208 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_mdg(self):

        mdg = nx.MultiDiGraph(self.graph)

        for n in self.graph:
            node_type = self.get_node_type(n)

            # Remove all host nodes
            if node_type == "host":
                mdg.remove_node(n)

        return mdg
dfa.py 文件源码 项目:revex 作者: lucaswiman 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def as_multidigraph(self):  # type: () -> nx.MultiDiGraph
        """
        Constructs a MultiDiGraph that is a copy of self.

        This is a bit of a hack, but allows some useful methods like .subgraph()
        to work correctly.
        """
        graph = nx.MultiDiGraph()
        graph.add_nodes_from(self.nodes(data=True))
        graph.add_edges_from(self.edges(data=True))
        return graph
graph_networkx.py 文件源码 项目:Android-Repackaged-App-Detection-System 作者: M157q 项目源码 文件源码 阅读 59 收藏 0 点赞 0 评论 0
def parse_saaf_json(filename):
    with open(filename) as json_file:
        saaf_json = json.load(json_file)
    #print saaf_json

    G = nx.MultiDiGraph()
    #graphs = []

    i = 0
    for invoke, invoke_info in saaf_json.iteritems():
        for backtrack in invoke_info:
            j = 0
            for node in backtrack:
                node = dict((k.lower(), v) for k, v in node.iteritems())
                node['parent']  = node['parent'] + i
                node['nodeid']  = node['nodeid'] + i
                node['invoke']  = invoke

                new_node = node['nodeid']
                new_edge = (node['parent'], node['nodeid'])

                if not G.has_node(new_node):
                    G.add_node(new_node, attr_dict = node)

                if node['parent'] == i-1:
                    node['parent'] = i
                elif not G.has_edge(*new_edge):
                    G.add_edge(*new_edge)

                j += 1
            i += j

            #print i
            #print G.edges()
            #print json.dumps(json_graph.node_link_data(G), sort_keys = True, indent = 4)
            #print
        #graphs.append(G)

    #return graphs
    return G
save_load.py 文件源码 项目:osmnx 作者: gboeing 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def gdfs_to_graph(gdf_nodes, gdf_edges):
    """
    Convert node and edge GeoDataFrames into a graph

    Parameters
    ----------
    gdf_nodes : GeoDataFrame
    gdf_edges : GeoDataFrame

    Returns
    -------
    networkx multidigraph
    """

    G = nx.MultiDiGraph()
    G.graph['crs'] = gdf_nodes.crs
    G.graph['name'] = gdf_nodes.gdf_name.rstrip('_nodes')

    # add the nodes and their attributes to the graph
    G.add_nodes_from(gdf_nodes.index)
    attributes = gdf_nodes.to_dict()
    for attribute_name in gdf_nodes.columns:
        # only add this attribute to nodes which have a non-null value for it
        attribute_values = {k:v for k, v in attributes[attribute_name].items() if pd.notnull(v)}
        nx.set_node_attributes(G, name=attribute_name, values=attribute_values)

    # add the edges and attributes that are not u, v, key (as they're added
    # separately) or null
    for _, row in gdf_edges.iterrows():
        attrs = {}
        for label, value in row.iteritems():
            if (label not in ['u', 'v', 'key']) and (isinstance(value, list) or pd.notnull(value)):
                attrs[label] = value
        G.add_edge(u=row['u'], v=row['v'], key=row['key'], **attrs)

    return G
parsemis_wrapper.py 文件源码 项目:parsemis_wrapper 作者: tomkdickinson 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def get_label_from_edge(g, edge, attribute_name='label'):
        edge_attributes = g.get_edge_data(edge[0], edge[1])
        if edge_attributes is None and nx.is_directed(g):
            edge_attributes = g.get_edge_data(edge[1], edge[0])

        labels = []
        if type(g) == nx.MultiDiGraph or type(g) == nx.MultiGraph:
            for index in edge_attributes:
                if attribute_name in edge_attributes[index]:
                    labels.append(edge_attributes[index][attribute_name])
        else:
            if attribute_name in edge_attributes:
                labels.append(edge_attributes[attribute_name])

        return labels
parsemis_wrapper_test.py 文件源码 项目:parsemis_wrapper 作者: tomkdickinson 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_multidigraph_is_subgraph_with_labels(self):
        G = nx.MultiDiGraph()
        G.add_edges_from(
            [
                (1, 2, {'label': 'a'}),
                (1, 2, {'label': 'f'}),
                (2, 3, {'label': 'c'}),
                (3, 4, {'label': 'd'}),
                (2, 4, {'label': 'b'})
            ]
        )

        valid_subgraph_a = nx.MultiDiGraph()
        valid_subgraph_a.add_edge(1, 2, label='a')

        valid_subgraph_b = nx.MultiDiGraph()
        valid_subgraph_b.add_edge(1, 2, label='f')

        valid_subgraph_c = nx.MultiDiGraph()
        valid_subgraph_c.add_edge(3, 4, label='d')

        invalid_subgraph_a = nx.MultiDiGraph()
        invalid_subgraph_a.add_edge(1, 2, label='b')

        invalid_subgraph_b = nx.MultiDiGraph()
        invalid_subgraph_b.add_edge(2, 1, label='a')

        invalid_subgraph_c = nx.MultiDiGraph()
        invalid_subgraph_c.add_edge(1, 4, label='a')

        self.assertTrue(ParsemisMiner.is_subgraph(G, valid_subgraph_a))
        self.assertTrue(ParsemisMiner.is_subgraph(G, valid_subgraph_b))
        self.assertTrue(ParsemisMiner.is_subgraph(G, valid_subgraph_c))

        self.assertFalse(ParsemisMiner.is_subgraph(G, invalid_subgraph_a))
        self.assertFalse(ParsemisMiner.is_subgraph(G, invalid_subgraph_b))
        self.assertFalse(ParsemisMiner.is_subgraph(G, invalid_subgraph_c))
parsemis_wrapper_test.py 文件源码 项目:parsemis_wrapper 作者: tomkdickinson 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_frequent_graph(self):
        G = nx.MultiDiGraph()
        G.add_edge(1, 2, label='a')
        G.add_edge(1, 2, label='b')

        fg = FrequentGraph(G, [])
        print("Name: %s" % fg.to_string())
chess_masters.py 文件源码 项目:analyse_website_dns 作者: mrcheng0910 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def chess_pgn_graph(pgn_file="chess_masters_WCC.pgn.bz2"):
    """Read chess games in pgn format in pgn_file.

    Filenames ending in .gz or .bz2 will be uncompressed.

    Return the MultiDiGraph of players connected by a chess game.
    Edges contain game data in a dict.

    """
    import bz2
    G=nx.MultiDiGraph()
    game={}
    datafile = bz2.BZ2File(pgn_file)
    lines = (line.decode().rstrip('\r\n') for line in datafile)
    for line in lines:
        if line.startswith('['):
            tag,value=line[1:-1].split(' ',1)
            game[str(tag)]=value.strip('"')
        else:
        # empty line after tag set indicates
        # we finished reading game info
            if game:
                white=game.pop('White')
                black=game.pop('Black')
                G.add_edge(white, black, **game)
                game={}
    return G
chess_masters.py 文件源码 项目:analyse_website_dns 作者: mrcheng0910 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def chess_pgn_graph(pgn_file="chess_masters_WCC.pgn.bz2"):
    """Read chess games in pgn format in pgn_file.

    Filenames ending in .gz or .bz2 will be uncompressed.

    Return the MultiDiGraph of players connected by a chess game.
    Edges contain game data in a dict.

    """
    import bz2
    G=nx.MultiDiGraph()
    game={}
    datafile = bz2.BZ2File(pgn_file)
    lines = (line.decode().rstrip('\r\n') for line in datafile)
    for line in lines:
        if line.startswith('['):
            tag,value=line[1:-1].split(' ',1)
            game[str(tag)]=value.strip('"')
        else:
        # empty line after tag set indicates
        # we finished reading game info
            if game:
                white=game.pop('White')
                black=game.pop('Black')
                G.add_edge(white, black, **game)
                game={}
    return G
test_exports.py 文件源码 项目:gtfspy 作者: CxAalto 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_combined_stop_to_stop_transit_network(self):
        multi_di_graph = networks.combined_stop_to_stop_transit_network(self.gtfs)
        self.assertIsInstance(multi_di_graph, networkx.MultiDiGraph)
        for from_node, to_node, data in multi_di_graph.edges(data=True):
            self.assertIn("route_type", data)
test.py 文件源码 项目:python_cypher 作者: zacernst 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_upper(self):
        """Test we can parse a CREATE... RETURN query."""
        g = nx.MultiDiGraph()
        query = 'CREATE (n:SOMECLASS) RETURN n'
        test_parser = python_cypher.CypherToNetworkx()
        test_parser.query(g, query)
test.py 文件源码 项目:python_cypher 作者: zacernst 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_create_node(self):
        """Test we can build a query and create a node"""
        g = nx.MultiDiGraph()
        query = 'CREATE (n) RETURN n'
        test_parser = python_cypher.CypherToNetworkx()
        for i in test_parser.query(g, query):
            pass
        self.assertEqual(len(g.node), 1)


问题


面经


文章

微信
公众号

扫码关注公众号