def test_graph_to_dag(test_graph):
    """Check that each cyclic partition of the fixture graph becomes a DAG.

    Partitions that ``partition_graph`` already reports as acyclic are
    skipped. For every other partition the graph returned by
    ``graph_to_dag`` must satisfy ``networkx.is_directed_acyclic_graph``.
    """
    log = logging.getLogger('graph_to_dag')
    for subgraph, is_acyclic in partition_graph(test_graph):
        if not is_acyclic:
            dag, dfs_tree = graph_to_dag(subgraph)
            # Dump both results so a failing run can be inspected.
            log.debug("%s", dag.nodes())
            log.debug("%s", dag.edges())
            log.debug("DFS nodes: %s", dfs_tree.nodes())
            log.debug("DFS edges: %s", dfs_tree.edges())
            assert networkx.is_directed_acyclic_graph(dag)
python类is_directed_acyclic_graph()的实例源码
remove_cycle_edges_by_minimum_feedback_arc_set_greedy.py 文件源码
项目:breaking_cycles_in_noisy_hierarchies
作者: zhenv5
项目源码
文件源码
阅读 26
收藏 0
点赞 0
评论 0
def remove_cycle_edges_by_mfas(graph_file):
    """Remove cycle edges from an edge-list graph with a greedy heuristic.

    The graph is read from ``graph_file`` as a directed graph with integer
    node labels. Self loops are stripped first. If ``get_big_sccs`` then
    reports no components, only the self loops are returned and no file is
    written. Otherwise ``greedy_local_heuristic`` fills the list of edges
    to remove, those edges are deleted from the graph, and the full list
    (heuristic edges followed by self loops) is written next to the input
    file and returned.

    The output name is ``graph_file`` minus its last six characters
    (presumably an ``.edges`` suffix -- verify against callers) plus
    ``_removed_by_mfas.edges``.
    """
    import timeit

    graph = nx.read_edgelist(graph_file, create_using=nx.DiGraph(), nodetype=int)

    from remove_self_loops import remove_self_loops_from_graph
    self_loops = remove_self_loops_from_graph(graph)

    scc_nodes, _, _, _ = scc_nodes_edges(graph)
    degree_dict = get_nodes_degree_dict(graph, scc_nodes)
    sccs = get_big_sccs(graph)

    if len(sccs) == 0:
        print("After removal of self loop edgs: %s" % nx.is_directed_acyclic_graph(graph))
        return self_loops

    removed = []
    started = timeit.default_timer()
    greedy_local_heuristic(sccs, degree_dict, removed)
    finished = timeit.default_timer()
    print("mfas time usage: %0.4f s" % (finished - started))

    # De-duplicate before touching the graph; self loops are appended after.
    removed = list(set(removed))
    graph.remove_edges_from(removed)
    removed.extend(self_loops)

    stem = graph_file[:len(graph_file) - 6]
    write_pairs_to_file(removed, stem + "_removed_by_mfas.edges")
    return removed
def networkx(self):
    """The network converted to `networkx` format.

    Seems like it loses directionality.
    * https://networkx.readthedocs.org/en/stable/
    To install: $ pip install networkx

    The converted graph is asserted to be a directed acyclic graph
    before it is returned.
    """
    converted = self.orange_obo.to_networkx()
    assert networkx.is_directed_acyclic_graph(converted)
    return converted
# --------------------------- In this section --------------------------- #
# test
# get_subgraph
# add_weights
# draw_to_pdf
# write_to_dot
def draw_dag(cls, path, data_keys):
    """Draw the class-level DAG to ``path`` and warn if it is not acyclic.

    The drawing is delegated to ``cls._dag.draw`` with ``'generate'`` as
    the root node key and ``reverse=True``. Its return value is checked
    with ``nx.is_directed_acyclic_graph`` and a warning is printed when
    the check fails.
    """
    # pylint: disable=protected-access
    drawn = cls._dag.draw(
        path,
        data_keys,
        root_node_key='generate',
        reverse=True,
    )
    acyclic = nx.is_directed_acyclic_graph(drawn)
    if not acyclic:
        print("Warning! The graph is not acyclic!")
