def test_create_future_success_explicit_loop(framework):
"""
process events on alternate loop= for create_future later
"""
pytest.importorskip('asyncio')
if txaio.using_twisted:
pytest.skip()
import asyncio
alt_loop = asyncio.new_event_loop()
txa = txaio.with_config(loop=alt_loop)
f = txa.create_future_success('some result')
results = []
f.add_done_callback(lambda r: results.append(r.result()))
# run_once() runs the txaio.config.loop so we shouldn't get any
# results until we spin alt_loop
assert results == []
run_once()
assert results == []
with replace_loop(alt_loop):
run_once()
assert results == ['some result']
评论列表
文章目录