base.py 文件源码

python
阅读 19 收藏 0 点赞 0 评论 0

项目:asyncio-mongo-reflection 作者: isanich 项目源码 文件源码
def enqueue_coro(self, coro, priority=1):

        def future_wrapper(coro, future):
            @functools.wraps(coro)
            async def inner():
                try:
                    res = await coro
                except Exception as e:
                    future.set_exception(e)
                else:
                    future.set_result(res)
                finally:
                    self._process_next.set()
            return inner

        def task_cb(future):
            if isinstance(self._external_cb, weakref.ReferenceType):
                external_cb = self._external_cb()
            else:
                external_cb = self._external_cb

            try:
                res = future.result()

                if self.results_queue.full():
                    self.results_queue.get_nowait()
                asyncio.ensure_future(self.results_queue.put(res), loop=self.loop)

                if external_cb:
                    external_cb(res, None)
            except Exception as e:
                if external_cb:
                    external_cb(None, exc=e)
                else:
                    raise e

        f = asyncio.Future()
        f.add_done_callback(task_cb)

        coro_locals = {key: repr(val) for key, val in coro.cr_frame.f_locals.items()}
        self.tasks_queue.put_nowait(self.Task(future_wrapper(coro, f), priority,
                                              coro_locals, clock()))
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号