def graph_multi_test(graph: nx.MultiDiGraph):
    """Scan *graph* for node pairs connected by more than one edge.

    Nothing is returned and the graph is not modified.  The assignment in
    the branch has no effect outside this function; presumably it only
    exists as a statement to stop on while debugging.
    """
    for source, target in graph.edges():
        parallel_edges = graph[source][target]
        if len(parallel_edges) > 1:
            a = 1
python类MultiDiGraph()的实例源码
def _load_graph(json_dict: dict) -> nx.MultiDiGraph:
    """Build a MultiDiGraph from the ``'features'`` list of *json_dict*.

    Each feature contributes one edge running from the node of its first
    coordinate to the node of its last coordinate.  The edge carries the
    feature's ``id`` and ``lanes`` properties and an ``others`` attribute
    initialised to ``[[]]``.
    """
    graph = nx.MultiDiGraph()
    for feature in json_dict['features']:
        points = feature['geometry']['coordinates']
        start = get_node(points[0])
        end = get_node(points[-1])
        # Prune loops without any purpose (two points, same node at both
        # ends); longer loops such as traffic roundabouts are kept.
        if start == end and len(points) == 2:
            continue
        lanes = feature['properties']['lanes']
        graph.add_edge(start, end, id=feature['properties']['id'],
                       others=[[]], lanes=lanes)
    return graph
def simplify_graph(g: nx.MultiDiGraph, check_lanes):
    """Run the one-way and then the two-way simplification over *g*.

    The first pass hands every node with exactly one incoming and one
    outgoing edge to ``simplify_oneways``.  The second pass hands every
    node with exactly two incoming and two outgoing edges to
    ``simplify_twoways``.  *check_lanes* is forwarded unchanged.
    """
    # The node list is snapshotted before each pass so iteration does not
    # run over a live view (presumably the helpers mutate the graph).
    first_pass_nodes = [node for node, _ in g.adjacency()]
    for node in first_pass_nodes:
        if g.out_degree(node) == 1 and g.in_degree(node) == 1:  # oneways
            simplify_oneways(node, g, check_lanes)

    second_pass_nodes = [node for node, _ in g.adjacency()]
    for node in second_pass_nodes:
        # both directions in highway
        if g.out_degree(node) == 2 and g.in_degree(node) == 2:
            simplify_twoways(node, g, check_lanes)
prepare_geojson_to_agentpolisdemo.py 文件源码
项目:roadmap-processing
作者: aicenter
项目源码
文件源码
阅读 28
收藏 0
点赞 0
评论 0
def load_graph(json_dict):
    """Build a MultiDiGraph from the ``'features'`` list of *json_dict*.

    Each feature becomes an edge between the nodes of its first and last
    coordinates.  The coordinates strictly between those two are stored on
    the edge as ``others`` (an empty list when there are none), together
    with the feature's ``id`` and ``lanes`` properties.
    """
    graph = nx.MultiDiGraph()
    for feature in json_dict['features']:
        points = feature['geometry']['coordinates']
        start = get_node(points[0])
        end = get_node(points[-1])
        # Prune loops without any purpose (two points, same node at both
        # ends); longer loops such as traffic roundabouts are kept.
        if start == end and len(points) == 2:
            continue
        lanes = feature['properties']['lanes']
        inner_points = points[1:-1]
        graph.add_edge(
            start,
            end,
            id=feature['properties']['id'],
            others=inner_points if len(inner_points) != 0 else [],
            lanes=lanes,
        )
    return graph
def load_graph(data: geojson.feature.FeatureCollection) -> nx.MultiDiGraph:
    """Create a MultiDiGraph with one edge per feature in *data*.

    An edge connects the node of a feature's first coordinate to the node
    of its last coordinate and stores the feature's ``id`` property.  An
    info message is printed first and progress is reported through tqdm.
    """
    graph = nx.MultiDiGraph()
    print_info("Creating networkx graph from geojson")
    for feature in tqdm(data['features'], desc="processing features"):
        points = feature['geometry']['coordinates']
        start = _get_node(points[0])
        end = _get_node(points[-1])
        graph.add_edge(start, end, id=feature['properties']['id'])
    return graph
def load_graph(json_dict):
    """Create a MultiDiGraph with one edge per entry of ``json_dict['features']``.

    An edge connects the node of a feature's first coordinate to the node
    of its last coordinate and stores the feature's ``id`` property.
    """
    graph = nx.MultiDiGraph()
    for feature in json_dict['features']:
        points = feature['geometry']['coordinates']
        start = get_node(points[0])
        end = get_node(points[-1])
        graph.add_edge(start, end, id=feature['properties']['id'])
    return graph