def graph_is_valid(self):
    """Return whether the graph is valid.

    Valid means: the graph is a DAG, every node named in
    ``self.output_nodes`` is a sink, and every node named in
    ``self.input_nodes`` is a source. All three checks are always
    evaluated.
    """
    # The graph must be a DAG.
    acyclic = is_directed_acyclic_graph(self.graph)
    # Every output node must be a sink.
    sink_flags = [self.is_sink_node(name) for name in self.output_nodes]
    outputs_ok = all(sink_flags)
    # Every input node must be a source.
    source_flags = [self.is_source_node(name) for name in self.input_nodes]
    inputs_ok = all(source_flags)
    # TODO Check whether only input nodes are sources and only output nodes are sinks
    return acyclic and outputs_ok and inputs_ok
def assert_graph_is_valid(self):
    """Assert that the graph is valid, with a message per failed condition.

    Checks, in order: the graph is a DAG; each output node is a sink and
    not also a source; each input node is a source and not also a sink.
    """
    assert is_directed_acyclic_graph(self.graph), "Graph is not a DAG."

    for out_name in self.output_nodes:
        assert self.is_sink_node(out_name), \
            "Output node {} is not a sink.".format(out_name)
        # A node that is both sink and source has no connections at all.
        assert not self.is_source_node(out_name), (
            "Output node {} is a source node. "
            "Make sure it's connected.".format(out_name)
        )

    for in_name in self.input_nodes:
        assert self.is_source_node(in_name), \
            "Input node {} is not a source.".format(in_name)
        assert not self.is_sink_node(in_name), (
            "Input node {} is a sink node. "
            "Make sure it's connected.".format(in_name)
        )
def depth(self):
    """Return the circuit depth.

    Computed as the longest path length of ``self.multi_graph`` minus one;
    the graph is asserted to be a DAG first.
    """
    circuit_graph = self.multi_graph
    assert nx.is_directed_acyclic_graph(circuit_graph), "not a DAG"
    longest = nx.dag_longest_path_length(circuit_graph)
    return longest - 1
def add_skip(self, model, config):
    """Try to add a skip connection between two adjacent candidate layers.

    Candidate layers are the nodes of ``model.graph`` (in topological
    order) whose type is ``Conv2D``, ``Group`` or ``Conv2D_Pooling``.
    A random adjacent pair is drawn up to 100 times; a pair is rejected
    when a node following the target already has type ``Add``.

    Returns:
        ``(model, False)`` when there are at most two candidates or no
        pair was accepted; otherwise
        ``(self.skip(model, from_name, to_name, config), True)``.
    """
    assert nx.is_directed_acyclic_graph(model.graph)

    wanted_types = ('Conv2D', 'Group', 'Conv2D_Pooling')
    give_up_msg = "can't find a suitable layer to apply add_skip operation,return origin model"

    names = []
    for node in nx.topological_sort(model.graph):
        if node.type in wanted_types:
            names.append(node.name)

    if len(names) <= 2:
        logger.info(give_up_msg)
        return model, False

    max_iter = 100
    for _ in range(max_iter):
        from_idx = np.random.randint(0, len(names) - 2)
        to_idx = from_idx + 1
        followers = model.graph.get_nodes(names[to_idx], next_layer=True, last_layer=False)
        follower_types = [node.type for node in followers]
        if 'Add' not in follower_types:
            break
    else:
        # Every attempt hit a target that already feeds an Add node.
        logger.info(give_up_msg)
        return model, False

    from_name, to_name = names[from_idx], names[to_idx]
    logger.info('choose {} and {} to add_skip'.format(from_name, to_name))
    return self.skip(model, from_name, to_name, config), True
