def visualize(self, edgelabel='control', current_node=None,
              draw='pygraphviz'):
    """
    Visualizes a LOMAP system model.

    Parameters
    ----------
    edgelabel : str or None
        Name of the edge attribute drawn as edge label. When not None,
        ``nx.is_weighted(self.g, weight=edgelabel)`` must hold.
    current_node : node, 'init' or None
        Node to highlight in blue (matplotlib backend only). The special
        value 'init' selects the first key of ``self.init``. With None,
        every node is drawn in red.
    draw : str
        Drawing backend, either 'pygraphviz' or 'matplotlib'.

    Raises
    ------
    AssertionError
        If ``edgelabel`` is given but the weight check on ``self.g`` fails.
    ValueError
        If ``draw`` names an unknown backend.
    """
    assert edgelabel is None or nx.is_weighted(self.g, weight=edgelabel)
    if draw == 'pygraphviz':
        nx.view_pygraphviz(self.g, edgelabel)
    elif draw == 'matplotlib':
        # Use the stored 'location' attributes only when every node has one;
        # otherwise fall back to an automatic layout.
        pos = nx.get_node_attributes(self.g, 'location')
        if len(pos) != self.g.number_of_nodes():
            pos = nx.spring_layout(self.g)
        if current_node is None:
            colors = 'r'
        else:
            if current_node == 'init':
                # iter() works on Python 2 and 3 (iterkeys() is Python 2 only).
                current_node = next(iter(self.init))
            # Build the colours in node iteration order so that each entry
            # lines up with the node it belongs to; a node that is not in
            # the graph simply highlights nothing.
            colors = ['b' if v == current_node else 'r' for v in self.g]
        nx.draw(self.g, pos=pos, node_color=colors)
        nx.draw_networkx_labels(self.g, pos=pos)
        edge_labels = nx.get_edge_attributes(self.g, edgelabel)
        nx.draw_networkx_edge_labels(self.g, pos=pos,
                                     edge_labels=edge_labels)
    else:
        raise ValueError('Expected parameter draw to be either:'
                         + '"pygraphviz" or "matplotlib"!')
# Example source code using Python's get_node_attributes()
def scale_node_coordinates(self, scale):
    """
    Multiply the x and y coordinate of every node by ``scale``.

    Scales node coordinates, using 1:scale. Scale should be in meters.

    Parameters
    -----------
    scale : float
        Coordinate scale multiplier.
    """
    # Snapshot of the current positions, taken once before any node is moved.
    coordinates = nx.get_node_attributes(self._graph, 'pos')
    for name, _node in self._nodes.items():
        x = coordinates[name][0]
        y = coordinates[name][1]
        self.set_node_coordinates(name, (x*scale, y*scale))
def distance_to_epicenter(self, wn, element_type=wntr.network.Node):
    """
    Distance to the epicenter

    For link types the distance is measured from the midpoint between the
    link's start and end node; for node types it is measured from the node
    position. Positions come from the 'pos' node attribute of the network
    graph and the epicenter from ``self.epicenter``.

    Parameters
    -----------
    wn : WaterNetworkModel

    element_type: optional (default = wntr.network.Node)
        One of the wntr.network Link/Pipe/Pump/Valve or
        Node/Junction/Tank/Reservoir classes. Any other value yields an
        empty Series.

    Returns
    ---------
    R : pd.Series
        Distance to epicenter (m), indexed by element name
    """
    G = wn.get_graph_deep_copy()
    pos = nx.get_node_attributes(G, 'pos')

    # Collect names and distances in plain lists and build the Series once:
    # enlarging a Series label by label reallocates on every assignment, and
    # a dtype-less empty pd.Series() has a version-dependent dtype.
    names = []
    distances = []

    link_types = [wntr.network.Link, wntr.network.Pipe,
                  wntr.network.Pump, wntr.network.Valve]
    node_types = [wntr.network.Node, wntr.network.Junction,
                  wntr.network.Tank, wntr.network.Reservoir]

    if element_type in link_types:
        for name, link in wn.links(element_type):
            # Compute pipe center position
            start_point = pos[link.start_node]
            end_point = pos[link.end_node]
            center = ((end_point[0] + start_point[0])/2,
                      (end_point[1] + start_point[1])/2)
            names.append(name)
            distances.append(distance.euclidean(self.epicenter, center))  # m
    elif element_type in node_types:
        for name, node in wn.nodes(element_type):
            names.append(name)
            distances.append(distance.euclidean(self.epicenter, pos[name]))  # m

    return pd.Series(distances, index=names, dtype=float)
