def draw_grid(ts, edgelabel='control', prop_colors=None, current_node=None):
    """Draw the graph ``ts.g`` with matplotlib through networkx.

    Nodes are placed at their ``'location'`` attribute and labelled with
    their ``'label'`` attribute. Every node is white by default, the current
    node is blue, and a node with a non-empty ``'prop'`` attribute takes the
    colour ``prop_colors[tuple(prop)]`` (this overrides the blue highlight).

    :param ts: object exposing a networkx graph as ``ts.g`` and a mapping
        ``ts.init`` whose keys are nodes.
    :param edgelabel: name of the edge attribute drawn on the edges, or
        ``None`` to draw no edge labels.
    :param prop_colors: mapping from ``tuple(node['prop'])`` to a colour;
        when ``None`` no proposition colouring is applied.
    :param current_node: node to highlight, ``'init'`` to highlight the
        first key of ``ts.init``, or ``None`` for no highlight.
    """
    assert edgelabel is None or nx.is_weighted(ts.g, weight=edgelabel)
    pos = nx.get_node_attributes(ts.g, 'location')
    if current_node == 'init':
        # next(iter(...)) works on Python 2 and 3; dict.iterkeys() is 2-only.
        current_node = next(iter(ts.init))
    colors = {v: 'w' for v in ts.g}
    # Compare against None so that falsy node ids (e.g. 0) can be highlighted.
    if current_node is not None:
        colors[current_node] = 'b'
    if prop_colors is not None:
        # nodes(data=True) exists in every networkx release; nodes_iter()
        # was removed in networkx 2.0.
        for v, d in ts.g.nodes(data=True):
            if d['prop']:
                colors[v] = prop_colors[tuple(d['prop'])]
    # node_color has to be a sequence aligned with the graph's node order;
    # dict.values() gives neither a list (Python 3) nor a guaranteed order.
    node_colors = [colors[v] for v in ts.g]
    labels = nx.get_node_attributes(ts.g, 'label')
    nx.draw(ts.g, pos=pos, node_color=node_colors)
    nx.draw_networkx_labels(ts.g, pos=pos, labels=labels)
    if edgelabel is not None:
        edge_labels = nx.get_edge_attributes(ts.g, edgelabel)
        nx.draw_networkx_edge_labels(ts.g, pos=pos,
                                     edge_labels=edge_labels)
python类get_edge_attributes()的实例源码
tsp_christofides.py 文件源码
项目:Visualization-of-popular-algorithms-in-Python
作者: MUSoC
项目源码
文件源码
阅读 27
收藏 0
点赞 0
评论 0
def DrawGraph(G, color):
    """Draw ``G`` with a spring layout and return the node positions.

    Edges are drawn using ``color`` and annotated with their ``'length'``
    attribute.
    """
    layout = nx.spring_layout(G)
    # with_labels=True shows the node identifiers in the output graph
    nx.draw(G, layout, with_labels=True, edge_color=color)
    # print the 'length' value on every edge that has one
    lengths = nx.get_edge_attributes(G, 'length')
    nx.draw_networkx_edge_labels(G, layout, edge_labels=lengths, font_size=11)
    return layout
#main function
k_centers_problem.py 文件源码
项目:Visualization-of-popular-algorithms-in-Python
作者: MUSoC
项目源码
文件源码
阅读 26
收藏 0
点赞 0
评论 0
def DrawGraph(G, centers):
    """Draw ``G`` with a spring layout, highlighting the center nodes.

    Nodes listed in ``centers`` are drawn red and every other node blue.
    Edges are annotated with their ``'length'`` attribute.

    :param G: networkx graph to draw.
    :param centers: iterable of node identifiers to mark as centers.
    """
    pos = nx.spring_layout(G)
    center_nodes = set(centers)
    # Build the colours by walking the nodes in graph order: node_color is
    # matched positionally to the node list, so indexing a list by node id
    # is only right when the nodes happen to be 0..n-1 in iteration order.
    color_map = ['red' if node in center_nodes else 'blue' for node in G.nodes()]
    # with_labels=True shows the node identifiers in the output graph
    nx.draw(G, pos, node_color=color_map, with_labels=True)
    edge_labels = nx.get_edge_attributes(G, 'length')
    # print the 'length' value on every edge that has one
    nx.draw_networkx_edge_labels(G, pos, edge_labels=edge_labels, font_size=11)
