def get_task_to_run(self, tree):
"""Working from the bottom up, replace each node with a chain to its
descendant, or celery.Group of descendants.
    :param tree: Dependency graph of tasks
:type tree: networkx.DiGraph
:returns: chain to execute
"""
# TODO: This could be more parallel
return chain(*[
        maybe_signature(tree.nodes[name]['task'], self.celery_app)
for name in nx.topological_sort(tree)
])
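A minimal sketch of the idea, using plain networkx and no Celery (task names are invented): a topological sort of the dependency graph yields an order in which every task appears after its prerequisites.

import networkx as nx

tree = nx.DiGraph()
tree.add_edges_from([("extract", "transform"), ("transform", "load")])
# Topological order respects every edge, so prerequisites come first.
order = list(nx.topological_sort(tree))
assert order == ["extract", "transform", "load"]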
def create_directed_graphs(self):
    '''
    Build, for each atom, a directed graph in which every edge points toward
    that atom (the sink), and store the result in self.directed_graphs as
    (source, target, index) rows.
    '''
self.directed_graphs = np.empty(
(self.no_of_atoms, self.no_of_atoms - 1, 3), dtype=int)
# parse all the atoms one by one and get directed graph to that atom
# as the sink node
for idx in range(self.no_of_atoms):
# get shortest path from the root to all the other atoms and then reverse the edges.
path = nx.single_source_dijkstra_path(self.graph, idx)
G = nx.DiGraph()
for i in range(self.no_of_atoms):
temp = path[i]
temp.reverse()
            nx.add_path(G, temp)  # G.add_path() was removed in networkx 2.4
        # do a topological sort to get an order of atoms with all edges pointing to the root
        topological_order = list(nx.topological_sort(G))  # materialize; returns a generator in networkx 2.x
sorted_path = np.empty((self.no_of_atoms - 1, 3))
no_of_incoming_edges = {}
for i in range(self.no_of_atoms - 1):
node = topological_order[i]
            edge = list(G.edges(node))[0]  # the node's single outgoing edge
if edge[1] in no_of_incoming_edges:
index = no_of_incoming_edges[edge[1]]
no_of_incoming_edges[edge[1]] += 1
else:
index = 0
no_of_incoming_edges[edge[1]] = 1
sorted_path[i, :] = [node, edge[1], index]
self.directed_graphs[idx, :, :] = sorted_path
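The reverse-the-shortest-paths trick above can be checked on a toy graph (labels invented): after reversing every root-to-node path, all edges point at the chosen sink, which therefore sorts last.

import networkx as nx

ug = nx.path_graph(4)  # 0 - 1 - 2 - 3, undirected
paths = nx.single_source_dijkstra_path(ug, 0)
G = nx.DiGraph()
for path in paths.values():
    nx.add_path(G, list(reversed(path)))  # edges now point toward node 0
order = list(nx.topological_sort(G))
assert order[-1] == 0  # the sink sorts last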
def dag(recipe_folder, config, packages="*", format='gml', hide_singletons=False):
"""
Export the DAG of packages to a graph format file for visualization
"""
dag, name2recipes = utils.get_dag(utils.get_recipes(recipe_folder, packages), config)
if hide_singletons:
        for node in list(nx.nodes(dag)):  # copy: we mutate the graph while iterating
            if dag.degree(node) == 0:
                dag.remove_node(node)
if format == 'gml':
nx.write_gml(dag, sys.stdout.buffer)
elif format == 'dot':
write_dot(dag, sys.stdout)
elif format == 'txt':
subdags = sorted(map(sorted, nx.connected_components(dag.to_undirected())))
subdags = sorted(subdags, key=len, reverse=True)
singletons = []
for i, s in enumerate(subdags):
if len(s) == 1:
singletons += s
continue
print("# subdag {0}".format(i))
subdag = dag.subgraph(s)
recipes = [
recipe for package in nx.topological_sort(subdag)
for recipe in name2recipes[package]]
print('\n'.join(recipes) + '\n')
if not hide_singletons:
print('# singletons')
recipes = [recipe for package in singletons for recipe in
name2recipes[package]]
print('\n'.join(recipes) + '\n')
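For reference, the per-component grouping in the txt branch can be written directly with weakly_connected_components; a small sketch with made-up package names:

import networkx as nx

dag = nx.DiGraph([("a", "b"), ("b", "c"), ("x", "y")])
for i, component in enumerate(nx.weakly_connected_components(dag)):
    subdag = dag.subgraph(component)
    print("# subdag {0}".format(i), list(nx.topological_sort(subdag)))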
def _generateDAG(self):
    '''
    Generate the workflow DAG using the networkx directed-graph implementation
    and store a topologically ordered list of (bucket, job) pairs on self.graph.
    Note that nx.topological_sort(G) requires the graph to be acyclic; cyclic
    dependencies are hard to produce in practice, since jobs are defined by
    referencing specific dictionary elements.
    '''
# Instantiate directed graph, add job dependency edges.
G = nx.DiGraph()
for job in self.jobs:
G.add_node(job)
if 'dependsOn' in self.jobs[job]:
for dependency in self.jobs[job]['dependsOn']:
G.add_edge(dependency['jobKey'], self.jobs[job]['jobKey'])
self.dag_graph = G # For printing purposes.
# Sort jobs in graph using topological sort, assigning job buckets.
# Jobs within the same bucket will be kicked off simultaneously.
    topology = list(nx.topological_sort(G))  # materialize; generator in networkx 2.x
    self.graph = [(0, topology[0])]
    for job in topology[1:]:
        try:
            # Bucket index = hop distance from the first topological node.
            self.graph.append((len(nx.shortest_path(G, topology[0], job)) - 1, job))
        except nx.exception.NetworkXNoPath:
            self.graph.append((0, job))
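A self-contained sketch of the bucketing scheme with invented job names; each job's bucket is its hop distance from the first topological node, and jobs in the same bucket can start together:

import networkx as nx

G = nx.DiGraph([("ingest", "clean"), ("clean", "train"), ("ingest", "report")])
topology = list(nx.topological_sort(G))
buckets = [(0, topology[0])]
for job in topology[1:]:
    try:
        buckets.append((nx.shortest_path_length(G, topology[0], job), job))
    except nx.NetworkXNoPath:
        buckets.append((0, job))
print(buckets)  # e.g. [(0, 'ingest'), (1, 'clean'), (1, 'report'), (2, 'train')]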
def calculate_account_data(prov_document: provmodel.ProvDocument) -> list:
"""
Transforms a ProvDocument into a list of tuples including:
ProvAgent, list of ProvRelations from agent,
list of ProvElements associated to ProvAgent,
list of Namespaces
:param prov_document: Document to transform
    :type prov_document: provmodel.ProvDocument
:return: List of tuples(ProvAgent, list(), list(), list())
:rtype: list
"""
    namespaces = prov_document.get_registered_namespaces()
    g = provgraph.prov_to_graph(prov_document=prov_document)
    # Check compatibility before sorting: topological_sort raises on cyclic graphs.
    if not is_directed_acyclic_graph(g):
        raise Exception("Provenance graph is not acyclic")
    if list(isolates(g)):  # isolates() returns a generator, which is always truthy by itself
        raise Exception("Provenance not compatible with role-based concept. Has isolated elements")
    # networkx 2.x: topological_sort returns a generator and dropped the reverse= keyword.
    sorted_nodes = list(reversed(list(topological_sort(g))))
    agents = list(filter(lambda elem: isinstance(elem, provmodel.ProvAgent), sorted_nodes))
    elements = list(filter(lambda elem: not isinstance(elem, provmodel.ProvAgent), sorted_nodes))
for element in elements:
if provmodel.ProvAgent not in [type(n) for n in g.neighbors(element)]:
            raise Exception(
                "Provenance not compatible with role-based concept. Element {} has no relation to any agent".format(
                    element))
accounts = []
for agent in agents:
# find out-going relations from agent
agent_relations = []
for u, v in g.out_edges(agent):
            # TODO: check that this filter does not drop any relation info
agent_relations.append(g.get_edge_data(u, v)[0]['relation'])
agent_elements = {}
i = 0
for element in elements:
element_relations = []
if g.has_edge(element, agent):
for u, v in set(g.out_edges(element)):
for relation in g[u][v].values():
element_relations.append(relation['relation'])
agent_elements[i] = {element: element_relations}
i += 1
accounts.append((agent, agent_relations, agent_elements, namespaces))
return accounts
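The edge handling above assumes a MultiDiGraph whose edges carry a 'relation' attribute; a toy version of the same lookups (relation names invented):

import networkx as nx

g = nx.MultiDiGraph()
g.add_edge("agent", "entity", relation="wasAttributedTo")
g.add_edge("agent", "entity", relation="actedOnBehalfOf")
for u, v in set(g.out_edges("agent")):  # set(): parallel edges repeat the (u, v) pair
    for data in g[u][v].values():
        print(data["relation"])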
def compose_back(self, input_circuit, wire_map={}):
"""Apply the input circuit to the output of this circuit.
The two bases must be "compatible" or an exception occurs.
    A subset of the input qubits of the input circuit is mapped
    to a subset of the output qubits of this circuit:
wire_map[input_qubit_to_input_circuit] = output_qubit_of_self
"""
union_basis = self._make_union_basis(input_circuit)
union_gates = self._make_union_gates(input_circuit)
# Check the wire map for duplicate values
if len(set(wire_map.values())) != len(wire_map):
raise DAGCircuitError("duplicates in wire_map")
add_qregs = self._check_wiremap_registers(wire_map,
input_circuit.qregs,
self.qregs)
for register in add_qregs:
self.add_qreg(register[0], register[1])
add_cregs = self._check_wiremap_registers(wire_map,
input_circuit.cregs,
self.cregs)
for register in add_cregs:
self.add_creg(register[0], register[1])
self._check_wiremap_validity(wire_map, input_circuit.input_map,
self.output_map, input_circuit)
# Compose
self.basis = union_basis
self.gates = union_gates
    for node in nx.topological_sort(input_circuit.multi_graph):
        nd = input_circuit.multi_graph.nodes[node]
if nd["type"] == "in":
# if in wire_map, get new name, else use existing name
m_name = wire_map.get(nd["name"], nd["name"])
# the mapped wire should already exist
assert m_name in self.output_map, \
"wire (%s,%d) not in self" % (m_name[0], m_name[1])
assert nd["name"] in input_circuit.wire_type, \
"inconsistent wire_type for (%s,%d) in input_circuit" \
% (nd["name"][0], nd["name"][1])
elif nd["type"] == "out":
# ignore output nodes
pass
elif nd["type"] == "op":
condition = self._map_condition(wire_map, nd["condition"])
self._check_condition(nd["name"], condition)
m_qargs = list(map(lambda x: wire_map.get(x, x), nd["qargs"]))
m_cargs = list(map(lambda x: wire_map.get(x, x), nd["cargs"]))
self.apply_operation_back(nd["name"], m_qargs, m_cargs,
nd["params"], condition)
else:
assert False, "bad node type %s" % nd["type"]
def compose_front(self, input_circuit, wire_map={}):
"""Apply the input circuit to the input of this circuit.
The two bases must be "compatible" or an exception occurs.
    A subset of the output qubits of the input circuit is mapped
    to a subset of the input qubits of this circuit.
"""
union_basis = self._make_union_basis(input_circuit)
union_gates = self._make_union_gates(input_circuit)
# Check the wire map
if len(set(wire_map.values())) != len(wire_map):
raise DAGCircuitError("duplicates in wire_map")
add_qregs = self._check_wiremap_registers(wire_map,
input_circuit.qregs,
self.qregs)
for r in add_qregs:
self.add_qreg(r[0], r[1])
add_cregs = self._check_wiremap_registers(wire_map,
input_circuit.cregs,
self.cregs)
for r in add_cregs:
self.add_creg(r[0], r[1])
self._check_wiremap_validity(wire_map, input_circuit.output_map,
self.input_map, input_circuit)
# Compose
self.basis = union_basis
self.gates = union_gates
    # networkx 2.x dropped the reverse= keyword; reverse the materialized order instead.
    for n in reversed(list(nx.topological_sort(input_circuit.multi_graph))):
        nd = input_circuit.multi_graph.nodes[n]
if nd["type"] == "out":
# if in wire_map, get new name, else use existing name
m_name = wire_map.get(nd["name"], nd["name"])
# the mapped wire should already exist
assert m_name in self.input_map, \
"wire (%s,%d) not in self" % (m_name[0], m_name[1])
assert nd["name"] in input_circuit.wire_type, \
"inconsistent wire_type for (%s,%d) in input_circuit" \
% (nd["name"][0], nd["name"][1])
elif nd["type"] == "in":
# ignore input nodes
pass
elif nd["type"] == "op":
condition = self._map_condition(wire_map, nd["condition"])
self._check_condition(nd["name"], condition)
m_qargs = list(map(lambda x: wire_map.get(x, x), nd["qargs"]))
m_cargs = list(map(lambda x: wire_map.get(x, x), nd["cargs"]))
self.apply_operation_front(nd["name"], m_qargs, m_cargs,
nd["params"], condition)
else:
assert False, "bad node type %s" % nd["type"]
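Both compose methods share one traversal skeleton: walk the node attributes of a MultiDiGraph in topological order, forward for compose_back and reversed for compose_front. A toy version with invented node payloads:

import networkx as nx

mg = nx.MultiDiGraph()
mg.add_node("q0_in", type="in")
mg.add_node("h_gate", type="op")
mg.add_node("q0_out", type="out")
mg.add_edges_from([("q0_in", "h_gate"), ("h_gate", "q0_out")])
for n in nx.topological_sort(mg):  # forward pass (compose_back)
    print(n, mg.nodes[n]["type"])
for n in reversed(list(nx.topological_sort(mg))):  # reverse pass (compose_front)
    print(n, mg.nodes[n]["type"])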
def compile(cls, source_net, compiled_net):
"""Add observed nodes to the computation graph.
Parameters
----------
source_net : nx.DiGraph
compiled_net : nx.DiGraph
Returns
-------
    compiled_net : nx.DiGraph
"""
logger.debug("{} compiling...".format(cls.__name__))
observable = []
uses_observed = []
for node in nx.topological_sort(source_net):
        state = source_net.nodes[node]
if state.get('_observable'):
observable.append(node)
cls.make_observed_copy(node, compiled_net)
elif state.get('_uses_observed'):
uses_observed.append(node)
obs_node = cls.make_observed_copy(node, compiled_net, args_to_tuple)
# Make edge to the using node
compiled_net.add_edge(obs_node, node, param='observed')
else:
continue
# Copy the edges
if not state.get('_stochastic'):
obs_node = observed_name(node)
for parent in source_net.predecessors(node):
if parent in observable:
link_parent = observed_name(parent)
else:
link_parent = parent
                compiled_net.add_edge(link_parent, obs_node, **source_net[parent][node])  # networkx 2.x: pass edge data as keyword attributes
# Check that there are no stochastic nodes in the ancestors
for node in uses_observed:
# Use the observed version to query observed ancestors in the compiled_net
obs_node = observed_name(node)
for ancestor_node in nx.ancestors(compiled_net, obs_node):
            if '_stochastic' in source_net.nodes.get(ancestor_node, {}):
raise ValueError("Observed nodes must be deterministic. Observed "
"data depends on a non-deterministic node {}."
.format(ancestor_node))
return compiled_net
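The final check above hinges on nx.ancestors; a tiny illustration with invented node flags:

import networkx as nx

net = nx.DiGraph([("prior", "sim"), ("sim", "obs")])
net.nodes["prior"]["_stochastic"] = True
stochastic = [a for a in nx.ancestors(net, "obs")
              if net.nodes[a].get("_stochastic")]
print(stochastic)  # ['prior'] -> such a node would trigger the ValueError above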
def build(self, debug=False):
"""Returns a OpenMDAO Group from the variable graph.
Parameters
----------
debug : bool
True to print debug messages.
Returns
-------
grp : omdao.Group
the OpenMDAO group that computes all variables.
input_bounds : dict[str, any]
        a dictionary from input variable name to an (equals, min, max, ndim) tuple.
"""
input_bounds = {}
ndim_dict = {}
if not nx.is_directed_acyclic_graph(self._g):
raise Exception('Dependency loop detected')
grp = omdao.Group()
prom = ['*']
for var in nx.topological_sort(self._g):
        nattrs = self._g.nodes[var]
ndim = nattrs['ndim']
ndim_dict[var] = ndim
        if self._g.in_degree(var) == 0:
            # input variable
            if debug:
                print('Input variable: %s' % var)
# range checking
vmin, vmax = nattrs['min'], nattrs['max']
veq = nattrs.get('equals', None)
if vmin > vmax:
raise Exception('Variable %s input range not valid.' % var)
input_bounds[var] = veq, vmin, vmax, ndim
else:
            init_vals = {par: np.zeros(ndim_dict[par]) for par in self._g.predecessors(var)}  # predecessors_iter was removed in networkx 2.x
comp_name = 'comp__%s' % var
if 'expr' in nattrs:
eqn = '{}={}'.format(var, nattrs['expr'])
init_vals[var] = np.zeros(ndim)
# noinspection PyTypeChecker
grp.add(comp_name, omdao.ExecComp(eqn, **init_vals), promotes=prom)
elif 'fun_list' in nattrs:
params = nattrs['params']
fun_list = nattrs['fun_list']
vec_params = nattrs['vec_params']
comp = VecFunComponent(var, fun_list, params, vector_params=vec_params)
# noinspection PyTypeChecker
grp.add(comp_name, comp, promotes=prom)
else:
raise Exception('Unknown attributes: {}'.format(nattrs))
return grp, input_bounds
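Stripped of OpenMDAO, the core pattern is evaluating each variable only after its inputs; a minimal sketch with an invented expression graph:

import networkx as nx

g = nx.DiGraph()
g.add_nodes_from(["x", "y"])
g.add_node("z", expr=lambda v: v["x"] + 2 * v["y"])
g.add_edges_from([("x", "z"), ("y", "z")])
values = {"x": 1.0, "y": 2.0}
for var in nx.topological_sort(g):
    if g.in_degree(var) > 0:  # computed variable: its inputs are already available
        values[var] = g.nodes[var]["expr"](values)
print(values["z"])  # 5.0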
def wider(self, model, config):
    topo_nodes = nx.topological_sort(model.graph)
    # Group layers can be widened too, so include them alongside plain conv layers.
    names = [node.name for node in topo_nodes
             if node.type in ('Conv2D', 'Conv2D_Pooling', 'Group')]
max_iter = 100
for i in range(max_iter + 1):
if i == max_iter:
            logger.info('cannot find a suitable layer to apply the wider operation; returning the original model')
return model, False
# random choose a layer to wider, except last conv layer
choice = names[np.random.randint(0, len(names) - 1)]
cur_node = model.graph.get_nodes(choice)[0]
next_nodes = model.graph.get_nodes(choice, next_layer=True, last_layer=False)
if 'Conv2D' in [node.type for node in next_nodes] or 'Conv2D_Pooling' in [node.type for node in next_nodes]:
break
else:
continue
cur_width = cur_node.config['filters']
# for test
# enlarge the max_cur_width
#max_cur_width = (int((config.model_max_conv_width - config.model_min_conv_width) * cur_node.depth / config.model_max_depth) \
# + config.model_min_conv_width) * 5
# for test
max_cur_width = 1024
width_ratio = np.random.rand()
new_width = int(cur_width + width_ratio * (max_cur_width - cur_width))
if cur_node.type == 'Group':
# make sure that new_width % group_num == 0
new_width = new_width // cur_node.config['group_num'] * cur_node.config['group_num']
if new_width <= cur_width:
logger.info('{} layer\'s width up to limit!'.format(choice))
return model, False
logger.info('choose {} to wider'.format(choice))
if cur_node.type == 'Group':
return self.wider_group_conv2d(model, layer_name=choice, new_width=new_width, config=config), True
else:
return self.wider_conv2d(model, layer_name=choice, new_width=new_width, config=config), True
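The group-width rounding above is plain integer arithmetic; for example, with an invented group count:

group_num = 4
new_width = 45
new_width = new_width // group_num * group_num  # round down to a multiple of group_num
assert new_width == 44 and new_width % group_num == 0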
def _import_daggen(line_iter):
_NODE_TYPES = {"ROOT", "END", "COMPUTATION", "TRANSFER"}
result = nx.DiGraph()
node_mapper = lambda nid: "task_%d" % nid
nodes = {}
skip = True
for line in line_iter:
line = line.strip()
if line.startswith("NODE_COUNT"):
            skip = False
continue
if skip or not line:
continue
node_parts = line.split(" ")
assert len(node_parts) == 6
magic, nodeid, children, nodetype, cost, parallel_ratio = node_parts
assert magic == "NODE"
nodeid = int(nodeid)
children = list(map(int, children.split(","))) if children != "-" else []
assert nodetype in _NODE_TYPES
cost = float(cost)
# unused_for_now
parallel_ratio = float(parallel_ratio)
nodes[nodeid] = (nodetype, children, cost)
for nodeid, (nodetype, _, cost) in nodes.items():
if nodetype != "TRANSFER":
result.add_node(node_mapper(nodeid), weight=cost)
for nodeid, (nodetype, children, _) in nodes.items():
if nodetype == "TRANSFER":
continue
for childid in children:
childtype, grandchildren, transfercost = nodes[childid]
if childtype == "TRANSFER":
assert len(grandchildren) == 1
destination = grandchildren[0]
weight = transfercost
else:
                assert nodetype == "ROOT" or childtype == "END"
destination = childid
                # TODO: Should be 0.
                #
                # Kludge to force an order in a 3rd-party HEFT implementation
                # (nodes connected by zero-weight edges get mixed up in the HEFT
                # priority list and violate precedence constraints).
                # Can be removed once that implementation is fixed.
result.add_edge(node_mapper(nodeid), node_mapper(destination), weight=weight)
    node_order = list(nx.topological_sort(result))  # materialize; generator in networkx 2.x
return nx.relabel_nodes(result, {
node_order[0]: "root",
node_order[-1]: "end"
})
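The final relabeling maps the topologically first and last nodes to "root" and "end"; a toy check with invented task ids:

import networkx as nx

g = nx.DiGraph([("task_1", "task_2"), ("task_2", "task_3")])
order = list(nx.topological_sort(g))
g = nx.relabel_nodes(g, {order[0]: "root", order[-1]: "end"})
print(list(g.edges()))  # [('root', 'task_2'), ('task_2', 'end')]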
def get_tasks_aest_alst(cls, nxgraph, platform_model):
"""
Return AEST and ALST of tasks.
Args:
nxgraph: full task graph as networkx.DiGraph
platform_model: cscheduling.PlatformModel object
Returns:
        tuple containing 2 dictionaries
aest: task->aest_value
alst: task->alst_value
"""
mean_speed = platform_model.mean_speed
mean_bandwidth = platform_model.mean_bandwidth
mean_latency = platform_model.mean_latency
    topological_order = list(networkx.topological_sort(nxgraph))  # materialize; generator in networkx 2.x
# Average execution cost
aec = {task: float(task.amount) / mean_speed for task in nxgraph}
# Average earliest start time
aest = {}
# TODO: Check several roots and ends!
root = topological_order[0]
end = topological_order[-1]
aest[root] = 0.
for task in topological_order:
parents = nxgraph.pred[task]
if not parents:
aest[task] = 0
continue
aest[task] = max([
aest[parent] + aec[parent] + (nxgraph[parent][task]["weight"] / mean_bandwidth + mean_latency)
for parent in parents
])
topological_order.reverse()
# Average latest start time
alst = {}
alst[end] = aest[end]
for task in topological_order:
if not nxgraph[task]:
alst[task] = aest[task]
continue
alst[task] = min([
alst[child] - (edge["weight"] / mean_bandwidth + mean_latency)
for child, edge in nxgraph[task].items()
]) - aec[task]
return aest, alst
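A hedged, self-contained rendition of the same forward (AEST) and backward (ALST) passes on a toy graph, assuming unit speed and bandwidth and zero latency (all numbers invented):

import networkx as nx

g = nx.DiGraph()
g.add_edge("root", "mid", weight=2.0)
g.add_edge("mid", "end", weight=1.0)
amount = {"root": 1.0, "mid": 2.0, "end": 1.0}  # execution cost per task
order = list(nx.topological_sort(g))
aest = {}
for t in order:
    # Earliest start: all parents finished and their data transferred.
    aest[t] = max((aest[p] + amount[p] + g[p][t]["weight"] for p in g.pred[t]), default=0.0)
alst = {order[-1]: aest[order[-1]]}
for t in reversed(order[:-1]):
    # Latest start that still lets every child start at its latest start time.
    alst[t] = min(alst[c] - g[t][c]["weight"] for c in g[t]) - amount[t]
print(aest)  # {'root': 0.0, 'mid': 3.0, 'end': 6.0}
print(alst)  # {'end': 6.0, 'mid': 3.0, 'root': 0.0}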
def main(directory):
log_reader = git_log_processing.GitLogReader(directory)
log = log_reader.get_log_information()
gitexplorer_database = GitExplorerBase.get_gitexplorer_database()
gitexplorer_database.commit_collection.drop()
gitexplorer_database.commit_collection.insert_many(log)
queries.AggregatorRegistry.load('gitexplorer.queries.authors_per_file',
'gitexplorer.queries.commits_by_datetime',
'gitexplorer.queries.commits_by_filestats',
'gitexplorer.queries.commits_per_author',
'gitexplorer.queries.queries_per_commit')
aggregations = list(map(queries.AggregatorRegistry.get,
['authors_per_file_path',
'commits_by_day_of_week',
'commits_by_hour_of_day',
'additions_deletions_lines_commits_by_file_path',
'commits_per_author',
'additions_deletions_lines_modifications_per_commit',
'average_additions_deletions_lines_modifications_per_commit',
'additions_deletions_lines_modifications_commits_by_date',
'average_additions_deletions_lines_modifications_commits_by_date',
]))
dependencies = nx.DiGraph()
for aggregation in aggregations:
provides = aggregation.provides()
dependencies.add_edge(provides, aggregation.requires())
    sorted_dependencies = list(reversed(list(nx.topological_sort(dependencies))))  # networkx 2.x dropped reverse=
print(sorted_dependencies)
for dependency in sorted_dependencies:
for aggregation in aggregations:
            if aggregation.name == dependency:
aggregation().run()
nx.draw(dependencies, with_labels=True)
plt.show()
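Running each aggregation only after the ones it requires is exactly a reverse topological order over the provides -> requires edges; a minimal sketch with invented aggregation names:

import networkx as nx

deps = nx.DiGraph()
deps.add_edge("report", "stats")   # report requires stats
deps.add_edge("stats", "raw_log")  # stats requires raw_log
for name in reversed(list(nx.topological_sort(deps))):
    print("run", name)  # raw_log, stats, report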