test_redis.py 文件源码

python
阅读 21 收藏 0 点赞 0 评论 0

项目:aioworkers 作者: aioworkers 项目源码 文件源码
def test_queue(loop):
    config = MergeDict(key=str(uuid.uuid4()))
    config['app.redis_pool'] = await aioredis.create_pool(
        ('localhost', 6379), loop=loop)
    context = config
    q = RedisQueue(config, context=context, loop=loop)
    await q.init()
    await q.put(3)
    assert 1 == await q.length()
    assert [b'3'] == await q.list()
    assert b'3' == await q.get()
    await q.put(3)
    assert 1 == await q.length()
    await q.clear()
    assert not await q.length()
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号