#main function
prims.py 文件源码
项目:Visualization-of-popular-algorithms-in-Python
作者: MUSoC
项目源码
文件源码
阅读 29
收藏 0
点赞 0
评论 0
def DrawGraph(G):
    """Draw ``G`` with a spring layout and return the node positions.

    Each edge is annotated with its ``'length'`` attribute.
    """
    node_positions = nx.spring_layout(G)
    # with_labels=True shows the node identifiers in the output graph
    nx.draw(G, node_positions, with_labels=True)
    # print the 'length' value on every edge that has one
    nx.draw_networkx_edge_labels(
        G,
        node_positions,
        edge_labels=nx.get_edge_attributes(G, 'length'),
        font_size=11,
    )
    return node_positions
#main function
kruskals_quick_union.py 文件源码
项目:Visualization-of-popular-algorithms-in-Python
作者: MUSoC
项目源码
文件源码
阅读 23
收藏 0
点赞 0
评论 0
def DrawGraph(G):
    """Plot ``G`` using a spring layout; return the computed positions.

    Edge labels are taken from the ``'length'`` edge attribute.
    """
    coords = nx.spring_layout(G)
    # with_labels=True shows the node identifiers in the output graph
    nx.draw(G, coords, with_labels=True)
    length_by_edge = nx.get_edge_attributes(G, 'length')
    # print the 'length' value on every edge that has one
    nx.draw_networkx_edge_labels(G, coords, edge_labels=length_by_edge, font_size=11)
    return coords
# main function
def test_that_network_correctly_computes_edges(self):
    """Fill an order book for every side and check the cycle values reported by NetworkManager."""

    # Reference price as a string: 400 for the ask side, 100 otherwise.
    def get_price(side):
        price = 400 if side == OrderSide.ask else 100
        return str(price)

    # Reference price moved by 50 + i: upwards for asks, downwards otherwise.
    def get_worse_price(s, i=0):
        price = float(get_price(s)) + 50 + i if s == OrderSide.ask else float(get_price(s)) - 50 - i
        return str(price)

    # Reference price moved by 50 + i: downwards for asks, upwards otherwise.
    def get_better_price(s, i=0):
        price = float(get_price(s)) - 50 - i if s == OrderSide.ask else float(get_price(s)) + 50 + i
        return str(price)

    product_id = 'BTC-USD'
    product = Product(product_id=product_id, quote_currency=Currency.USD, base_currency=Currency.BTC, quote_increment='0.01', base_min_size='0.01')
    ob = OrderBook(product)
    # Start from an empty backing store so earlier runs cannot affect the asserts.
    ob.redis_server.flushdb()
    nm = NetworkManager()
    for side in OrderSide:
        # test adding base set of orders
        for idx, price in enumerate([get_price(side), get_better_price(side), get_worse_price(side)]):
            order = Order(product_id, 0, side, '1.0', price, order_id=str(idx))
            # presumably `+` adds the order to the book and `-` applies the
            # mutated (match, size 0.5) order against it; verify in OrderBook
            ob + order
            order.historical = True
            order.order_type = OrderType.match
            order.size = '0.5'
            ob - order
        # NOTE(review): assumed to run once per side, after all three orders
        # have been processed; confirm against the original layout
        assert ob.get_price(side, 0)[1] == float(get_better_price(side))
        assert ob.get_median_trade_size(side, OrderType.match, 100) == 1.5
        assert ob.get_average_trade_size(side, OrderType.match, 100) == 1.5
        nm.update_from_order_book(ob, side)
    # print(get_edge_attributes(nm.get_network(NetworkType.price, quote_type=QuoteType.product, edge_type=EdgeType.mean), 'weight'))
    # print(nm.get_next_nodes_and_avail_qties_by_cycle_value(EdgeType.mean, Currency.USD), {
    # 2.3331111259249386: (Currency.BTC, '150.01', '1.5')})
    # Both directions are expected to report the same cycle value.
    assert nm.get_next_nodes_and_avail_qties_by_cycle_value(EdgeType.mean, Currency.USD) == {
        2.3331111259249386: (Currency.BTC, '150.01', '1.5')}
    assert nm.get_next_nodes_and_avail_qties_by_cycle_value(EdgeType.mean, Currency.BTC) == {
        2.3331111259249386: (Currency.USD, '349.99', '1.5')}