def __init__(self, workflow_data, config):
    """Store the workflow, build its graph and compute the task order.

    :param workflow_data: workflow dictionary; its optional
        ``'platform'`` entry supplies the platform slug.
    :param config: configuration object, stored as ``self.config``.
    :raises AttributeError: when either of the two source/target id
        checks returns a falsy value.
    """
    self.config = config

    # Start from an empty graph, then let the builders populate it.
    self.graph = nx.MultiDiGraph()
    self.workflow = workflow_data
    self._build_initial_workflow_graph()
    self._build_privacy_restrictions()

    # Topological sorted tasks according to their dependencies
    self.sorted_tasks = []

    # Spark or COMPSs; falls back to 'spark' when no slug is given.
    self.platform = workflow_data.get('platform', {}).get('slug', 'spark')

    # Verify null edges before sorting; both checks must pass.
    checks_passed = (self.is_there_null_target_id_tasks()
                     and self.is_there_null_source_id_tasks())
    if not checks_passed:
        raise AttributeError(
            _("Port '{}/{}' must be informed for operation{}").format(
                self.WORKFLOW_GRAPH_SOURCE_ID_PARAM,
                self.WORKFLOW_GRAPH_TARGET_ID_PARAM,
                self.__class__))
    self.sorted_tasks = self.get_topological_sorted_tasks()
def __init__(self, debug=False):
    """Create the instance with an empty MultiDiGraph.

    :param debug: flag stored as ``self._debug``.
    """
    self._debug = debug
    self.graph = nx.MultiDiGraph()
def get_mdg(self):
    """Return a MultiDiGraph built from ``self.graph`` minus its host nodes.

    A node is dropped when ``self.get_node_type`` reports ``"host"`` for
    it.  ``self.graph`` itself is left untouched.
    """
    mdg = nx.MultiDiGraph(self.graph)
    # Remove all host nodes
    host_nodes = [
        node for node in self.graph if self.get_node_type(node) == "host"
    ]
    for node in host_nodes:
        mdg.remove_node(node)
    return mdg
def as_multidigraph(self):  # type: () -> nx.MultiDiGraph
    """
    Return a new MultiDiGraph holding this object's nodes and edges,
    including their data.

    This is something of a workaround: it lets useful methods such as
    .subgraph() work correctly on the result.
    """
    duplicate = nx.MultiDiGraph()
    duplicate.add_nodes_from(self.nodes(data=True))
    duplicate.add_edges_from(self.edges(data=True))
    return duplicate
