def enqueue_coro(self, coro, priority=1):
def future_wrapper(coro, future):
@functools.wraps(coro)
async def inner():
try:
res = await coro
except Exception as e:
future.set_exception(e)
else:
future.set_result(res)
finally:
self._process_next.set()
return inner
def task_cb(future):
if isinstance(self._external_cb, weakref.ReferenceType):
external_cb = self._external_cb()
else:
external_cb = self._external_cb
try:
res = future.result()
if self.results_queue.full():
self.results_queue.get_nowait()
asyncio.ensure_future(self.results_queue.put(res), loop=self.loop)
if external_cb:
external_cb(res, None)
except Exception as e:
if external_cb:
external_cb(None, exc=e)
else:
raise e
f = asyncio.Future()
f.add_done_callback(task_cb)
coro_locals = {key: repr(val) for key, val in coro.cr_frame.f_locals.items()}
self.tasks_queue.put_nowait(self.Task(future_wrapper(coro, f), priority,
coro_locals, clock()))
评论列表
文章目录