def get_cycles_by_value(self, edge_type: EdgeType, quote_type: QuoteType) -> Dict[float, List[str]]:
    """Map the product of the edge weights around each simple cycle to that cycle.

    Every cycle of the price network is rotated so that it starts at the
    currency with the largest ``Currency`` value and is closed by repeating
    that first node at the end. Cycles with an equal product overwrite each
    other; the last one found wins.
    """
    price_graph = self.get_network(NetworkType.price, edge_type, quote_type)
    edge_weights = get_edge_attributes(price_graph, 'weight')
    cycles_by_value = {}
    for nodes in simple_cycles(price_graph):
        # rotate to a canonical starting currency for long term sanity
        lead = max(nodes, key=lambda name: Currency[name].value)
        start = nodes.index(lead)
        closed = nodes[start:] + nodes[:start]
        closed.append(closed[0])
        # multiply the weights of consecutive (source, target) pairs
        factors = [float(edge_weights[(src, dst)]) for src, dst in zip(closed[:-1], closed[1:])]
        cycles_by_value[prod(factors)] = closed
    return cycles_by_value
def get_weights_order(graph, nodes=atlas_rois):
    """Return the edge weights of ``graph`` as an array, in the order given by ``nodes``.

    Node pairs are visited row by row (every ``x`` in ``nodes`` against every
    ``y`` in ``nodes``) and a weight is emitted for each pair that is a key
    of the graph's ``'weight'`` edge attributes.
    """
    # the attribute dict has no guaranteed order, so it is used only to test
    # which pairs carry a weight
    weighted_edges = nx.get_edge_attributes(graph, 'weight')
    # walking the requested node order ensures correspondence across subjects
    ordered = []
    for src in nodes:
        for dst in nodes:
            if (src, dst) in weighted_edges:
                ordered.append(graph[src][dst]['weight'])
    return np.array(ordered)
def get_weights_order(graph, nodes=atlas_rois):
    """Collect ``graph``'s edge weights into an array following the order of ``nodes``.

    Only node pairs that appear as keys of the ``'weight'`` edge attributes
    contribute a value.
    """
    # order of this dict is not guaranteed; it only tells which pairs exist
    has_weight = nx.get_edge_attributes(graph, 'weight')
    # enumerate pairs in the requested order to keep subjects comparable
    pairs = ((first, second) for first in nodes for second in nodes)
    selected = [graph[first][second]['weight'] for first, second in pairs if (first, second) in has_weight]
    return np.array(selected)
def get_weights_order(graph, nodes=atlas_rois):
    """Return weights of ``graph`` edges as a numpy array, ordered by the requested ``nodes``.

    A pair ``(u, v)`` contributes its weight only when it is a key of the
    graph's ``'weight'`` edge attributes.
    """
    # the dict below comes in no guaranteed order
    known_edges = nx.get_edge_attributes(graph, 'weight')
    values = []
    # iterate in the caller's node order so results line up across subjects
    for u in nodes:
        values.extend(graph[u][v]['weight'] for v in nodes if (u, v) in known_edges)
    return np.array(values)
def _compute_links(splice_graph):
    """Yield one tab-separated link (``L``) line per edge of ``splice_graph``.

    Line layout:
        L  start  orientation  end  orientation  overlap

    Edges are emitted in sorted order, both orientations are always ``+``,
    and the ``overlap`` edge attribute is written as ``<n>M`` when it is
    non-negative (an overlap) or ``<n>G`` with the sign dropped when it is
    negative (a gap).
    """
    overlap_by_edge = nx.get_edge_attributes(G=splice_graph, name="overlap")
    link_template = "L\t{node1}\t{orientation1}\t{node2}\t{orientation2}\t{overlap}\n"
    for (start, end) in sorted(splice_graph.edges()):
        size = overlap_by_edge[(start, end)]
        # non-negative means overlap, negative means gap
        size_field = "{}M".format(size) if size >= 0 else "{}G".format(-size)
        yield link_template.format(
            node1=start,
            orientation1="+",
            node2=end,
            orientation2="+",
            overlap=size_field,
        )
