test_batched_timers_aio.py 文件源码

python
阅读 29 收藏 0 点赞 0 评论 0

项目:deb-python-txaio 作者: openstack 项目源码 文件源码
def test_batched_successful_call_explicit_loop(framework_aio):
    '''
    batched calls really happen in batches
    '''
    # Trollius doesn't come with this, so won't work on py2
    pytest.importorskip('asyncio.test_utils')
    from asyncio.test_utils import TestLoop

    def time_gen():
        yield
        yield
    new_loop = TestLoop(time_gen)
    calls = []

    def foo(*args, **kw):
        calls.append((args, kw))

    txa = txaio.with_config(loop=new_loop)

    batched = txa.make_batched_timer(5)

    batched.call_later(1, foo, "first call")
    new_loop.advance_time(2.0)
    new_loop._run_once()
    assert len(calls) == 1
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号