graph_networkx.py 文件源码
项目:Android-Repackaged-App-Detection-System
作者: M157q
项目源码
文件源码
阅读 59
收藏 0
点赞 0
评论 0
def parse_saaf_json(filename):
# Load the JSON document stored at `filename` and build one nx.MultiDiGraph
# from it; the graph is returned.
# The top-level JSON object is walked as (invoke, invoke_info) pairs, each
# invoke_info as a sequence of "backtrack" lists, each backtrack as a
# sequence of node dicts that have 'parent' and 'nodeid' entries.
# NOTE(review): dict.iteritems() is used, so this code targets Python 2.
# NOTE(review): indentation was lost in this copy; the nesting of the
# statements from `if node['parent'] == i-1:` through `i += j` must be
# confirmed against the original file.
with open(filename) as json_file:
saaf_json = json.load(json_file)
#print saaf_json
G = nx.MultiDiGraph()
#graphs = []
# `i` is the offset added to every node's 'parent' and 'nodeid' below; it is
# advanced by `j` further down.
i = 0
for invoke, invoke_info in saaf_json.iteritems():
for backtrack in invoke_info:
# `j` is incremented by one below; presumably it counts the nodes of the
# current backtrack -- TODO confirm nesting.
j = 0
for node in backtrack:
# Work on a copy of the node whose keys are lower-cased.
node = dict((k.lower(), v) for k, v in node.iteritems())
# Shift both ids by the current offset.
node['parent'] = node['parent'] + i
node['nodeid'] = node['nodeid'] + i
# Record the top-level key this node was found under.
node['invoke'] = invoke
new_node = node['nodeid']
# The edge tuple is captured before 'parent' may be rewritten below.
new_edge = (node['parent'], node['nodeid'])
if not G.has_node(new_node):
# NOTE(review): `attr_dict=` is the networkx 1.x way of passing node
# attributes -- confirm the networkx version this file is used with.
G.add_node(new_node, attr_dict = node)
# A shifted parent equal to i-1 is rewritten to i.
if node['parent'] == i-1:
node['parent'] = i
elif not G.has_edge(*new_edge):
G.add_edge(*new_edge)
j += 1
i += j
#print i
#print G.edges()
#print json.dumps(json_graph.node_link_data(G), sort_keys = True, indent = 4)
#print
#graphs.append(G)
#return graphs
return G
def gdfs_to_graph(gdf_nodes, gdf_edges):
    """
    Convert node and edge GeoDataFrames into a graph

    Parameters
    ----------
    gdf_nodes : GeoDataFrame
        one row per node, indexed by node id; its ``crs`` and ``gdf_name``
        attributes are copied into the graph-level attributes
    gdf_edges : GeoDataFrame
        one row per edge; the ``u``, ``v`` and ``key`` columns identify the
        edge and every other non-null column becomes an edge attribute

    Returns
    -------
    networkx multidigraph
    """
    G = nx.MultiDiGraph()
    G.graph['crs'] = gdf_nodes.crs

    # Drop a trailing '_nodes' from the name. str.rstrip('_nodes') would
    # strip any trailing run of the characters "_", "n", "o", "d", "e", "s"
    # (e.g. 'london_nodes' -> 'l'), so the suffix is removed explicitly.
    suffix = '_nodes'
    name = gdf_nodes.gdf_name
    if name.endswith(suffix):
        name = name[:-len(suffix)]
    G.graph['name'] = name

    # add the nodes and their attributes to the graph
    G.add_nodes_from(gdf_nodes.index)
    attributes = gdf_nodes.to_dict()
    for attribute_name in gdf_nodes.columns:
        # only add this attribute to nodes which have a non-null value for it
        attribute_values = {k: v for k, v in attributes[attribute_name].items()
                            if pd.notnull(v)}
        nx.set_node_attributes(G, name=attribute_name, values=attribute_values)

    # add the edges and attributes that are not u, v, key (as they're added
    # separately) or null
    edge_id_columns = ('u', 'v', 'key')
    for _, row in gdf_edges.iterrows():
        attrs = {}
        # Series.items() is used because Series.iteritems() was removed in
        # pandas 2.0.
        for label, value in row.items():
            # lists are accepted as-is: pd.notnull() on a list returns an
            # array, which cannot be used as a single truth value
            if (label not in edge_id_columns) and (isinstance(value, list) or pd.notnull(value)):
                attrs[label] = value
        # Endpoints are passed positionally: the parameter names differ
        # between networkx 1.x (u, v) and 2.x (u_for_edge, v_for_edge).
        G.add_edge(row['u'], row['v'], key=row['key'], **attrs)

    return G
def get_label_from_edge(g, edge, attribute_name='label'):
    """Collect the values of *attribute_name* found on the edge(s) of *edge*.

    :param g: networkx graph (simple or multi, directed or undirected).
    :param edge: sequence whose first two items are the edge's end nodes.
    :param attribute_name: edge attribute to read, ``'label'`` by default.
    :return: list of attribute values.  For multigraphs there is one entry
        per parallel edge that has the attribute; for simple graphs at most
        one entry.  The list is empty when the edge does not exist or no
        edge carries the attribute.
    """
    edge_attributes = g.get_edge_data(edge[0], edge[1])
    if edge_attributes is None and nx.is_directed(g):
        # Directed graph without an edge u -> v: try the reverse direction.
        edge_attributes = g.get_edge_data(edge[1], edge[0])

    if edge_attributes is None:
        # No such edge in either direction; previously this fell through and
        # raised a TypeError when iterating over None.
        return []

    # nx.MultiDiGraph derives from nx.MultiGraph, so one isinstance check
    # covers both classes and any subclasses of them.
    if isinstance(g, nx.MultiGraph):
        # Multigraph edge data maps each edge key to its attribute dict.
        return [attrs[attribute_name]
                for attrs in edge_attributes.values()
                if attribute_name in attrs]

    if attribute_name in edge_attributes:
        return [edge_attributes[attribute_name]]
    return []