def visualize(self, edgelabel='prob', current_node=None,
              draw='pygraphviz'):
    """
    Visualizes a LOMAP system model.

    :param edgelabel: name of the edge attribute shown on the edges, or None.
    :param current_node: node to draw in blue (all others are red), 'init'
        to use the first key of ``self.init``, or None to draw every node red.
    :param draw: backend to use, either 'pygraphviz' or 'matplotlib'.
    :raises ValueError: if ``draw`` names neither backend.
    """
    assert edgelabel is None or nx.is_weighted(self.g, weight=edgelabel)
    if draw == 'pygraphviz':
        nx.view_pygraphviz(self.g, edgelabel)
    elif draw == 'matplotlib':
        pos = nx.get_node_attributes(self.g, 'location')
        # fall back to a computed layout unless every node has a location
        if len(pos) != self.g.number_of_nodes():
            pos = nx.spring_layout(self.g)
        if current_node is None:
            colors = 'r'
        else:
            if current_node == 'init':
                # next(iter(...)) works on Python 2 and 3; iterkeys() is 2-only
                current_node = next(iter(self.init))
            color_of = {v: 'r' for v in self.g}
            color_of[current_node] = 'b'
            # node_color must be a sequence aligned with the node order;
            # dict.values() is a view on Python 3 and carries no such promise
            colors = [color_of[v] for v in self.g]
        nx.draw(self.g, pos=pos, node_color=colors)
        nx.draw_networkx_labels(self.g, pos=pos)
        edge_labels = nx.get_edge_attributes(self.g, edgelabel)
        nx.draw_networkx_edge_labels(self.g, pos=pos,
                                     edge_labels=edge_labels)
    else:
        raise ValueError('Expected parameter draw to be either: '
                         + '"pygraphviz" or "matplotlib"!')
def visualize(self, edgelabel='control', current_node=None,
              draw='pygraphviz'):
    """
    Visualizes a LOMAP system model.

    :param edgelabel: name of the edge attribute shown on the edges, or None.
    :param current_node: node to draw in blue (all others are red), 'init'
        to use the first key of ``self.init``, or None to draw every node red.
    :param draw: backend to use, either 'pygraphviz' or 'matplotlib'.
    :raises ValueError: if ``draw`` names neither backend.
    """
    assert edgelabel is None or nx.is_weighted(self.g, weight=edgelabel)
    if draw == 'pygraphviz':
        nx.view_pygraphviz(self.g, edgelabel)
    elif draw == 'matplotlib':
        pos = nx.get_node_attributes(self.g, 'location')
        # fall back to a computed layout unless every node has a location
        if len(pos) != self.g.number_of_nodes():
            pos = nx.spring_layout(self.g)
        if current_node is None:
            colors = 'r'
        else:
            if current_node == 'init':
                # next(iter(...)) works on Python 2 and 3; iterkeys() is 2-only
                current_node = next(iter(self.init))
            color_of = {v: 'r' for v in self.g}
            color_of[current_node] = 'b'
            # node_color must be a sequence aligned with the node order;
            # dict.values() is a view on Python 3 and carries no such promise
            colors = [color_of[v] for v in self.g]
        nx.draw(self.g, pos=pos, node_color=colors)
        nx.draw_networkx_labels(self.g, pos=pos)
        edge_labels = nx.get_edge_attributes(self.g, edgelabel)
        nx.draw_networkx_edge_labels(self.g, pos=pos,
                                     edge_labels=edge_labels)
    else:
        raise ValueError('Expected parameter draw to be either: '
                         + '"pygraphviz" or "matplotlib"!')
