def test_flatten_edges(self):
g = nx.MultiDiGraph()
g.add_edge(1, 2, key=5, attr_dict={'A': 'a', 'B': {'C': 'c', 'D': 'd'}})
result = pybel.utils.flatten_graph_data(g)
expected = nx.MultiDiGraph()
expected.add_edge(1, 2, key=5, attr_dict={'A': 'a', 'B_C': 'c', 'B_D': 'd'})
self.assertEqual(set(result.nodes()), set(expected.nodes()))
res_edges = result.edges(keys=True)
exp_edges = expected.edges(keys=True)
self.assertEqual(set(res_edges), set(exp_edges))
for u, v, k in expected.edges(keys=True):
self.assertEqual(expected[u][v][k], result[u][v][k])
python类MultiDiGraph()的实例源码
def to_graphml(graph, file):
"""Writes this graph to GraphML XML file using :func:`networkx.write_graphml`. The .graphml file extension is
suggested so Cytoscape can recognize it.
:param BELGraph graph: A BEL graph
:param file file: A file or file-like object
"""
g = nx.MultiDiGraph()
for node, data in graph.nodes(data=True):
g.add_node(node, json=json.dumps(data))
for u, v, key, data in graph.edges(data=True, keys=True):
g.add_edge(u, v, key=key, attr_dict=flatten_dict(data))
nx.write_graphml(g, file)
def flatten_graph_data(graph):
"""Returns a new graph with flattened edge data dictionaries.
:param nx.MultiDiGraph graph: A graph with nested edge data dictionaries
:return: A graph with flattened edge data dictionaries
:rtype: nx.MultiDiGraph
"""
g = nx.MultiDiGraph(**graph.graph)
for node, data in graph.nodes(data=True):
g.add_node(node, data)
for u, v, key, data in graph.edges(data=True, keys=True):
g.add_edge(u, v, key=key, attr_dict=flatten_dict(data))
return g
def nx_graph(self):
"""Convert the data in a ``nodelist`` into a networkx labeled directed graph."""
import networkx
nx_nodelist = list(range(1, len(self.nodes)))
nx_edgelist = [
(n, self._hd(n), self._rel(n))
for n in nx_nodelist if self._hd(n)
]
self.nx_labels = {}
for n in nx_nodelist:
self.nx_labels[n] = self.nodes[n]['word']
g = networkx.MultiDiGraph()
g.add_nodes_from(nx_nodelist)
g.add_edges_from(nx_edgelist)
return g
def _acceptable_subgraph(self): # type: () -> nx.MultiDiGraph
graph = self.as_multidigraph
reachable_states = nx.descendants(graph, self.start) | {self.start}
graph = graph.subgraph(reachable_states)
reachable_accepting_states = reachable_states & {
node for node in graph.node if graph.node[node]['accepting']
}
# Add a "sink" node with an in-edge from every accepting state. This is
# is solely done because the networkx API makes it easier to find the
# ancestor of a node than a set of nodes.
sink = object()
graph.add_node(sink)
for state in reachable_accepting_states:
graph.add_edge(state, sink)
acceptable_sates = nx.ancestors(graph, sink)
return graph.subgraph(acceptable_sates)
def live_subgraph(self): # type: () -> nx.MultiDiGraph
"""
Returns the graph of "live" states for this graph, i.e. the start state
together with states that may be involved in positively matching a string
(reachable from the start node and an ancestor of an accepting node).
This is intended for display purposes, only showing the paths which
might lead to an accepting state, or just the start state if no such
paths exist.
"""
graph = self.as_multidigraph
accepting_states = {
node for node in graph.node if graph.node[node]['accepting']
}
# Add a "sink" node with an in-edge from every accepting state. This is
# is solely done because the networkx API makes it easier to find the
# ancestor of a node than a set of nodes.
sink = object()
graph.add_node(sink)
for state in accepting_states:
graph.add_edge(state, sink)
live_states = {self.start} | (nx.ancestors(graph, sink) & nx.descendants(graph, self.start))
return graph.subgraph(live_states)
dependencygraph.py 文件源码
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
作者: SignalMedia
项目源码
文件源码
阅读 26
收藏 0
点赞 0
评论 0
def nx_graph(self):
"""Convert the data in a ``nodelist`` into a networkx labeled directed graph."""
import networkx
nx_nodelist = list(range(1, len(self.nodes)))
nx_edgelist = [
(n, self._hd(n), self._rel(n))
for n in nx_nodelist if self._hd(n)
]
self.nx_labels = {}
for n in nx_nodelist:
self.nx_labels[n] = self.nodes[n]['word']
g = networkx.MultiDiGraph()
g.add_nodes_from(nx_nodelist)
g.add_edges_from(nx_edgelist)
return g
def test_multidigraph_is_subgraph_without_labels(self):
G = nx.MultiDiGraph()
G.add_edges_from(
[
(1, 2),
(2, 3),
(3, 4),
(2, 4)
]
)
valid_subgraph_a = nx.MultiDiGraph()
valid_subgraph_a.add_edge(1, 2)
valid_subgraph_b = nx.MultiDiGraph()
valid_subgraph_b.add_edge(3, 4)
invalid_subgraph_a = nx.MultiDiGraph()
invalid_subgraph_a.add_edge(1, 4)
self.assertTrue(ParsemisMiner.is_subgraph(G, valid_subgraph_a))
self.assertTrue(ParsemisMiner.is_subgraph(G, valid_subgraph_b))
self.assertFalse(ParsemisMiner.is_subgraph(G, invalid_subgraph_a))
def nx_graph(self):
"""Convert the data in a ``nodelist`` into a networkx labeled directed graph."""
import networkx
nx_nodelist = list(range(1, len(self.nodes)))
nx_edgelist = [
(n, self._hd(n), self._rel(n))
for n in nx_nodelist if self._hd(n)
]
self.nx_labels = {}
for n in nx_nodelist:
self.nx_labels[n] = self.nodes[n]['word']
g = networkx.MultiDiGraph()
g.add_nodes_from(nx_nodelist)
g.add_edges_from(nx_edgelist)
return g
def combined_stop_to_stop_transit_network(gtfs, start_time_ut=None, end_time_ut=None):
"""
Compute stop-to-stop networks for all travel modes and combine them into a single network.
The modes of transport are encoded to a single network.
The network consists of multiple links corresponding to each travel mode.
Walk mode is not included.
Parameters
----------
gtfs: gtfspy.GTFS
Returns
-------
net: networkx.MultiDiGraph
keys should be one of route_types.TRANSIT_ROUTE_TYPES (i.e. GTFS route_types)
"""
multi_di_graph = networkx.MultiDiGraph()
for route_type in route_types.TRANSIT_ROUTE_TYPES:
graph = stop_to_stop_network_for_route_type(gtfs, route_type,
start_time_ut=start_time_ut, end_time_ut=end_time_ut)
for from_node, to_node, data in graph.edges(data=True):
data['route_type'] = route_type
multi_di_graph.add_edges_from(graph.edges(data=True))
multi_di_graph.add_nodes_from(graph.nodes(data=True))
return multi_di_graph
def main():
# sample = ','.join(['MATCH (x:SOMECLASS {bar : "baz"',
# 'foo:"goo"})<-[:WHATEVER]-(:ANOTHERCLASS)',
# '(y:LASTCLASS) RETURN x.foo, y'])
# create = ('CREATE (n:SOMECLASS {foo: "bar", bar: {qux: "baz"}})'
# '-[e:EDGECLASS]->(m:ANOTHERCLASS) RETURN n')
# create = 'CREATE (n:SOMECLASS {foo: "bar", qux: "baz"}) RETURN n'
create_query = ('CREATE (n:SOMECLASS {foo: {goo: "bar"}})'
'-[e:EDGECLASS]->(m:ANOTHERCLASS {qux: "foobar", bar: 10}) '
'RETURN n')
test_query = ('MATCH (n:SOMECLASS {foo: {goo: "bar"}})-[e:EDGECLASS]->'
'(m:ANOTHERCLASS) WHERE '
'm.bar = 10 '
'RETURN n.foo.goo, m.qux, e')
# atomic_facts = extract_atomic_facts(test_query)
graph_object = nx.MultiDiGraph()
my_parser = CypherToNetworkx()
for i in my_parser.query(graph_object, create_query):
pass # a generator, we need to loop over results to run.
for i in my_parser.query(graph_object, test_query):
print i
def nx_graph(self):
"""Convert the data in a ``nodelist`` into a networkx labeled directed graph."""
import networkx
nx_nodelist = list(range(1, len(self.nodes)))
nx_edgelist = [
(n, self._hd(n), self._rel(n))
for n in nx_nodelist if self._hd(n)
]
self.nx_labels = {}
for n in nx_nodelist:
self.nx_labels[n] = self.nodes[n]['word']
g = networkx.MultiDiGraph()
g.add_nodes_from(nx_nodelist)
g.add_edges_from(nx_edgelist)
return g
def build_layer2_graph(related_extra=None):
"""Builds a graph representation of the layer 2 topology stored in the NAV
database.
:param related_extra Additional selection_related fields
:returns: A MultiDiGraph of Netbox nodes, edges annotated with Interface
model objects.
"""
graph = nx.MultiDiGraph(name="Layer 2 topology")
select_related = ('netbox', 'to_netbox', 'to_interface')
if related_extra:
select_related = select_related + related_extra
links = Interface.objects.filter(
to_netbox__isnull=False).select_related(*select_related)
for link in links:
dest = link.to_interface.netbox if link.to_interface else link.to_netbox
graph.add_edge(link.netbox, dest, key=link)
return graph
def test_reduce_simple_case_cam(self):
graph = nx.MultiDiGraph(name="simple case cam")
graph.add_edge(self.switch_a, self.switch_port_a)
graph.add_edge(self.switch_b, self.switch_port_b)
graph.add_edge(self.switch_port_a, self.switch_b, "cam")
graph.add_edge(self.switch_port_b, self.switch_a, "cam")
reducer = AdjacencyReducer(graph)
print("input:")
print(reducer.format_connections())
reducer.reduce()
print("result:")
print(reducer.format_connections())
result = reducer.graph
assert result.has_edge(self.switch_port_a, self.switch_port_b)
assert result.has_edge(self.switch_port_b, self.switch_port_a)
assert result.out_degree(self.switch_port_a) == 1
assert result.out_degree(self.switch_port_b) == 1
def test_reduce_simple_case_lldp(self):
graph = nx.MultiDiGraph(name="simple case lldp")
graph.add_edge(self.switch_a, self.switch_port_a)
graph.add_edge(self.switch_b, self.switch_port_b)
graph.add_edge(self.switch_port_a, self.switch_port_b, "lldp")
graph.add_edge(self.switch_port_b, self.switch_port_a, "lldp")
reducer = AdjacencyReducer(graph)
print("input:")
print(reducer.format_connections())
reducer.reduce()
print("result:")
print(reducer.format_connections())
result = reducer.graph
assert result.has_edge(self.switch_port_a, self.switch_port_b)
assert result.has_edge(self.switch_port_b, self.switch_port_a)
assert result.out_degree(self.switch_port_a) == 1
assert result.out_degree(self.switch_port_b) == 1
def test_reduce_simple_tree_lldp(self):
graph = nx.MultiDiGraph(name="simple tree lldp")
graph.add_edge(self.router, self.router_port_a)
graph.add_edge(self.router, self.router_port_b)
graph.add_edge(self.switch_a, self.switch_port_a)
graph.add_edge(self.switch_b, self.switch_port_b)
graph.add_edge(self.switch_port_a, self.router_port_a, "lldp")
graph.add_edge(self.switch_port_b, self.router_port_b, "lldp")
graph.add_edge(self.router_port_a, self.switch_port_a, "lldp")
graph.add_edge(self.router_port_b, self.switch_port_b, "lldp")
reducer = AdjacencyReducer(graph)
print("input:")
print(reducer.format_connections())
reducer.reduce()
print("result:")
print(reducer.format_connections())
result = reducer.graph
assert result.has_edge(self.switch_port_a, self.router_port_a)
assert result.has_edge(self.switch_port_b, self.router_port_b)
assert result.has_edge(self.router_port_a, self.switch_port_a)
assert result.has_edge(self.router_port_b, self.switch_port_b)
assert result.out_degree(self.switch_port_a) == 1
assert result.out_degree(self.switch_port_b) == 1
assert result.out_degree(self.router_port_a) == 1
assert result.out_degree(self.router_port_b) == 1
def test_no_return_path(self):
graph = nx.MultiDiGraph()
graph.add_edge(self.switch_a, self.switch_port_a)
graph.add_edge(self.switch_b, self.switch_port_b)
graph.add_edge(self.switch_port_a, self.switch_b, "cam")
reducer = AdjacencyReducer(graph)
print("input:")
print(reducer.format_connections())
reducer.reduce()
print("result:")
print(reducer.format_connections())
result = reducer.graph
assert result.has_edge(self.switch_port_a, self.switch_b)
assert not result.has_edge(self.switch_port_b, self.switch_port_a)
assert result.out_degree(self.switch_port_a) == 1
assert self.switch_port_b not in result
def nx_graph(self):
"""Convert the data in a ``nodelist`` into a networkx labeled directed graph."""
import networkx
nx_nodelist = list(range(1, len(self.nodes)))
nx_edgelist = [
(n, self._hd(n), self._rel(n))
for n in nx_nodelist if self._hd(n)
]
self.nx_labels = {}
for n in nx_nodelist:
self.nx_labels[n] = self.nodes[n]['word']
g = networkx.MultiDiGraph()
g.add_nodes_from(nx_nodelist)
g.add_edges_from(nx_edgelist)
return g
def nx_graph(self):
"""Convert the data in a ``nodelist`` into a networkx labeled directed graph."""
import networkx
nx_nodelist = list(range(1, len(self.nodes)))
nx_edgelist = [
(n, self._hd(n), self._rel(n))
for n in nx_nodelist if self._hd(n)
]
self.nx_labels = {}
for n in nx_nodelist:
self.nx_labels[n] = self.nodes[n]['word']
g = networkx.MultiDiGraph()
g.add_nodes_from(nx_nodelist)
g.add_edges_from(nx_edgelist)
return g
def nx_graph(self):
"""Convert the data in a ``nodelist`` into a networkx labeled directed graph."""
import networkx
nx_nodelist = list(range(1, len(self.nodes)))
nx_edgelist = [
(n, self._hd(n), self._rel(n))
for n in nx_nodelist if self._hd(n)
]
self.nx_labels = {}
for n in nx_nodelist:
self.nx_labels[n] = self.nodes[n]['word']
g = networkx.MultiDiGraph()
g.add_nodes_from(nx_nodelist)
g.add_edges_from(nx_edgelist)
return g
def nx_graph(self):
"""Convert the data in a ``nodelist`` into a networkx labeled directed graph."""
import networkx
nx_nodelist = list(range(1, len(self.nodes)))
nx_edgelist = [
(n, self._hd(n), self._rel(n))
for n in nx_nodelist if self._hd(n)
]
self.nx_labels = {}
for n in nx_nodelist:
self.nx_labels[n] = self.nodes[n]['word']
g = networkx.MultiDiGraph()
g.add_nodes_from(nx_nodelist)
g.add_edges_from(nx_edgelist)
return g
def filter_graph(g, cutoff=7.0, min_kihs=2):
""" Get subgraph formed from edges that have max_kh_distance < cutoff.
Parameters
----------
g : MultiDiGraph representing KIHs
g is the output from graph_from_protein
cutoff : float
Socket cutoff in Angstroms.
Default is 7.0.
min_kihs : int
Minimum number of KIHs shared between all pairs of connected nodes in the graph.
Returns
-------
networkx.MultiDigraph
subgraph formed from edges that have max_kh_distance < cutoff.
"""
edge_list = [e for e in g.edges(keys=True, data=True) if e[3]['kih'].max_kh_distance <= cutoff]
if min_kihs > 0:
c = Counter([(e[0], e[1]) for e in edge_list])
# list of nodes that share > min_kihs edges with at least one other node.
node_list = set(list(itertools.chain.from_iterable([k for k, v in c.items() if v > min_kihs])))
edge_list = [e for e in edge_list if (e[0] in node_list) and (e[1] in node_list)]
return networkx.MultiDiGraph(edge_list)
def __init__(self, root_path):
self.code_graph = nx.MultiDiGraph()
self.metrics = {}
self.root_path = Path(root_path)
self.root_arch_ids = []
self.entity_kinds = SortedSet()
self.ref_kinds = SortedSet()
def projected_graph(B, nodes, multigraph=False):
if B.is_multigraph():
raise nx.NetworkXError("not defined for multigraphs")
if B.is_directed():
directed=True
if multigraph:
G=nx.MultiDiGraph()
else:
G=nx.DiGraph()
else:
directed=False
if multigraph:
G=nx.MultiGraph()
else:
G=nx.Graph()
G.graph.update(B.graph)
G.add_nodes_from((n,B.node[n]) for n in nodes)
i = 0
nodes = set(nodes)
tenpercent = len(nodes) / 10
for u in nodes:
if i % tenpercent == 0:
logging.info(str(10 * i / tenpercent) + "%")
i += 1
nbrs2=set((v for nbr in B[u] for v in B[nbr])) & nodes - set([u])
if multigraph:
for n in nbrs2:
if directed:
links=set(B[u]) & set(B.pred[n])
else:
links=set(B[u]) & set(B[n])
for l in links:
if not G.has_edge(u,n,l):
G.add_edge(u,n,key=l)
else:
G.add_edges_from((u,n) for n in nbrs2)
return G
def test_dict_matches_graph(self):
g = nx.MultiDiGraph()
g.add_node(1)
g.add_node(2)
g.add_edge(1, 2, relation='yup')
g.add_edge(1, 2, relation='nope')
d = {'relation': 'yup'}
self.assertTrue(any_subdict_matches(g.edge[1][2], d))
def create_project_graph(project):
"""
:param Project project:
:rtype: nx.MultiDiGraph
"""
g = nx.MultiDiGraph()
assets = list(project.iterate_assets())
add_assets_to_graph(g, assets)
return g
def new_graph():
"""Get a clean graph."""
graph = networkx.MultiDiGraph()
graph.graph = ATTR_GRAPH
return graph
def __init__(self):
"""Create an empty circuit."""
# Map from a wire's name (reg,idx) to a Bool that is True if the
# wire is a classical bit and False if the wire is a qubit.
self.wire_type = {}
# Map from wire names (reg,idx) to input nodes of the graph
self.input_map = {}
# Map from wire names (reg,idx) to output nodes of the graph
self.output_map = {}
# Running count of the total number of nodes
self.node_counter = 0
# Map of named operations in this circuit and their signatures.
# The signature is an integer tuple (nq,nc,np) specifying the
# number of input qubits, input bits, and real parameters.
# The definition is external to the circuit object.
self.basis = {}
# Directed multigraph whose nodes are inputs, outputs, or operations.
# Operation nodes have equal in- and out-degrees and carry
# additional data about the operation, including the argument order
# and parameter values.
# Input nodes have out-degree 1 and output nodes have in-degree 1.
# Edges carry wire labels (reg,idx) and each operation has
# corresponding in- and out-edges with the same wire labels.
self.multi_graph = nx.MultiDiGraph()
# Map of qregs to sizes
self.qregs = {}
# Map of cregs to sizes
self.cregs = {}
# Map of user defined gates to ast nodes defining them
self.gates = {}
# Output precision for printing floats
self.prec = 10
def compute_centrality(graph: nx.MultiDiGraph, data: geojson.feature.FeatureCollection, edge_map: Dict):
for item in data['features']:
edge = edge_map[item['properties']['id']]
from_degree = graph.degree(edge[0])
to_degree = graph.degree(edge[1])
item['properties']["from_degree"] = from_degree
item['properties']["to_degree"] = to_degree
def _create_edge_map(graph: nx.MultiDiGraph) -> Dict:
edge_map = {}
for edge in graph.edges():
edge_map[graph[edge[0]][edge[1]][0]["id"]] = edge
return edge_map