conftest.py source code

python

Project: asynccmd  Author: valentinmk
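import asyncio
import concurrent.futures
import gc
from contextlib import suppress

import pytest


# Assumption: the listing shows no decorator, but a yielding function in
# conftest.py is normally registered as a pytest fixture.
@pytest.fixture
def loop():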
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(None)
    executor = concurrent.futures.ThreadPoolExecutor()
    loop.set_default_executor(executor)
    yield loop

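    # is_closed() is a method; tear down only if the loop is still open.
    if not loop.is_closed():
        # asyncio.all_tasks() (Python 3.7+) returns the unfinished tasks of this loop.
        pending = asyncio.all_tasks(loop=loop)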
        for task in pending:
            task.cancel()
            # Run the task once more so it can process its cancellation.
            # A cancelled task raises asyncio.CancelledError, which we suppress.
            with suppress(asyncio.CancelledError):
                loop.run_until_complete(task)
        executor.shutdown()
        loop.close()

    gc.collect()
    asyncio.set_event_loop(None)
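
The teardown half of the fixture can be tried without pytest. The sketch below is a minimal illustration, not code from asynccmd: `sleeper`, `drain_and_close`, `demo_loop`, and `leftover` are hypothetical names. It leaves one unfinished task on a fresh loop, then cancels it and closes the loop the same way the fixture does after `yield`.

```python
import asyncio
from contextlib import suppress


async def sleeper():
    # Hypothetical long-running coroutine standing in for work a test left behind.
    await asyncio.sleep(3600)


def drain_and_close(loop):
    """Cancel every unfinished task on `loop`, let each one unwind, then close the loop."""
    for task in asyncio.all_tasks(loop=loop):
        task.cancel()
        # Driving the loop until the task finishes delivers the CancelledError to it.
        with suppress(asyncio.CancelledError):
            loop.run_until_complete(task)
    loop.close()


demo_loop = asyncio.new_event_loop()
leftover = demo_loop.create_task(sleeper())
# Run one loop iteration so the task starts and suspends inside asyncio.sleep().
demo_loop.run_until_complete(asyncio.sleep(0))

drain_and_close(demo_loop)
print(leftover.cancelled(), demo_loop.is_closed())
```

Running the task to completion after `cancel()` matters: `cancel()` only requests cancellation, and the task does not reach the cancelled state until the loop runs it again.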