def animate(self, save=False):
    """
    Animates the given algorithm on the given graph.

    Every (matrix, active) pair produced by ``self.fn(self.graph)`` is
    recorded in ``self.frames`` / ``self.active``; the graph is drawn on
    ``self.ax1``, the first frame on ``self.ax2``, and the animation is shown.

    :param save: Boolean indicating whether the output also has to be written into output/
    """
    for frame, active_entry in self.fn(self.graph):
        self.frames.append(frame)
        self.active.append(active_entry)

    # Draw the original graph
    if self.pos is None:
        self.pos = nx.nx_pydot.graphviz_layout(self.graph)
    node_artist = nx.draw_networkx_nodes(self.graph, self.pos, ax=self.ax1, node_color='g',
                                         alpha=0.8, node_size=self.node_size)
    node_artist.set_edgecolor('k')
    nx.draw_networkx_edges(self.graph, self.pos, ax=self.ax1, alpha=0.6)
    if self.weights:
        weight_labels = nx.get_edge_attributes(self.graph, 'weight')
        nx.draw_networkx_edge_labels(self.graph, self.pos, ax=self.ax1,
                                     edge_labels=weight_labels)
    if self.lables:
        nx.draw_networkx_labels(self.graph, self.pos, ax=self.ax1)

    # Draw its adjacency matrix; infinities are masked out of the colour scale
    final_frame = self.frames[-1]
    upper = np.max(np.ma.array(final_frame, mask=np.isinf(final_frame)))
    cmap = plt.get_cmap('jet')
    cmap.set_bad('white', 1.)
    first_frame = self.frames[0]
    first_masked = np.ma.array(first_frame, mask=np.isinf(first_frame))
    self.ax2.imshow(first_masked, interpolation='nearest', vmin=0, vmax=upper, alpha=0.7)
    if self.matrix_labels:
        self.__plot_matrix_labels(self.frames[0], self.ax2)

    # Now start the animation
    anim = animation.FuncAnimation(self.fig, self.__update, interval=1000, blit=False,
                                   repeat=False, init_func=self.__init_animation,
                                   frames=len(self.frames))
    if save:
        import errno
        import os
        out_dir = "output"
        try:
            os.makedirs(out_dir)
        except OSError as exc:
            # an already existing directory is fine, anything else is an error
            if not (exc.errno == errno.EEXIST and os.path.isdir(out_dir)):
                raise
        writer_cls = animation.writers['ffmpeg']
        writer = writer_cls(fps=1, metadata=dict(artist='V'), bitrate=1800)
        from multiprocessing import Process
        target_file = os.path.join('output', '%s.mp4' % self.fn.__name__)
        # the file is written from a separate process
        Process(target=anim.save, args=(target_file,), kwargs={'writer': writer}).start()
    plt.show()
def apply_to_graph(self, show_graph=True):
    """
    Applies the given algorithm to the given graph and displays it.

    The first matrix produced by ``self.fn(self.graph)`` is shown on
    ``self.ax2`` and the last one on ``self.ax3`` next to a colour bar.
    Infinite entries are masked in both images and are excluded from the
    colour scale.

    :param show_graph: Whether to draw the graph itself on ``self.ax1``
    """
    # Draw the original graph
    if show_graph:
        if self.pos is None:
            self.pos = nx.nx_pydot.graphviz_layout(self.graph)
        nx.draw_networkx_nodes(self.graph, self.pos, ax=self.ax1, node_color='g', alpha=0.8,
                               node_size=self.node_size).set_edgecolor('k')
        nx.draw_networkx_edges(self.graph, self.pos, ax=self.ax1, alpha=0.5)
        if self.weights:
            nx.draw_networkx_edge_labels(self.graph, self.pos, ax=self.ax1,
                                         edge_labels=nx.get_edge_attributes(self.graph, 'weight'))
        if self.lables:
            nx.draw_networkx_labels(self.graph, self.pos, ax=self.ax1)
    # Run the algorithm: keep the first matrix (adj) and the last one (result)
    result, adj = None, None
    for i, matrix in enumerate(self.fn(self.graph)):
        if i == 0:
            adj = matrix[0]
        result = matrix[0]
    # print(adj, result)
    cmap = plt.get_cmap('jet')
    cmap.set_bad('white', 1.)
    vmin = 0
    # Mask infinities before taking the maximum; a single inf entry would
    # otherwise make the colour scale infinite.
    masked_result = np.ma.array(result, mask=np.isinf(result))
    vmax = np.max(masked_result)
    from mpl_toolkits.axes_grid1 import make_axes_locatable
    # An invisible axis is appended next to ax2, mirroring the colour bar axis on ax3
    div = make_axes_locatable(self.ax2)
    cax = div.append_axes('right', '5%', '5%')
    cax.axis('off')
    masked_array = np.ma.array(adj, mask=np.isinf(adj))
    self.ax2.imshow(masked_array, interpolation='nearest', cmap=cmap, vmin=vmin, vmax=vmax)
    if self.matrix_labels:
        self.__plot_matrix_labels(adj, self.ax2)
    # Now draw the final matrix
    div = make_axes_locatable(self.ax3)
    cax = div.append_axes('right', '5%', '5%')
    if self.matrix_labels:
        self.__plot_matrix_labels(result, self.ax3)
    self.img = self.ax3.imshow(masked_result, interpolation='nearest', cmap=cmap, vmin=vmin, vmax=vmax)
    self.fig.colorbar(self.img, cax=cax)
    plt.show()