# add group operation
def __transitive_reduction(self):
    """Transitive reduction for acyclic graphs.

    For each node ``u``, every node reached by a DFS edge starting from
    one of ``u``'s successors is collected, and the direct edges from
    ``u`` to those nodes are removed from ``self.digraph`` in place.
    """
    graph = self.digraph
    assert nx.is_directed_acyclic_graph(graph)
    for u in graph:
        # Collect first, remove afterwards: the adjacency of ``u`` must
        # not change while it is being iterated.
        indirect_targets = [
            reached
            for successor in graph[u]
            for _, reached in nx.dfs_edges(graph, successor)
        ]
        graph.remove_edges_from((u, reached) for reached in indirect_targets)
def has_finite_language(self):  # type: () -> bool
    """
    Returns True iff this DFA recognizes a finite (possibly empty) language.

    Based on decision procedure described here:
    http://math.uaa.alaska.edu/~afkjm/cs351/handouts/non-regular.pdf
    - Remove nodes which cannot reach an accepting state (see
      `_acceptable_subgraph`).
    - Language is finite iff the remaining graph is acyclic.
    """
    pruned = self._acceptable_subgraph
    return nx.is_directed_acyclic_graph(pruned)
def validate(self, graph):
    """Validate the graph by checking whether it is a directed acyclic graph.

    Args:
        graph (DiGraph): Reference to a DiGraph object from NetworkX.

    Raises:
        DirectedAcyclicGraphInvalid: If the graph is not a valid dag.
    """
    if nx.is_directed_acyclic_graph(graph):
        return
    raise DirectedAcyclicGraphInvalid(graph_name=self._name)
remove_cycle_edges_by_hierarchy_greedy.py 文件源码
项目:breaking_cycles_in_noisy_hierarchies
作者: zhenv5
项目源码
文件源码
阅读 25
收藏 0
点赞 0
评论 0
def scc_based_to_remove_cycle_edges_iterately(g,nodes_score):
    """Collect the edges to remove from ``g``, self loops included.

    Self loops are removed from ``g`` first. If ``get_big_sccs`` reports
    no components, only the self loops are returned. Otherwise
    ``remove_cycle_edges_by_agony_iterately`` fills the removal list from
    the per-component node scores, and the result is that list followed
    by the self loops.
    """
    from remove_self_loops import remove_self_loops_from_graph

    loops = remove_self_loops_from_graph(g)
    components = get_big_sccs(g)
    component_scores = scores_of_nodes_in_scc(components, nodes_score)
    removed = []

    if len(components) == 0:
        print("After removal of self loop edgs: %s" % nx.is_directed_acyclic_graph(g))
        return loops

    remove_cycle_edges_by_agony_iterately(components, component_scores, removed)
    #print(" # edges to be removed: %d" % len(removed))
    return removed + loops
remove_cycle_edges_by_hierarchy_BF.py 文件源码
项目:breaking_cycles_in_noisy_hierarchies
作者: zhenv5
项目源码
文件源码
阅读 29
收藏 0
点赞 0
评论 0
def scc_based_to_remove_cycle_edges_iterately(g,nodes_score,is_Forward):
    """Collect the cycle edges to remove from ``g`` using ranking scores.

    Returns an empty list when ``get_big_sccs`` reports no components.
    Otherwise the list is filled by
    ``remove_cycle_edges_by_ranking_score_iterately``, which receives the
    per-component node scores and the ``is_Forward`` flag.
    """
    components = get_big_sccs(g)
    if len(components) == 0:
        print("After removal of self loop edgs: %s" % nx.is_directed_acyclic_graph(g))
        return []

    component_scores = scores_of_nodes_in_scc(components, nodes_score)
    removed = []
    remove_cycle_edges_by_ranking_score_iterately(
        components, component_scores, removed, is_Forward)
    #print(" # edges to be removed: %d" % len(removed))
    return removed
remove_cycle_edges_by_hierarchy_voting.py 文件源码
项目:breaking_cycles_in_noisy_hierarchies
作者: zhenv5
项目源码
文件源码
阅读 26
收藏 0
点赞 0
评论 0
def scc_based_to_remove_cycle_edges_iterately(g,edges_score):
    """Collect the cycle edges to remove from ``g`` using edge scores.

    Returns an empty list when ``get_big_sccs`` reports no components.
    Otherwise the list is filled by
    ``remove_cycle_edges_by_agony_iterately`` from ``edges_score``.
    """
    components = get_big_sccs(g)
    if len(components) == 0:
        print("After removal of self loop edgs: %s" % nx.is_directed_acyclic_graph(g))
        return []

    removed = []
    remove_cycle_edges_by_agony_iterately(components, edges_score, removed)
    #print(" # edges to be removed: %d" % len(removed))
    return removed
