test_run.py source code

python

Project: trio  Author: python-trio
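# Imports assumed so the excerpt can run standalone; the original file's
# own import lines are not part of this excerpt.
import pytest
from trio import _core, sleep_forever


def test_system_task_crash_KeyboardInterrupt():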
    async def ki():
        raise KeyboardInterrupt

    async def main():
        _core.spawn_system_task(ki)
        await sleep_forever()

    # KI doesn't get wrapped with TrioInternalError
    with pytest.raises(KeyboardInterrupt):
        _core.run(main)


# This used to fail because checkpoint was a yield followed by an immediate
# reschedule. So we had:
# 1) this task yields
# 2) this task is rescheduled
# ...
# 3) next iteration of event loop starts, runs timeouts
# 4) this task has timed out
# 5) ...but it's on the run queue, so the timeout is queued to be delivered
#    the next time that it's blocked.
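
The sequence described in the comment above can be sketched with a toy scheduler. This is only an illustrative model, not trio's implementation: the names `Cancelled`, `busy_task`, `run_toy_loop`, and the `deliver_at_checkpoint` flag are all hypothetical, the clock simply advances one tick per loop iteration, and the deadline is assumed to fall after the task has started. The task never blocks; it only yields and is immediately rescheduled, so a scheduler that defers delivery until the task is blocked never delivers the timeout at all.

```python
from collections import deque


class Cancelled(Exception):
    """Toy stand-in for a cancellation exception (hypothetical)."""


def busy_task(log):
    # Each bare `yield` models a checkpoint: the task yields but never blocks.
    try:
        for step in range(5):
            log.append(f"step {step}")
            yield
        log.append("finished without seeing the timeout")
    except Cancelled:
        log.append("cancelled")


def run_toy_loop(deadline, deliver_at_checkpoint):
    """Run one busy task and return its log.

    deliver_at_checkpoint=False models the behaviour in the comment: the
    timeout is only recorded, waiting for the task to block.
    deliver_at_checkpoint=True models one possible fix: a pending timeout is
    raised at the next checkpoint. Assumes deadline >= 1, so the task has
    already started when the timeout fires.
    """
    log = []
    run_queue = deque([busy_task(log)])
    clock = 0
    pending_cancel = False
    while run_queue:
        # Start of a loop iteration: run timeouts.
        if clock >= deadline:
            # The task is on the run queue, not blocked, so the timeout
            # is only queued for later delivery.
            pending_cancel = True
        task = run_queue.popleft()
        try:
            if pending_cancel and deliver_at_checkpoint:
                task.throw(Cancelled)
            else:
                next(task)
        except StopIteration:
            pass  # the task finished
        else:
            run_queue.append(task)  # yield followed by immediate reschedule
        clock += 1
    return log


if __name__ == "__main__":
    print(run_toy_loop(deadline=2, deliver_at_checkpoint=False))
    print(run_toy_loop(deadline=2, deliver_at_checkpoint=True))
```

In the first run the task completes all five steps even though its deadline passed at tick 2; in the second, the pending timeout is raised at the next checkpoint and the task stops early.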