def test_system_task_crash_KeyboardInterrupt():
async def ki():
raise KeyboardInterrupt
async def main():
_core.spawn_system_task(ki)
await sleep_forever()
# KI doesn't get wrapped with TrioInternalError
with pytest.raises(KeyboardInterrupt):
_core.run(main)
# This used to fail because checkpoint was a yield followed by an immediate
# reschedule. So we had:
# 1) this task yields
# 2) this task is rescheduled
# ...
# 3) next iteration of event loop starts, runs timeouts
# 4) this task has timed out
# 5) ...but it's on the run queue, so the timeout is queued to be delivered
# the next time that it's blocked.
评论列表
文章目录