def __validate_list(self):
    """Assert the structural invariants of the skeleton list.

    Checks that every inner (non-source) set's anchor has an external
    predecessor both before and after it in the list, that inner sets are
    pairwise node-disjoint, that every inner set is a DAG, and that the
    first and last entries each hold exactly one node and contain the
    root. Returns True when all assertions pass.
    """
    inner_sets = self._list[1:-1]

    # every non-source anchor has an incoming neighbor
    # both before and after it in the skeleton list.
    for member in inner_sets:
        anchor = self._get_anchor(member)
        seen_after = False
        seen_before = False
        anchor_pos = self._get_index(anchor)
        for pred in self._get_external_predecessors(anchor):
            pos = self._get_index(pred)
            seen_before |= pos < anchor_pos
            # HACK: root is in 2 places!
            if pred == self.root:
                pos = len(self._list) - 1
            seen_after |= pos > anchor_pos
        assert seen_after and seen_before

    # every non-source set is pair-wise disjoint with every other set
    for left, right in itertools.combinations(inner_sets, 2):
        assert set(left.nodes()).isdisjoint(right.nodes())

    # directed path from anchor to every other node in its set
    assert all(nx.is_directed_acyclic_graph(member) for member in inner_sets)

    # first and last sets are always just the root
    assert self.root in self._list[0]
    assert self.root in self._list[-1]
    assert self._list[0].number_of_nodes() == 1
    assert self._list[-1].number_of_nodes() == 1
    return True
def calculate_account_data(prov_document: provmodel.ProvDocument) -> list:
    """
    Transforms a ProvDocument into a list of tuples including:
    ProvAgent, list of ProvRelations from agent,
    dict of ProvElements associated to ProvAgent,
    list of Namespaces

    The third tuple member is a dict keyed by a running index; each value
    is a single-entry dict ``{element: [relations going out of element]}``
    for every element that has an edge to the agent.

    :param prov_document: Document to transform
    :type prov_document: provmodel.ProvDocument
    :return: List of tuples(ProvAgent, list(), dict(), list())
    :rtype: list
    :raises Exception: if the provenance graph is cyclic, has isolated
        nodes, or contains an element with no agent among its neighbors
    """
    namespaces = prov_document.get_registered_namespaces()
    g = provgraph.prov_to_graph(prov_document=prov_document)

    # Check on compatibility.
    # The acyclicity test has to come before the topological sort:
    # networkx rejects cyclic graphs in topological_sort, so testing
    # afterwards would never reach the error below.
    if not is_directed_acyclic_graph(g):
        raise Exception("Provenance graph is not acyclic")
    # Materialise the result: a generator would always be truthy.
    if list(isolates(g)):
        raise Exception("Provenance not compatible with role-based concept. Has isolated Elements")

    # The node sequence is traversed twice below, so keep it as a list.
    sorted_nodes = list(topological_sort(g, reverse=True))
    agents = [node for node in sorted_nodes if isinstance(node, provmodel.ProvAgent)]
    elements = [node for node in sorted_nodes if not isinstance(node, provmodel.ProvAgent)]

    for element in elements:
        if provmodel.ProvAgent not in [type(n) for n in g.neighbors(element)]:
            raise Exception(
                "Provenance not compatible with role-based concept. Element {} has not relation to any agent".format(
                    element))

    accounts = []
    for agent in agents:
        # find out-going relations from agent
        # Todo check if filter does not left out some info
        agent_relations = [
            g.get_edge_data(u, v)[0]['relation'] for u, v in g.out_edges(agent)
        ]

        agent_elements = {}
        i = 0
        for element in elements:
            if not g.has_edge(element, agent):
                continue
            # set() collapses parallel edges to one (u, v) pair; the
            # inner loop then visits each parallel edge's data once.
            element_relations = [
                relation['relation']
                for u, v in set(g.out_edges(element))
                for relation in g[u][v].values()
            ]
            agent_elements[i] = {element: element_relations}
            i += 1

        accounts.append((agent, agent_relations, agent_elements, namespaces))
    return accounts
