test_redis.py 文件源码

python
阅读 20 收藏 0 点赞 0 评论 0

项目:aioworkers 作者: aioworkers 项目源码 文件源码
async def test_zqueue(loop, mocker):
    """Exercise put/list/length/get on a RedisZQueue against a local Redis.

    The function must be a coroutine (``async def``) because every queue
    operation below is awaited; a plain ``def`` containing ``await`` is a
    SyntaxError and prevents the test module from being imported at all.

    Args:
        loop: event loop fixture, passed to both the Redis pool and the queue.
        mocker: pytest-mock fixture, used to patch ``asyncio.sleep``.

    Requires a Redis server listening on ``localhost:6379``.
    """
    # A random key keeps this run isolated from other data in the same Redis.
    config = MergeDict(
        key=str(uuid.uuid4()),
        format='str',
        timeout=0,
    )
    config['app.redis_pool'] = await aioredis.create_pool(
        ('localhost', 6379), loop=loop)
    # The same mapping doubles as the context handed to the queue.
    context = config
    q = RedisZQueue(config, context=context, loop=loop)
    await q.init()

    # 'a' is put twice with different scores; the assertions below expect it
    # to be stored once (length 3) and to be listed/popped first.
    await q.put('a', 4)
    await q.put('c', 3)
    await q.put('b', 2)
    await q.put('a', 1)
    assert 3 == await q.length()
    assert ['a', 'b', 'c'] == await q.list()
    # Listing must not consume items.
    assert 3 == await q.length()

    # Each get() removes exactly one item, in the listed order.
    assert 'a' == await q.get()
    assert ['b', 'c'] == await q.list()
    assert 2 == await q.length()
    assert 'b' == await q.get()
    assert ['c'] == await q.list()
    assert 1 == await q.length()
    assert 'c' == await q.get()
    assert [] == await q.list()
    assert not await q.length()

    # With asyncio.sleep replaced by a mock, get() on the now-empty queue is
    # expected to raise TypeError.
    # NOTE(review): presumably get() awaits asyncio.sleep while polling and
    # awaiting the mock's return value is what raises -- confirm against
    # RedisZQueue.get.
    with pytest.raises(TypeError):
        with mocker.patch('asyncio.sleep'):
            await q.get()
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号