async def test_zqueue(loop, mocker):
    """Exercise put/get/list/length on a RedisZQueue backed by a local Redis.

    The test must be a coroutine function because every queue operation is
    awaited; a plain ``def`` containing ``await`` does not compile.

    Requires a Redis server listening on localhost:6379 and a pytest plugin
    able to run coroutine tests and provide the ``loop`` fixture
    (NOTE(review): which plugin supplies ``loop`` is not visible here).
    """
    config = MergeDict(
        # Random key so each run works on its own Redis key.
        key=str(uuid.uuid4()),
        format='str',
        timeout=0,
    )
    config['app.redis_pool'] = await aioredis.create_pool(
        ('localhost', 6379), loop=loop)
    # The same mapping is passed both as the config and as the context.
    context = config
    q = RedisZQueue(config, context=context, loop=loop)
    await q.init()

    # 'a' is put twice with different scores; the assertions below expect it
    # to be stored only once and to come out first.
    await q.put('a', 4)
    await q.put('c', 3)
    await q.put('b', 2)
    await q.put('a', 1)

    assert 3 == await q.length()
    assert ['a', 'b', 'c'] == await q.list()
    # Listing must not consume items.
    assert 3 == await q.length()

    assert 'a' == await q.get()
    assert ['b', 'c'] == await q.list()
    assert 2 == await q.length()

    assert 'b' == await q.get()
    assert ['c'] == await q.list()
    assert 1 == await q.length()

    assert 'c' == await q.get()
    assert [] == await q.list()
    assert not await q.length()

    # get() on an empty queue with asyncio.sleep patched is expected to raise
    # TypeError. Presumably the error comes from awaiting the non-awaitable
    # mock that replaces asyncio.sleep -- TODO confirm against RedisZQueue.get.
    with pytest.raises(TypeError):
        with mocker.patch('asyncio.sleep'):
            await q.get()
# Page residue, not part of the test: "Comment list" / "Article table of contents".