async def test_explicit_loop_threaded(event_loop):
    """Run ``_deploy_in_loop`` on a second event loop in a worker thread.

    A fresh event loop is created and driven to completion from a
    single-thread executor, after which the model opened on this test's own
    loop must report an application named ``'ubuntu'``.

    ``event_loop`` is not used directly in the body; it is presumably the
    pytest-asyncio loop fixture -- confirm against the test configuration.
    This coroutine needs an async-aware test runner; any decorators that
    provide one would sit above this definition.
    """
    async with base.CleanModel() as model:
        model_name = model.info.name
        new_loop = asyncio.new_event_loop()
        try:
            with ThreadPoolExecutor(1) as executor:
                # The coroutine object is created here but executed by
                # new_loop inside the worker thread.
                f = executor.submit(
                    new_loop.run_until_complete,
                    _deploy_in_loop(new_loop, model_name,
                                    model._connector.jujudata))
                # Blocks the calling thread until the worker finishes and
                # re-raises any exception raised on the other loop.
                f.result()
        finally:
            # The loop is no longer running once run_until_complete has
            # returned (or never started), so release its resources.
            new_loop.close()
        await model._wait_for_new('application', 'ubuntu')
        assert 'ubuntu' in model.applications
评论列表
文章目录