def _compute_nodes(self, names, raise_exceptions=False):
LOG.debug('Computing nodes {}'.format(list(map(str, names))))
futs = {}
def run(name):
f, executor_name, args, kwds = self._get_func_args_kwds(name)
if executor_name is None:
executor = self.default_executor
else:
executor = self.executor_map[executor_name]
fut = executor.submit(self._eval_node, name, f, args, kwds, raise_exceptions)
futs[fut] = name
computed = set()
for name in names:
node0 = self.dag.node[name]
state = node0[_AN_STATE]
if state == States.COMPUTABLE:
run(name)
while len(futs) > 0:
done, not_done = wait(futs.keys(), return_when=FIRST_COMPLETED)
for fut in done:
name = futs.pop(fut)
node0 = self.dag.node[name]
value, exc, tb, start_dt, end_dt = fut.result()
delta = (end_dt - start_dt).total_seconds()
if exc is None:
self._set_state_and_value(name, States.UPTODATE, value)
node0[_AN_TIMING] = TimingData(start_dt, end_dt, delta)
self._set_descendents(name, States.STALE)
for n in self.dag.successors(name):
logging.debug(str(name) + ' ' + str(n) + ' ' + str(computed))
if n in computed:
raise LoopDetectedException("Calculating {} for the second time".format(name))
self._try_set_computable(n)
node0 = self.dag.node[n]
state = node0[_AN_STATE]
if state == States.COMPUTABLE and n in names:
run(n)
else:
self._set_state_and_value(name, States.ERROR, Error(exc, tb))
self._set_descendents(name, States.STALE)
computed.add(name)
评论列表
文章目录