def _write_coordinates(self, f, wn):
    """
    Write the [COORDINATES] section to the binary file object ``f``.

    One ASCII-encoded row is emitted per node that carries a 'pos'
    attribute in ``wn._graph``, ordered by sorted node key, followed by a
    blank line.
    """
    header = '{:10s} {:10s} {:10s}\n'.format(';Node', 'X-Coord', 'Y-Coord')
    row = '{:10s} {:f} {:f}\n'

    f.write('[COORDINATES]\n'.encode('ascii'))
    f.write(header.encode('ascii'))

    positions = nx.get_node_attributes(wn._graph, 'pos')
    for node_name in sorted(positions.keys()):
        xy = positions[node_name]
        line = row.format(node_name, xy[0], xy[1])
        f.write(line.encode('ascii'))

    f.write('\n'.encode('ascii'))
def _node_product(G, H, label_threshold=None):
    """
    Yield ``((u, v), attrs)`` for pairs of the node product of G and H.

    ``attrs`` is ``_dict_product`` applied to the attribute dicts of u and v.

    Parameters
    ----------
    G, H : networkx graphs
    label_threshold : number or None
        When None, every pair (u, v) is yielded. Otherwise a pair is yielded
        only if ``abs(label_G[u] - label_H[v]) < label_threshold``, and its
        'label' attribute is set to the product of the two labels. Every
        node must then carry a 'label' attribute (KeyError otherwise).
    """
    # The label tables do not depend on (u, v): build them once instead of
    # once per node pair.
    if label_threshold is not None:
        G_labels = nx.get_node_attributes(G, 'label')
        H_labels = nx.get_node_attributes(H, 'label')

    for u, v in product(G, H):
        # NOTE(review): G.node is the networkx 1.x/early 2.x accessor; kept
        # as in the original because the targeted version is not visible here.
        attrs = _dict_product(G.node[u], H.node[v])
        if label_threshold is None:
            yield ((u, v), attrs)
        elif abs(G_labels[u] - H_labels[v]) < label_threshold:
            attrs['label'] = G_labels[u] * H_labels[v]
            yield ((u, v), attrs)
def _node_product(G, H, label_threshold=None):
    """
    Yield ``((u, v), attrs)`` for pairs of the node product of G and H.

    ``attrs`` is ``_dict_product`` applied to the attribute dicts of u and v.

    Parameters
    ----------
    G, H : networkx graphs
    label_threshold : number or None
        When None, every pair (u, v) is yielded. Otherwise a pair is yielded
        only if ``abs(label_G[u] - label_H[v]) < label_threshold``, and its
        'label' attribute is set to the product of the two labels. Every
        node must then carry a 'label' attribute (KeyError otherwise).
    """
    # The label tables do not depend on (u, v): build them once instead of
    # once per node pair.
    if label_threshold is not None:
        G_labels = nx.get_node_attributes(G, 'label')
        H_labels = nx.get_node_attributes(H, 'label')

    for u, v in product(G, H):
        # NOTE(review): G.node is the networkx 1.x/early 2.x accessor; kept
        # as in the original because the targeted version is not visible here.
        attrs = _dict_product(G.node[u], H.node[v])
        if label_threshold is None:
            yield ((u, v), attrs)
        elif abs(G_labels[u] - H_labels[v]) < label_threshold:
            attrs['label'] = G_labels[u] * H_labels[v]
            yield ((u, v), attrs)
def get(w, d) -> object:
    """
    Compare the successors of every root node in two directed graphs.

    ``w`` and ``d`` are turned into ``nx.DiGraph`` objects (Wikipedia and
    Wikidata respectively). Root nodes are the nodes carrying a 'group'
    attribute; both graphs must have the same number of them. For each
    Wikipedia root node a dict is built with the node id, its revision ids,
    its url and three ``city_build`` results: successors only in Wikidata,
    successors in both graphs, and successors only in Wikipedia.

    Returns the list of these dicts sorted by their 'url' entry.
    """
    wikipedia = nx.DiGraph(w)
    wikidata = nx.DiGraph(d)

    root_nodes = nx.get_node_attributes(wikipedia, 'group')
    wikidata_root_nodes = nx.get_node_attributes(wikidata, 'group')
    assert (len(root_nodes) == len(wikidata_root_nodes)), 'Error: Graph root size should be the same!'

    url_wiki = nx.get_node_attributes(wikipedia, 'url')
    url_data = nx.get_node_attributes(wikidata, 'url')
    revision_id_wikipedia = nx.get_node_attributes(wikipedia, 'revision_id_wikipedia')
    revision_id_wikidata = nx.get_node_attributes(wikidata, 'revision_id_wikidata')

    cities = []
    for qid in root_nodes:
        wikipedia_successors = wikipedia.successors(qid)
        wikidata_successors = wikidata.successors(qid)
        pedia = set(wikipedia_successors)
        data = set(wikidata_successors)

        only_in_wikidata = data - pedia
        only_in_wikipedia = pedia - data
        in_both = pedia.intersection(data)

        cities.append({
            'qid': qid,
            'revision_id_wikipedia': revision_id_wikipedia[qid],
            'revision_id_wikidata': revision_id_wikidata[qid],
            'url': url_wiki[qid],
            'miss_wikipedia': city_build(url_data, only_in_wikidata),
            'intersection': city_build(url_wiki, in_both),
            'data_cities': city_build(url_wiki, only_in_wikipedia),
        })

    cities.sort(key=lambda city: city['url'])
    return cities
