computeengine.py 文件源码

python
阅读 32 收藏 0 点赞 0 评论 0

项目:loman 作者: janusassetallocation 项目源码 文件源码
def _compute_nodes(self, names, raise_exceptions=False):
        """Evaluate the requested nodes, submitting each once it is computable.

        Every node in ``names`` whose state is already ``States.COMPUTABLE``
        is submitted to its executor up front. As each future completes, the
        node's state and value are recorded, its descendents are marked
        stale, and any successor that has become computable *and* is in
        ``names`` is submitted in turn. The method returns once no futures
        remain in flight.

        :param names: Node names eligible for computation. Must be
            re-iterable, with hashable elements (they are used as graph keys).
        :param raise_exceptions: Forwarded unchanged to ``self._eval_node``.
        :raises LoopDetectedException: If a successor of a node that just
            finished successfully has already been processed in this call.
        """
        LOG.debug('Computing nodes %s', list(map(str, names)))

        # Built once so the membership test in the completion loop is O(1)
        # rather than a linear scan of the caller's sequence per successor.
        requested = set(names)

        # Maps each in-flight future to the name of the node it evaluates.
        futs = {}

        def run(name):
            # Resolve the node's function/arguments and submit it to the
            # executor named on the node, or the default when none is named.
            f, executor_name, args, kwds = self._get_func_args_kwds(name)
            if executor_name is None:
                executor = self.default_executor
            else:
                executor = self.executor_map[executor_name]
            fut = executor.submit(self._eval_node, name, f, args, kwds, raise_exceptions)
            futs[fut] = name

        # Names of nodes whose futures have been fully processed below.
        computed = set()

        # Seed the run with every requested node that is ready right now.
        for name in names:
            if self.dag.node[name][_AN_STATE] == States.COMPUTABLE:
                run(name)

        while futs:
            done, _ = wait(futs.keys(), return_when=FIRST_COMPLETED)
            for fut in done:
                name = futs.pop(fut)
                node = self.dag.node[name]
                value, exc, tb, start_dt, end_dt = fut.result()
                delta = (end_dt - start_dt).total_seconds()
                if exc is None:
                    self._set_state_and_value(name, States.UPTODATE, value)
                    node[_AN_TIMING] = TimingData(start_dt, end_dt, delta)
                    self._set_descendents(name, States.STALE)
                    for n in self.dag.successors(name):
                        # Use the module logger (not the root logger) with
                        # lazy %-args so nothing is formatted when debug
                        # logging is disabled.
                        LOG.debug('%s %s %s', name, n, computed)
                        if n in computed:
                            # NOTE(review): the message names the node that
                            # just finished (`name`), while the check is on
                            # its successor `n` -- confirm which is intended.
                            raise LoopDetectedException("Calculating {} for the second time".format(name))
                        self._try_set_computable(n)
                        successor = self.dag.node[n]
                        if successor[_AN_STATE] == States.COMPUTABLE and n in requested:
                            run(n)
                else:
                    # Record the failure on the node; its descendents are
                    # still marked stale, but no successors are scheduled.
                    self._set_state_and_value(name, States.ERROR, Error(exc, tb))
                    self._set_descendents(name, States.STALE)
                computed.add(name)
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号