def test_broken_abort():
    """Check that an illegal abort-callback result surfaces as TrioInternalError.

    The inner ``main`` task cancels its own cancel scope and then waits in
    ``_core.wait_task_rescheduled`` with a callback that returns ``None``,
    which is not a legal return value there. Running it through
    ``_core.run`` is expected to raise ``_core.TrioInternalError``.
    """

    async def main():
        # These yields are here to work around an annoying warning -- we're
        # going to crash the main loop, and if we (by chance) do this before
        # the run_sync_soon task runs for the first time, then Python gives us
        # a spurious warning about it not being awaited. (I mean, the warning
        # is correct, but here we're testing our ability to deliver a
        # semi-meaningful error after things have gone totally pear-shaped, so
        # it's not relevant.) By letting the run_sync_soon_task run first, we
        # avoid the warning.
        await _core.checkpoint()
        await _core.checkpoint()
        with _core.open_cancel_scope() as scope:
            # Cancel before blocking so the wait below is already cancelled
            # when it starts.
            scope.cancel()
            # None is not a legal return value here
            await _core.wait_task_rescheduled(lambda _: None)

    with pytest.raises(_core.TrioInternalError):
        _core.run(main)

    # Because this crashes, various __del__ methods print complaints on
    # stderr. Make sure that they get run now, so the output is attached to
    # this test.
    gc_collect_harder()
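# NOTE: stray navigation text from the web page this snippet was copied from
# ("comment list", "article table of contents"); it is not part of the test.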