def add_typing(self, source, target, mapping,
               total=False, attrs=None):
    """Add homomorphism to the hierarchy.

    Validates, in order: both nodes exist; the edge does not exist yet;
    both nodes are ``GraphNode`` instances; the new edge would not create
    a cycle; ``mapping`` passes ``check_homomorphism``; and the new path
    passes ``self._check_consistency``. Only then is the edge added and
    the typing stored in ``self.edge`` and ``self.typing``.

    Raises:
        HierarchyError: when one of the structural checks above fails.
    """
    for endpoint in (source, target):
        if endpoint not in self.nodes():
            raise HierarchyError(
                "Node '%s' is not defined in the hierarchy!" % endpoint)

    if (source, target) in self.edges():
        raise HierarchyError(
            "Edge '%s->%s' already exists in the hierarchy: "
            "no muliple edges allowed!" %
            (source, target)
        )

    source_node = self.node[source]
    if not isinstance(source_node, GraphNode):
        if type(source_node) == RuleNode:
            raise HierarchyError(
                "Source node is a rule, use `add_rule_typing` "
                "method instead!"
            )
        raise HierarchyError(
            "Source of a typing should be a graph, `%s` is provided!" %
            type(source_node)
        )

    target_node = self.node[target]
    if not isinstance(target_node, GraphNode):
        raise HierarchyError(
            "Target of a typing should be a graph, `%s` is provided!" %
            type(target_node)
        )

    # check no cycles are produced: insert the edge temporarily, test,
    # and take it out again whatever the outcome
    self.add_edge(source, target)
    creates_cycle = not nx.is_directed_acyclic_graph(self)
    self.remove_edge(source, target)
    if creates_cycle:
        raise HierarchyError(
            "Edge '%s->%s' creates a cycle in the hierarchy!" %
            (source, target)
        )

    # check if the homomorphism is valid
    check_homomorphism(
        self.node[source].graph,
        self.node[target].graph,
        mapping,
        total=total
    )

    # check if newly created path commutes with existing shortest paths
    self._check_consistency(source, target, mapping)

    self.add_edge(source, target)
    if attrs is not None:
        normalize_attrs(attrs)
    typing_edge = self.graph_typing_cls(mapping, total, attrs)
    self.edge[source][target] = typing_edge
    self.typing[source][target] = self.edge[source][target].mapping
def test_partition(test_graph):
    """Check the partitions of the fixture graph against fixed expectations.

    Exactly two partitions are expected, in order, each with a known node
    set and edge set. For every partition the ``acyclic`` flag yielded by
    ``partition_graph`` must agree with
    ``networkx.is_directed_acyclic_graph``. A third partition raises
    ``ValueError``.
    """
    log = logging.getLogger('partition')

    # (expected nodes, expected edges), in the order partitions arrive.
    expectations = [
        (
            {'5', '4', '3', '2', 'r_', 're_'},
            {
                ("5", "2"),
                ("5", "re_"),
                ("4", "5"),
                ("2", "3"),
                ("2", "re_"),
                ("3", "4"),
                ("3", "5"),
                ("r_", "2"),
            },
        ),
        (
            {'8', '7', '6', '1', 'r_', 're_'},
            {
                ("7", "8"),
                ("6", "7"),
                ("1", "6"),
                ("1", "re_"),
                ("r_", "7"),
                ("r_", "6"),
                ("r_", "1"),
                ("8", "re_"),
            },
        ),
    ]

    for index, (partition, acyclic) in enumerate(partition_graph(test_graph)):
        if index >= len(expectations):
            raise ValueError("Unexpected partition, got more than 2")
        expected_nodes, expected_edges = expectations[index]
        assert set(partition.nodes()) == expected_nodes
        assert set(partition.edges()) == expected_edges
        log.debug("%s", partition.nodes())
        log.debug("%s", partition.edges())
        assert networkx.is_directed_acyclic_graph(partition) == acyclic
