async def test_exception_on_closing(make_scheduler, loop):
    """Check that a failed job is reported to the exception handler on close.

    A job whose coroutine raises ``RuntimeError`` is spawned; once the
    coroutine has started, the scheduler is closed.  The test then asserts
    that the job is marked closed and that the scheduler's exception
    handler was called with the scheduler and a context dict describing
    the failure.

    Args:
        make_scheduler: fixture; awaited with ``exception_handler=...`` to
            obtain the scheduler under test.
        loop: fixture; only ``loop.get_debug()`` is consulted here, to
            decide whether ``source_traceback`` is expected in the context.
    """
    exc_handler = mock.Mock()
    scheduler = await make_scheduler(exception_handler=exc_handler)
    fut = asyncio.Future()
    exc = RuntimeError()

    async def coro():
        # Signal that the job body has started running, then fail.
        fut.set_result(None)
        raise exc

    job = await scheduler.spawn(coro())
    # Do not close the scheduler until the job body has actually run.
    await fut
    await scheduler.close()
    assert job.closed

    expect = {
        'message': 'Job processing failed',
        'job': job,
        'exception': exc,
    }
    if loop.get_debug():
        # In debug mode the context carries an extra 'source_traceback'
        # entry; only its presence is checked, not its value.
        expect['source_traceback'] = mock.ANY
    exc_handler.assert_called_with(scheduler, expect)
评论列表
文章目录