async def test_queue_json(loop):
    """Exercise RedisQueue configured with ``format='json'``.

    Puts a dict on a queue stored under a random key and checks the
    results of ``length``, ``list``, ``get`` and ``clear``.

    Must be a coroutine function: every queue call below is awaited.
    Needs a Redis server reachable at localhost:6379.

    Args:
        loop: event loop handed to both the Redis pool and the queue.
    """
    config = MergeDict(
        key=str(uuid.uuid4()),  # fresh key on every run
        format='json',
    )
    pool = await aioredis.create_pool(('localhost', 6379), loop=loop)
    try:
        config['app.redis_pool'] = pool
        # The same mapping serves as the queue's config and its context.
        q = RedisQueue(config, context=config, loop=loop)
        await q.init()

        item = {'f': 3}
        await q.put(item)
        assert await q.length() == 1
        assert await q.list() == [item]
        assert await q.get() == item

        # The length is 1 again after a second put, so get() above
        # must have removed the first item.
        await q.put(item)
        assert await q.length() == 1
        await q.clear()
        assert not await q.length()
    finally:
        # Release the connections even when an assertion fails.
        pool.close()
        await pool.wait_closed()
# Web page residue, not part of the test: "评论列表" (comment list), "文章目录" (table of contents)