def queue_purge(self, queue=None):
'''
Purge the consumer's queue.
The ``queue`` parameter exists only for compatibility and is
ignored.
'''
# Use a script to make the operation atomic
lua_code = b'''
local routing_key = KEYS[1]
local message_key = ARGV[1]
local count = 0
while true do
local s = redis.call("lpop", routing_key)
if s == false then
break
end
local value = cjson.decode(s)
local id = value[message_key]
local persistance_key = routing_key .. ":" .. id
redis.call("del", persistance_key)
count = count + 1
end
return count
'''
script = self.redis.register_script(lua_code)
return script(keys=[self.routing_key], args=[self.message_key])
评论列表
文章目录