test_redis.py 文件源码

python
阅读 28 收藏 0 点赞 0 评论 0

项目:aioworkers 作者: aioworkers 项目源码 文件源码
def test_queue_json(loop):
    config = MergeDict(
        key=str(uuid.uuid4()),
        format='json',
    )
    config['app.redis_pool'] = await aioredis.create_pool(
        ('localhost', 6379), loop=loop)
    context = config
    q = RedisQueue(config, context=context, loop=loop)
    await q.init()
    await q.put({'f': 3})
    assert 1 == await q.length()
    assert [{'f': 3}] == await q.list()
    assert {'f': 3} == await q.get()
    await q.put({'f': 3})
    assert 1 == await q.length()
    await q.clear()
    assert not await q.length()
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号