def apply_to_graph(fun, G = None):
    """
    Applies given algorithm to a graph and displays the results side by side

    :param fun: A function which has the signature f(G) and returns iterator of edges of graph G
    :param G: a networkx Graph. If None, a random geometric graph is created,
        its edges are weighted with the Euclidean distance between their end
        points, and node/edge labels are left out of the plots
    :return: None; a plot showing G and fun(G) is displayed
    """
    # G is rebound below when it is None, so remember now whether the caller
    # supplied a graph; testing `G is not None` later would always be true.
    user_graph = G is not None
    if user_graph:
        pos = graphviz_layout(G)
        nodesize = 200
    else:
        G = nx.random_geometric_graph(100, .125)
        # position is stored as node attribute data for random_geometric_graph
        pos = nx.get_node_attributes(G, 'pos')
        nodesize = 80
        for u, v in G.edges():
            # G[u][v] and pos work across networkx releases; the G.edge and
            # G.node attributes used previously were removed in networkx 2.x
            G[u][v]['weight'] = ((pos[v][0] - pos[u][0]) ** 2 +
                                 (pos[v][1] - pos[u][1]) ** 2) ** .5
    # find node near center (0.5,0.5); color holds each node's squared distance
    color = {}
    dmin = 1
    ncenter = 0
    for n, (x, y) in pos.items():
        d = (x - 0.5) ** 2 + (y - 0.5) ** 2
        color[n] = d
        if d < dmin:
            ncenter = n
            dmin = d
    res = nx.Graph(list(fun(G)))
    plt.figure(figsize=(10, 8))
    plt.suptitle(fun.__name__ + " algorithm application")
    plt.subplot(1, 2, 1)
    plt.title("Original Graph G")
    nx.draw_networkx_edges(G, pos, nodelist=[ncenter], alpha=0.4)
    nx.draw_networkx_nodes(G, pos,
                           nodelist=list(color),
                           node_size=nodesize,
                           node_color=list(color.values()),
                           cmap=plt.get_cmap("Reds_r")
                           ).set_edgecolor('k')
    if user_graph:
        nx.draw_networkx_labels(G, pos)
        nx.draw_networkx_edge_labels(G, pos,
                                     edge_labels=nx.get_edge_attributes(G, 'weight'))
    plt.axis('off')
    plt.subplot(1, 2, 2)
    plt.title("Resultant Graph, R = {0}(G)".format(fun.__name__))
    nx.draw_networkx_edges(res, pos, nodelist=[ncenter], alpha=0.4)
    nx.draw_networkx_nodes(res, pos,
                           node_color=[color[n] for n in res.nodes()],
                           node_size=nodesize,
                           cmap=plt.get_cmap("Greens_r")).set_edgecolor('k')
    if user_graph:
        nx.draw_networkx_labels(res, pos)
        nx.draw_networkx_edge_labels(res, pos,
                                     edge_labels=nx.get_edge_attributes(res, 'weight'))
    plt.axis('off')
    plt.show()