def test_multidigraph_is_subgraph_with_labels(self):
    """ParsemisMiner.is_subgraph must take direction and labels into account.

    Single-edge graphs whose edge (with its label) exists in the host graph
    are accepted; a wrong label, a reversed edge or an edge between
    unconnected nodes is rejected.
    """
    def single_edge_graph(source, target, label):
        # One-edge MultiDiGraph used as a subgraph candidate.
        candidate = nx.MultiDiGraph()
        candidate.add_edge(source, target, label=label)
        return candidate

    G = nx.MultiDiGraph()
    host_edges = [
        (1, 2, 'a'),
        (1, 2, 'f'),
        (2, 3, 'c'),
        (3, 4, 'd'),
        (2, 4, 'b'),
    ]
    for source, target, label in host_edges:
        G.add_edge(source, target, label=label)

    valid_subgraphs = [
        single_edge_graph(1, 2, 'a'),
        single_edge_graph(1, 2, 'f'),
        single_edge_graph(3, 4, 'd'),
    ]
    invalid_subgraphs = [
        single_edge_graph(1, 2, 'b'),  # label not present between 1 and 2
        single_edge_graph(2, 1, 'a'),  # reversed direction
        single_edge_graph(1, 4, 'a'),  # no edge from 1 to 4
    ]

    for subgraph in valid_subgraphs:
        self.assertTrue(ParsemisMiner.is_subgraph(G, subgraph))
    for subgraph in invalid_subgraphs:
        self.assertFalse(ParsemisMiner.is_subgraph(G, subgraph))
def test_frequent_graph(self):
    """Build a FrequentGraph from two parallel labelled edges and print it."""
    G = nx.MultiDiGraph()
    for label in ('a', 'b'):
        G.add_edge(1, 2, label=label)
    frequent = FrequentGraph(G, [])
    description = frequent.to_string()
    print("Name: %s" % description)
def chess_pgn_graph(pgn_file="chess_masters_WCC.pgn.bz2"):
    """Read chess games in pgn format in pgn_file.

    The file is opened with bz2.BZ2File, so it must be bz2-compressed.

    Return the MultiDiGraph of players connected by a chess game: one edge
    from the White player to the Black player per game.
    Edges contain game data in a dict (the game's remaining tags).
    """
    import bz2
    G = nx.MultiDiGraph()
    game = {}
    # The context manager closes the file even when parsing fails.
    with bz2.BZ2File(pgn_file) as datafile:
        lines = (line.decode().rstrip('\r\n') for line in datafile)
        for line in lines:
            if line.startswith('['):
                # Tag line of the form [Name "value"].
                tag, value = line[1:-1].split(' ', 1)
                game[str(tag)] = value.strip('"')
            else:
                # empty line after tag set indicates
                # we finished reading game info
                if game:
                    white = game.pop('White')
                    black = game.pop('Black')
                    G.add_edge(white, black, **game)
                    game = {}
    return G
def chess_pgn_graph(pgn_file="chess_masters_WCC.pgn.bz2"):
    """Read chess games in pgn format in pgn_file.

    The file is opened with bz2.BZ2File, so it must be bz2-compressed.

    Return the MultiDiGraph of players connected by a chess game: one edge
    from the White player to the Black player per game.
    Edges contain game data in a dict (the game's remaining tags).
    """
    import bz2
    G = nx.MultiDiGraph()
    game = {}
    # The context manager closes the file even when parsing fails.
    with bz2.BZ2File(pgn_file) as datafile:
        lines = (line.decode().rstrip('\r\n') for line in datafile)
        for line in lines:
            if line.startswith('['):
                # Tag line of the form [Name "value"].
                tag, value = line[1:-1].split(' ', 1)
                game[str(tag)] = value.strip('"')
            else:
                # empty line after tag set indicates
                # we finished reading game info
                if game:
                    white = game.pop('White')
                    black = game.pop('Black')
                    G.add_edge(white, black, **game)
                    game = {}
    return G
def test_combined_stop_to_stop_transit_network(self):
    """The combined network is a MultiDiGraph whose edges all carry 'route_type'."""
    network = networks.combined_stop_to_stop_transit_network(self.gtfs)
    self.assertIsInstance(network, networkx.MultiDiGraph)
    for _source, _target, attributes in network.edges(data=True):
        self.assertIn("route_type", attributes)
def test_upper(self):
    """Test we can parse a CREATE... RETURN query."""
    # NOTE(review): the result of query() is not consumed or asserted on
    # here -- confirm that calling it is enough to exercise the parser.
    graph = nx.MultiDiGraph()
    parser = python_cypher.CypherToNetworkx()
    statement = 'CREATE (n:SOMECLASS) RETURN n'
    parser.query(graph, statement)
def test_create_node(self):
    """Test we can build a query and create a node"""
    g = nx.MultiDiGraph()
    query = 'CREATE (n) RETURN n'
    test_parser = python_cypher.CypherToNetworkx()
    # Iterate over the results of query() so it runs to completion.
    for _ in test_parser.query(g, query):
        pass
    # number_of_nodes() is used instead of len(g.node): the `node`
    # attribute was removed in networkx 2.4.
    self.assertEqual(g.number_of_nodes(), 1)