def build(self, debug=False):
    """Returns a OpenMDAO Group from the variable graph.

    Variables are visited in topological order. A variable with no
    incoming edge is treated as an input and only has its range recorded;
    any other variable gets a component added to the group, built either
    from its ``expr`` attribute or from its ``fun_list`` attributes.

    Parameters
    ----------
    debug : bool
        True to print debug messages.

    Returns
    -------
    grp : omdao.Group
        the OpenMDAO group that computes all variables.
    input_bounds : dict[str, any]
        a dictionary from input variable name to
        (equals, min, max, ndim) tuple.

    Raises
    ------
    Exception
        if the graph has a dependency loop, an input has min > max, or a
        non-input variable has neither ``expr`` nor ``fun_list``.
    """
    if not nx.is_directed_acyclic_graph(self._g):
        raise Exception('Dependency loop detected')

    input_bounds = {}
    ndim_dict = {}
    grp = omdao.Group()
    prom = ['*']

    for var in nx.topological_sort(self._g):
        nattrs = self._g.node[var]
        ndim = nattrs['ndim']
        ndim_dict[var] = ndim

        if self._g.in_degree(var) == 0:
            # input variable
            if debug:
                print('Input variable: %s' % var)
            # range checking
            vmin = nattrs['min']
            vmax = nattrs['max']
            veq = nattrs.get('equals')
            if vmin > vmax:
                raise Exception('Variable %s input range not valid.' % var)
            input_bounds[var] = (veq, vmin, vmax, ndim)
            continue

        # Topological order guarantees every predecessor already has an
        # entry in ndim_dict.
        init_vals = {}
        for par in self._g.predecessors_iter(var):
            init_vals[par] = np.zeros(ndim_dict[par])
        comp_name = 'comp__%s' % var

        if 'expr' in nattrs:
            eqn = '{}={}'.format(var, nattrs['expr'])
            init_vals[var] = np.zeros(ndim)
            # noinspection PyTypeChecker
            grp.add(comp_name, omdao.ExecComp(eqn, **init_vals), promotes=prom)
        elif 'fun_list' in nattrs:
            params = nattrs['params']
            fun_list = nattrs['fun_list']
            vec_params = nattrs['vec_params']
            comp = VecFunComponent(var, fun_list, params, vector_params=vec_params)
            # noinspection PyTypeChecker
            grp.add(comp_name, comp, promotes=prom)
        else:
            raise Exception('Unknown attributes: {}'.format(nattrs))

    return grp, input_bounds