def apply_to_graph(fun, G=None):
    """
    Applies given algorithm to random geometric graph and displays the results side by side
    :param fun: A function which has the signature f(G) and returns iterator of edges of graph G
    :param G: a networkx Graph. If None, random geometric graph is created and applied
    :return: None. A matplotlib figure showing G and fun(G) is displayed via plt.show()

    Node and edge labels are drawn only when the caller supplied G.
    """
    # G is rebound below when it is None, so remember now whether the caller
    # passed a graph; testing "G is not None" later would always be true.
    graph_supplied = G is not None

    if not graph_supplied:
        G = nx.random_geometric_graph(100, .125)
        # position is stored as node attribute data for random_geometric_graph
        pos = nx.get_node_attributes(G, 'pos')
        nodesize = 80
        # Weight every edge with the Euclidean distance of its end points.
        # G[u][v] and the pos dict avoid the version-specific G.edge / G.node
        # accessors.
        for u, v in G.edges():
            dx = pos[v][0] - pos[u][0]
            dy = pos[v][1] - pos[u][1]
            G[u][v]['weight'] = (dx ** 2 + dy ** 2) ** .5
    else:
        pos = graphviz_layout(G)
        nodesize = 200

    # find node near center (0.5,0.5); the squared distance doubles as the
    # colour value of each node
    color = {}
    dmin = 1
    ncenter = 0
    for n in pos:
        x, y = pos[n]
        d = (x - 0.5) ** 2 + (y - 0.5) ** 2
        color[n] = d
        if d < dmin:
            ncenter = n
            dmin = d

    res = nx.Graph(list(fun(G)))

    plt.figure(figsize=(10, 8))
    plt.suptitle(fun.__name__ + " algorithm application")

    # Left panel: the input graph.
    plt.subplot(1, 2, 1)
    plt.title("Original Graph G")
    nx.draw_networkx_edges(G, pos, nodelist=[ncenter], alpha=0.4)
    nx.draw_networkx_nodes(G, pos,
                           nodelist=list(color),
                           node_size=nodesize,
                           node_color=list(color.values()),
                           cmap=plt.get_cmap("Reds_r")
                           ).set_edgecolor('k')
    if graph_supplied:
        nx.draw_networkx_labels(G, pos)
        nx.draw_networkx_edge_labels(G, pos,
                                     edge_labels=nx.get_edge_attributes(G, 'weight'))
    plt.axis('off')

    # Right panel: the graph built from the edges returned by fun.
    plt.subplot(1, 2, 2)
    plt.title("Resultant Graph, R = {0}(G)".format(fun.__name__))
    nx.draw_networkx_edges(res, pos, nodelist=[ncenter], alpha=0.4)
    nx.draw_networkx_nodes(res, pos,
                           node_color=[color[n] for n in res.nodes()],
                           node_size=nodesize,
                           cmap=plt.get_cmap("Greens_r")).set_edgecolor('k')
    if graph_supplied:
        nx.draw_networkx_labels(res, pos)
        nx.draw_networkx_edge_labels(res, pos,
                                     edge_labels=nx.get_edge_attributes(res, 'weight'))
    plt.axis('off')
    plt.show()
# graph.py source file
# Project: uai2017_learning_to_acquire_information
# Author: evanthebouncy
# Project source
# File source
# Views: 28
# Favorites: 0
# Likes: 0
# Comments: 0
def get_dependency_coef(ge, link):
    """
    Return M*N / ((M+N) * (M+N-1)) for the two parts left after cutting ``link``.

    A deep copy of ``ge`` has both directions of ``link`` set to 0, then
    ``get_ccomp`` is run on it. M and N are the number of entries labelled
    0 and 1 in its result (KeyError if either label is absent).
    """
    # presumably ge is an adjacency matrix and get_ccomp maps each vertex to
    # a component label - verify against get_ccomp.
    src, dst = link
    severed = copy.deepcopy(ge)
    severed[src][dst] = 0
    severed[dst][src] = 0

    membership = get_ccomp(severed)

    # Count how many entries fall into each component label.
    sizes = {}
    for member in membership:
        label = membership[member]
        sizes[label] = sizes.get(label, 0) + 1

    M = float(sizes[0])
    N = float(sizes[1])
    total = M + N
    return (M * N) / (total * (total - 1))
