def bag_of_elementary_cycles(graph):
    """Bag of elementary cycles, keyed by a rotation-canonical label string."""
    bag = {}
    for cycle in nx.simple_cycles(graph):
        ns = [node_label(graph.nodes[x]) for x in cycle]
        # Rotate the cycle so it starts at the smallest label, making the
        # label canonical across all rotations of the same cycle.
        i = min(enumerate(ns), key=lambda x: x[1])[0]
        ns = ns[i:] + ns[:i]
        label = '-'.join(ns)
        if label not in bag:
            bag[label] = 0
        bag[label] += 1
    return bag
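# Usage sketch for bag_of_elementary_cycles (a minimal, illustrative example;
# node_label() is not shown above, so the stand-in here only assumes it reads a
# 'label' node attribute):
import networkx as nx

def node_label(attrs):
    return attrs.get('label', '?')

g = nx.DiGraph()
g.add_nodes_from([(0, {'label': 'A'}), (1, {'label': 'B'}), (2, {'label': 'C'})])
g.add_edges_from([(0, 1), (1, 0), (1, 2), (2, 0)])
print(bag_of_elementary_cycles(g))   # {'A-B': 1, 'A-B-C': 1} (order may vary)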
def cycle_detect(G):
    # G = nx.DiGraph object
    cycle_list = list(nx.simple_cycles(G))
    print('\n\tCycles detected for individuals: ')
    for i in cycle_list:
        print('\t', i[0])
    rmCyNode = input('\n\t> Remove nodes? (y/n): ')
    print(rmCyNode)
    if rmCyNode == 'y':
        # Remove the first individual of each reported cycle.
        for i in cycle_list:
            G.remove_node(i[0])
        print('\n\tNodes removed successfully.')
        print('\n\tProceeding with the algorithm.')
    elif rmCyNode == 'n':
        print('\n\tCorrect pedigree errors before proceeding..')
        print('\n\tClosing PedWork.py session...')
        print('\n\tExit > OK!')
        sys.exit()
    else:
        print('\n\tPlease enter y/n.')
        sys.exit()
    return G
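# Usage sketch for cycle_detect (interactive -- it prompts on stdin; the
# pedigree below is a made-up example in which individual 1 ends up as its own
# ancestor):
import networkx as nx

ped = nx.DiGraph()
ped.add_edges_from([(1, 2), (2, 3), (3, 1), (4, 2)])   # 1 -> 2 -> 3 -> 1 is a cycle
ped = cycle_detect(ped)   # answering 'y' removes the first node of each cycle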
def embed(self, G, rng):
    if G.order() < self.cycle_length:
        return np.array([0.0], dtype="f")
    succ = 0
    # Number of node subsets of size cycle_length.
    max_cycle = sp.binom(G.order(), self.cycle_length)
    for _ in range(self.sample_size):
        # Sample cycle_length node ids and test whether their induced
        # subgraph contains a cycle of exactly that length.
        us = rng.choice(list(G.nodes()), self.cycle_length)
        H = G.subgraph(us.tolist())
        for c in nx.simple_cycles(H):
            if len(c) == self.cycle_length:
                succ += 1
                break
    # Scale the sample hit rate back up to an estimate of the cycle count.
    val = [max_cycle * (succ / self.sample_size)]
    return np.array(val, dtype="f")
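# Standalone sketch of the same idea (all names here are assumed; the class
# that owns embed() is not shown). It samples k distinct nodes per trial, a
# slight variation on the method above, so each induced subgraph has exactly
# k vertices, then scales the hit rate by C(n, k):
import networkx as nx
import numpy as np
from scipy import special as sp

def estimate_k_cycles(G, k, sample_size, rng):
    if G.order() < k:
        return 0.0
    succ = 0
    for _ in range(sample_size):
        us = rng.choice(list(G.nodes()), k, replace=False)   # distinct nodes
        H = G.subgraph(us.tolist())
        if any(len(c) == k for c in nx.simple_cycles(H)):
            succ += 1
    return float(sp.binom(G.order(), k) * (succ / sample_size))

rng = np.random.RandomState(0)
G = nx.gnp_random_graph(30, 0.2, seed=0, directed=True)
print(estimate_k_cycles(G, 3, sample_size=500, rng=rng))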
def embed(self, G, rng):
    if G.order() < self.cycle_length:
        return np.array([0.0], dtype="f")
    succ, samples = 0, 0
    max_cycle = sp.binom(G.order(), self.cycle_length)
    for _ in range(self.sample_cap):
        us = rng.choice(list(G.nodes()), self.cycle_length)
        H = G.subgraph(us.tolist())
        samples += 1
        for c in nx.simple_cycles(H):
            if len(c) == self.cycle_length:
                succ += 1
                break
        # Stop early once enough successful samples have been observed.
        if succ >= self.successes:
            break
    return np.array([max_cycle * (succ / samples)], dtype="f")
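# This variant differs from the one above only in its stopping rule: instead of
# a fixed sample_size, it draws up to sample_cap samples and exits as soon as
# `successes` positive samples have been seen, then scales by the number of
# samples actually drawn. The estimate_k_cycles sketch above can be adapted by
# swapping its fixed-count loop for that early-exit condition.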
def _feature_loopcount(self, f_ea):
    '''
    Build the basic-block graph and count loops as the number of
    elementary cycles reported by networkx's simple_cycles().
    prior feature: Null
    '''
    DEBUG_PRINT('feature_loopcount ' + hex(f_ea))
    # An earlier version mapped each cycle to min(cycle) and counted the
    # distinct minima (sorting each cycle was too slow); the raw cycle
    # count is used instead.
    # simple_cycles() returns a generator, so materialise it before len().
    self.loops = list(simple_cycles(self.ftable["dg"]))
    loopcount = len(self.loops)
    return loopcount
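# Minimal sketch of the same loop count on a hand-built control-flow-style
# graph (self.ftable["dg"] and DEBUG_PRINT belong to the surrounding class and
# are not shown above):
import networkx as nx

cfg = nx.DiGraph()
cfg.add_edges_from([('entry', 'head'), ('head', 'body'), ('body', 'head'),
                    ('head', 'exit')])
print(len(list(nx.simple_cycles(cfg))))   # 1 -- the head <-> body back edge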
def expand(self):
    assert self.state == TransactionState.INIT
    (expanded, missing) = self._expand_depedencies(
        self.local_repo,
        self.repo,
        self.targets,
        exclude=self.removes
    )
    if len(missing) > 0:
        raise MissingDependencyError(missing)
    self.depend_G = super().packages_to_graph(expanded)
    try:
        cycle = next(nx.simple_cycles(self.depend_G))
    except StopIteration:
        pass
    else:
        raise TransactionCycleError(cycle)
    self._sort_deps_to_targets()
    # Find all the packages that are upgrades rather than installs
    for package in expanded:
        if not package.is_local:
            # Search for just the name to check if we have any version
            # locally
            q = Query(package.name)
            local_package = self.local_repo.find_package(q)
            # If we have something we want to remove it first, we call that
            # an "upgrade"
            if local_package is None:
                continue
            self.removes.append(local_package)
    conflicts = super()._find_conflicts(self.installs, self.local_repo)
    if len(conflicts) > 0:
        # There were at least some conflicts
        raise ConflictError(conflicts)
    self.state = TransactionState.EXPANDED
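# The cycle check in expand() follows a common pattern: ask nx.simple_cycles()
# for its first result and treat StopIteration as "no cycle". A minimal
# stand-alone sketch (the package names are made up):
import networkx as nx

deps = nx.DiGraph()
deps.add_edges_from([('libfoo', 'app'), ('libbar', 'app'), ('app', 'libfoo')])
try:
    cycle = next(nx.simple_cycles(deps))
except StopIteration:
    print('dependency graph is acyclic')
else:
    print('cycle detected:', cycle)   # e.g. ['libfoo', 'app']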
def get_cycles_by_value(self, edge_type: EdgeType, quote_type: QuoteType) -> Dict[float, List[str]]:
    dg = self.get_network(NetworkType.price, edge_type, quote_type)
    weights = get_edge_attributes(dg, 'weight')
    cycle_vals = {}
    for cycle in simple_cycles(dg):
        # Rotate the cycle to start at the highest-valued currency so the
        # same cycle always gets the same label, then close it.
        best_curr = max(cycle, key=lambda x: Currency[x].value)
        best_curr_ind = cycle.index(best_curr)
        cycle = [cycle[best_curr_ind]] + cycle[(best_curr_ind + 1):] + cycle[:best_curr_ind]
        cycle.append(cycle[0])
        # Round-trip value is the product of edge weights around the cycle.
        prodw = [float(weights[(cycle[i], cycle[i + 1])]) for i in range(len(cycle) - 1)]
        cycle_vals[prod(prodw)] = cycle
    return cycle_vals
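# Sketch of the same idea on a toy exchange-rate graph (currency names and
# rates are made up; the EdgeType/QuoteType/Currency machinery above is not
# needed here): the product of weights around a cycle is the round-trip rate.
import networkx as nx

fx = nx.DiGraph()
fx.add_weighted_edges_from([('USD', 'EUR', 0.9), ('EUR', 'GBP', 0.85),
                            ('GBP', 'USD', 1.35)])
for cycle in nx.simple_cycles(fx):
    closed = cycle + [cycle[0]]
    rate = 1.0
    for a, b in zip(closed, closed[1:]):
        rate *= fx[a][b]['weight']
    print(closed, rate)   # > 1.0 would indicate an arbitrage opportunity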
def loops(func):
    '''Return the number of loops in the function ``func``.'''
    import networkx as nx
    fn = by(func)
    g = blocks.nx(fn)
    return len(list(nx.simple_cycles(g)))
# FIXME: document this
#def refs(func, member):
# xl, fn = idaapi.xreflist_t(), by(func)
# idaapi.build_stkvar_xrefs(xl, fn, member.ptr)
# x.ea, x.opnum, x.type
# ref_types = {
# 0 : 'Data_Unknown',
# 1 : 'Data_Offset',
# 2 : 'Data_Write',
# 3 : 'Data_Read',
# 4 : 'Data_Text',
# 5 : 'Data_Informational',
# 16 : 'Code_Far_Call',
# 17 : 'Code_Near_Call',
# 18 : 'Code_Far_Jump',
# 19 : 'Code_Near_Jump',
# 20 : 'Code_User',
# 21 : 'Ordinary_Flow'
# }
# return [(x.ea,x.opnum) for x in xl]
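# loops() relies on IDA-side helpers (by() and blocks.nx()) to build the basic
# block DiGraph; the count itself is the number of elementary cycles, which can
# exceed the number of loop headers when several back edges share one header:
import networkx as nx

g = nx.DiGraph([('h', 'a'), ('a', 'h'), ('h', 'b'), ('b', 'h')])
print(len(list(nx.simple_cycles(g))))   # 2, though both cycles share header 'h'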
def _remove_cycles(self):
    """
    Really hacky way to remove cycles in hierarchies (wtf).
    """
    G = self.make_graph()
    cycles = list(networkx.simple_cycles(G))
    if not cycles:
        return False
    else:
        # Break the first cycle by marking its first node as looped and
        # clearing that node's sources.
        cycles[0][0].looped = True
        cycles[0][0].sources[:] = []
    return True
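# The nodes in that graph are expected to carry .looped and .sources attributes
# (not shown here). The same "break the first cycle" pattern on a plain
# networkx DiGraph would remove one closing edge instead:
import networkx as nx

G = nx.DiGraph([(1, 2), (2, 3), (3, 1), (3, 4)])
cycles = list(nx.simple_cycles(G))
if cycles:
    first = cycles[0]
    # drop the edge that closes the first cycle
    G.remove_edge(first[-1], first[0])
print(list(nx.simple_cycles(G)))   # []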
def _compile_node_creator_list(self):
    graph = self._graph
    cycles = list(nx.simple_cycles(graph))
    if len(cycles):
        raise ValueError("Found cycles in dependencies " + str(cycles))
    else:
        # Visit nodes in DFS postorder so dependencies come before dependants.
        nodes = list(nx.dfs_postorder_nodes(graph))
        nodes.remove(self._ROOT)  # exclude root
        creators = []
        for n in nodes:
            creator = graph.nodes[n].get('creator')
            creators.append((n, creator))
        return creators
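# Why DFS postorder: assuming edges point from a node to what it depends on,
# dependencies are emitted before the nodes that use them, so creators can be
# invoked in that order. A small sketch with made-up node names:
import networkx as nx

dag = nx.DiGraph()
dag.add_edges_from([('ROOT', 'model'), ('model', 'dataset'), ('model', 'optimizer')])
order = [n for n in nx.dfs_postorder_nodes(dag, source='ROOT') if n != 'ROOT']
print(order)   # e.g. ['dataset', 'optimizer', 'model'] -- leaves first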
def valid_graph(self, g):
    cycles = list(nx.simple_cycles(g))
    if len(cycles) > 0:
        return False
    return True
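# Materialising every cycle just to test for acyclicity does more work than
# needed; networkx has a direct predicate that short-circuits:
import networkx as nx

g = nx.DiGraph([(1, 2), (2, 3)])
print(nx.is_directed_acyclic_graph(g))   # True -- same verdict as valid_graph(g)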
def build_tree(self, tasks):
    """Accepts any number of tasks as returned by _get_pipeline.
    :param tasks: dictionary of str:info where str is the name of the task, info is from the registry
    :returns: Graph containing each node connected by dependencies, "after: ALL" nodes will be ignored
    :rtype: networkx.DiGraph
    :raises: :exc:`DependencyError`
    """
    all_task_names = [task for tasks_subset in self.registry.values() for task in tasks_subset]
    # Find dependencies - directed graph of node names
    tree = nx.DiGraph()
    # Add nodes
    for name, info in tasks.items():
        # TODO: doing this twice sucks
        if info['after'] is ALL:
            # ignore these
            continue
        tree.add_node(name, info=info)
    # Add edges
    for name, info in tasks.items():
        if info['after'] is ALL:
            # ignore these
            continue
        for req in info['after']:
            if req not in all_task_names:
                msg = ('"{}" pipeline element was not found, but it is declared as a dependency '
                       'of the pipeline "{}" with arguments "{}"')
                raise DependencyError(msg.format(req, name, info))
            if req in tree:  # don't add an edge if the dependency is not part of the current set of tasks
                tree.add_edge(req, name)
    # Not as useful as it originally seemed
    # tree = prune_edges(tree)
    # Check for circular dependencies
    try:
        cycle = next(nx.simple_cycles(tree))
        raise DependencyError('Circular dependencies detected: {}'.format(cycle))
    except StopIteration:
        # Good - didn't want any cycles
        pass
    # Joins (merge multiple tasks) have more than one edge in
    # joins = [n for n, d in tree.in_degree_iter() if d > 1]
    # Don't support joins right now, one reducer at the end of the chain
    # if joins:
    #     raise DependencyError('Multiple after values not currently supported joins="{}"'.format(joins))
    # TODO - even with joins this could be a challenge
    # Can't handle "N" shapes, so check for those
    # Descendants of forks, cannot join from outside.
    return tree
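# Cut-down sketch of the graph build_tree constructs, without the registry and
# the "after: ALL" handling (task names and structure are made up):
import networkx as nx

tasks = {
    'extract': {'after': []},
    'transform': {'after': ['extract']},
    'load': {'after': ['transform']},
}
tree = nx.DiGraph()
for name, info in tasks.items():
    tree.add_node(name, info=info)
for name, info in tasks.items():
    for req in info['after']:
        if req in tree:
            tree.add_edge(req, name)
print(list(nx.topological_sort(tree)))   # ['extract', 'transform', 'load']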