generate_random_dag.py 文件源码
项目:breaking_cycles_in_noisy_hierarchies
作者: zhenv5
项目源码
文件源码
阅读 25
收藏 0
点赞 0
评论 0
def gnm_random_graph(n, m, seed=None, directed=True):
    """Return a random graph with n nodes and m edges, acyclic when directed.

    Edges are drawn by rejection sampling with ``u < v`` only, so the
    sampled edge set is acyclic; node labels are then shuffled with a
    random permutation so the ordering is not visible in the labels.

    Parameters
    ----------
    n : int
        The number of nodes.
    m : int
        The number of edges.
    seed : int, optional
        Seed for random number generator (default=None). Seeds the
        ``random`` module, which drives both the edge sampling and the
        label permutation.
    directed : bool, optional (default=True)
        If True return a directed graph

    Returns
    -------
    The generated graph. All ``n`` nodes are present even when isolated.
    When ``n == 1`` the single-node graph is returned, and when ``m`` is
    at least the number of possible edges (``n*(n-1)`` directed,
    ``n*(n-1)/2`` undirected) the complete graph is returned.

    Raises
    ------
    ValueError
        If ``directed`` and ``n*(n-1)/2 < m < n*(n-1)``: an acyclic graph
        cannot hold that many edges, and the sampling loop would never
        terminate.
    """
    # Only pairs with u < v are ever accepted, so at most n*(n-1)/2 edges
    # can be sampled. Reject infeasible directed requests up front.
    dag_capacity = n * (n - 1) // 2
    if directed and dag_capacity < m < n * (n - 1):
        raise ValueError(
            "cannot place %s edges in an acyclic directed graph on %s nodes "
            "(at most %s)" % (m, n, dag_capacity))

    if directed:
        G = nx.DiGraph()
        g = nx.DiGraph()
    else:
        G = nx.Graph()
        g = nx.Graph()
    G.add_nodes_from(range(n))
    G.name = "gnm_random_graph(%s,%s)" % (n, m)

    if seed is not None:
        random.seed(seed)

    if n == 1:
        return G

    max_edges = n * (n - 1)
    if not directed:
        max_edges /= 2.0
    if m >= max_edges:
        return nx.complete_graph(n, create_using=G)

    # random.choice needs an indexable sequence of node labels.
    nlist = list(G.nodes())
    edge_count = 0
    while edge_count < m:
        # generate random edge,u,v
        u = random.choice(nlist)
        v = random.choice(nlist)
        if u >= v or G.has_edge(u, v):
            continue
        G.add_edge(u, v)
        edge_count += 1

    # Draw the relabelling from the same seeded generator as the edges so
    # that a given seed reproduces the whole graph.
    permutation = random.sample(range(n), n)
    # Add the nodes explicitly; add_edges_from alone would drop isolated ones.
    g.add_nodes_from(range(n))
    g.add_edges_from((permutation[u], permutation[v]) for u, v in G.edges())
    print("is_directed_acyclic_graph: %s" % nx.is_directed_acyclic_graph(g))
    return g
def add_typing(self, source, target, mapping,
               total=True, attrs=None):
    """Add homomorphism to the hierarchy.

    Validates, in order: both nodes exist; the edge does not exist yet;
    both nodes are ``GraphNode`` instances; the new edge would not create
    a cycle; ``mapping`` passes ``check_homomorphism``; and the new path
    passes ``self._check_consistency``. Only then is the edge added and
    the typing stored in ``self.edge`` and ``self.typing``.

    Raises:
        HierarchyError: when one of the structural checks above fails.
    """
    for endpoint in (source, target):
        if endpoint not in self.nodes():
            raise HierarchyError(
                "Node '%s' is not defined in the hierarchy!" % endpoint)

    if (source, target) in self.edges():
        raise HierarchyError(
            "Edge '%s->%s' already exists in the hierarchy: "
            "no muliple edges allowed!" %
            (source, target)
        )

    source_node = self.node[source]
    if not isinstance(source_node, GraphNode):
        if type(source_node) == RuleNode:
            raise HierarchyError(
                "Source node is a rule, use `add_rule_typing` "
                "method instead!"
            )
        raise HierarchyError(
            "Source of a typing should be a graph, `%s` is provided!" %
            type(source_node)
        )

    target_node = self.node[target]
    if not isinstance(target_node, GraphNode):
        raise HierarchyError(
            "Target of a typing should be a graph, `%s` is provided!" %
            type(target_node)
        )

    # check no cycles are produced: insert the edge temporarily, test,
    # and take it out again whatever the outcome
    self.add_edge(source, target)
    creates_cycle = not nx.is_directed_acyclic_graph(self)
    self.remove_edge(source, target)
    if creates_cycle:
        raise HierarchyError(
            "Edge '%s->%s' creates a cycle in the hierarchy!" %
            (source, target)
        )

    # check if the homomorphism is valid
    check_homomorphism(
        self.node[source].graph,
        self.node[target].graph,
        mapping,
        total=total
    )

    # check if newly created path commutes with existing shortest paths
    self._check_consistency(source, target, mapping)

    self.add_edge(source, target)
    if attrs is not None:
        normalize_attrs(attrs)
    typing_edge = self.graph_typing_cls(mapping, total, attrs)
    self.edge[source][target] = typing_edge
    self.typing[source][target] = self.edge[source][target].mapping