# V = gen_pts(N)
# # G = gen_graph(V, 6)
# G = gen_spanning_tree(V)
#
# G_V = V
# G_E = G
#
# print G_V
# print G_E
#
# Gr = nx.Graph()
# for i in range(N):
# Gr.add_node(i, pos=G_V[i])
#
# for i in range(N):
# for j in range(N):
# if G_E[i][j]:
# Gr.add_edge(i,j)
#
# labels = dict()
# for i in range(N):
# labels[i] = str(i)
#
# pos=nx.get_node_attributes(Gr,'pos')
#
# nx.draw(Gr, pos=pos,
# node_size=250, with_labels=False)
# nx.draw_networkx_labels(Gr, pos, labels)
#
# plt.savefig("graph.png")
def _create_viz_dag(comp_dag, colors='state', cmap=None):
    """
    Build a DiGraph mirroring ``comp_dag`` with display attributes attached.

    Nodes are renamed to short names "n0", "n1", ... and carry the
    attributes 'label' (the original name), 'style', '_group' and
    'fillcolor'. Edges carry '_group', which is the shared group of both
    end points or None when the groups differ.

    Parameters
    ----------
    comp_dag : networkx.DiGraph
        Graph to visualize.
    colors : str
        'state' colours each node by ``cmap[state]``; 'timing' colours each
        node by its timing duration, normalized to [0, 1] over all nodes and
        passed through ``cmap``. Case-insensitive.
    cmap : mapping or callable, optional
        Defaults to ``_state_colors`` for 'state' and to a three-colour
        blended colormap for 'timing'.

    Raises
    ------
    ValueError
        If ``colors`` is neither 'state' nor 'timing'.
    """
    colors = colors.lower()
    if colors == 'state':
        if cmap is None:
            cmap = _state_colors
    elif colors == 'timing':
        if cmap is None:
            cmap = mpl.colors.LinearSegmentedColormap.from_list('blend', ['#15b01a', '#ffff14', '#e50000'])
        timings = nx.get_node_attributes(comp_dag, _AN_TIMING)
        # Single pass over the durations. The list may be empty (no node has
        # been timed), in which case min()/max() must not be called.
        durations = [timing.duration for timing in timings.values()
                     if getattr(timing, 'duration', None) is not None]
        min_duration = min(durations) if durations else None
        max_duration = max(durations) if durations else None
    else:
        raise ValueError('{} is not a valid loman colors parameter for visualization'.format(colors))

    viz_dag = nx.DiGraph()
    node_index_map = {}
    node_groups = {}
    for i, (name, data) in enumerate(comp_dag.nodes(data=True)):
        short_name = "n{}".format(i)
        group = data.get(_AN_GROUP)
        attr_dict = {
            'label': name,
            'style': 'filled',
            '_group': group
        }

        if colors == 'state':
            attr_dict['fillcolor'] = cmap[data.get(_AN_STATE, None)]
        elif colors == 'timing':
            # Nodes without timing data, or whose timing has no duration,
            # are drawn white.
            duration = getattr(data.get(_AN_TIMING), 'duration', None)
            if duration is None:
                col = '#FFFFFF'
            else:
                span = max_duration - min_duration
                # All durations equal (e.g. a single timed node): avoid
                # dividing by zero and map to the low end of the colormap.
                norm_duration = (duration - min_duration) / span if span else 0.0
                col = mpl.colors.rgb2hex(cmap(norm_duration))
            attr_dict['fillcolor'] = col

        viz_dag.add_node(short_name, **attr_dict)
        node_index_map[name] = short_name
        # Remember the group for the edge pass instead of looking it up
        # through the version-specific comp_dag.node accessor.
        node_groups[name] = group

    for name1, name2 in comp_dag.edges():
        short_name_1 = node_index_map[name1]
        short_name_2 = node_index_map[name2]

        group1 = node_groups[name1]
        group2 = node_groups[name2]
        group = group1 if group1 == group2 else None

        attr_dict = {'_group': group}

        viz_dag.add_edge(short_name_1, short_name_2, **attr_dict)

    return viz_dag