def Queue(self, queue_type="python_queue", name=None, **kwargs):
    # Factory method: create a new queue, or return an existing one by name.
    if queue_type not in ["python_queue", "redis_queue"]:  # validate the requested type
        raise ValueError("unsupported queue_type: " + queue_type)
    if queue_type == "redis_queue" and not redis_enable:
        raise RedisImportException
    if name in self.queue_dict.keys():  # an existing name returns the existing queue
        return self.queue_dict[name]
    else:  # otherwise build a new queue
        if not name:  # no name given: auto-number one per queue type
            if queue_type in self.queue_name_counter:
                self.queue_name_counter[queue_type] += 1
                name = queue_type + str(self.queue_name_counter[queue_type])
            else:
                name = queue_type + "0"
                self.queue_name_counter[queue_type] = 0
        # instantiate the concrete queue class for the requested type
        if queue_type == "python_queue":
            queue = PythonQueue(name, **kwargs)
        elif queue_type == "redis_queue":
            queue = RedisQ(name, **kwargs)
        self.queue_dict[name] = queue
        return queue
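A usage sketch for this factory. `QueueManager` is an assumed name for the owning class (it must initialize the two dicts the method uses); `PythonQueue` and `RedisQ` come from the surrounding module.

class QueueManager:
    # Assumed container: the Queue() factory above (and the pull_redis_queue
    # and remove methods later on this page) attach to this class.
    def __init__(self):
        self.queue_dict = {}          # name -> queue object
        self.queue_name_counter = {}  # queue_type -> last auto-name suffix

manager = QueueManager()
q0 = manager.Queue("python_queue")   # auto-named "python_queue0"
q1 = manager.Queue("python_queue")   # auto-named "python_queue1"
assert manager.Queue("python_queue", name="python_queue0") is q0  # existing name is reused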
Python keys() example source code
def resubmit_jobs():
'''
Examines the fetch and gather queues for items that are suspiciously old.
These are removed from the queues and placed back on them afresh, to ensure
the fetch & gather consumers are triggered to process it.
'''
if config.get('ckan.harvest.mq.type') != 'redis':
return
redis = get_connection()
# fetch queue
harvest_object_pending = redis.keys(get_fetch_routing_key() + ':*')
for key in harvest_object_pending:
date_of_key = datetime.datetime.strptime(redis.get(key),
"%Y-%m-%d %H:%M:%S.%f")
        # 3 minutes max for a fetch and import
        if (datetime.datetime.now() - date_of_key).total_seconds() > 180:
redis.rpush(get_fetch_routing_key(),
json.dumps({'harvest_object_id': key.split(':')[-1]})
)
redis.delete(key)
# gather queue
harvest_jobs_pending = redis.keys(get_gather_routing_key() + ':*')
for key in harvest_jobs_pending:
date_of_key = datetime.datetime.strptime(redis.get(key),
"%Y-%m-%d %H:%M:%S.%f")
        # 2 hours max for a gather
        if (datetime.datetime.now() - date_of_key).total_seconds() > 7200:
redis.rpush(get_gather_routing_key(),
json.dumps({'harvest_job_id': key.split(':')[-1]})
)
redis.delete(key)
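resubmit_jobs relies on a convention set up elsewhere: when a consumer pops a message it records a `<routing_key>:<id>` key holding the pop time, and deletes it once processing completes. A minimal sketch of that marker-writing side (the helper name is hypothetical):

import datetime
import json

def mark_in_progress(redis, routing_key, body):
    # Hypothetical helper: record when a popped message started processing,
    # so resubmit_jobs() can spot items that have sat too long. str() of a
    # datetime matches the "%Y-%m-%d %H:%M:%S.%f" format parsed above.
    message = json.loads(body)
    object_id = message[routing_key.split(':')[-1]]
    redis.set(routing_key + ':' + object_id, str(datetime.datetime.now()))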
def __init__(self, redis, routing_key):
self.redis = redis
# Routing keys are constructed with {site-id}:{message-key}, eg:
# default:harvest_job_id or default:harvest_object_id
self.routing_key = routing_key
# Message keys are harvest_job_id for the gather consumer and
# harvest_object_id for the fetch consumer
self.message_key = routing_key.split(':')[-1]
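A usage sketch; `RedisConsumer` is an assumed name for the class this constructor belongs to, and `get_connection` comes from the surrounding module:

conn = get_connection()
fetch_consumer = RedisConsumer(conn, 'default:harvest_object_id')
print(fetch_consumer.message_key)  # -> 'harvest_object_id'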
def queue_purge(self, queue=None):
'''
Purge the consumer's queue.
The ``queue`` parameter exists only for compatibility and is
ignored.
'''
# Use a script to make the operation atomic
lua_code = b'''
local routing_key = KEYS[1]
local message_key = ARGV[1]
local count = 0
while true do
local s = redis.call("lpop", routing_key)
if s == false then
break
end
local value = cjson.decode(s)
local id = value[message_key]
            local persistence_key = routing_key .. ":" .. id
            redis.call("del", persistence_key)
count = count + 1
end
return count
'''
script = self.redis.register_script(lua_code)
return script(keys=[self.routing_key], args=[self.message_key])
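A sketch of what the script sees at purge time, assuming the conventions above (a pushed message plus its matching persistence marker; `RedisConsumer` and `get_connection` are the assumed names used earlier):

import json

r = get_connection()
# One pending gather message and its persistence marker.
r.rpush('default:harvest_job_id', json.dumps({'harvest_job_id': 'abc'}))
r.set('default:harvest_job_id:abc', '2024-01-01 00:00:00.000000')

consumer = RedisConsumer(r, 'default:harvest_job_id')
assert consumer.queue_purge() == 1                    # one message removed
assert r.exists('default:harvest_job_id:abc') == 0    # marker deleted atomically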
def dump_all(redis=r):
    keys = redis.keys('*')
    pairs = {}
    for key in keys:
        key_type = redis.type(key)  # avoid shadowing the builtin; GET below assumes string values
        val = redis.get(key)
        try:
            pairs[key] = eval(val)  # caution: eval trusts whatever is stored in Redis
        except Exception as e:
            print(pairs, key, val, e)
    return pairs
def del_all(redis=r):
    keys = redis.keys('*')
    for k in keys:
        print('Deleting:', k, 'result is', redis.delete(k))
def main():
    # del_all()
    # print(json.dumps(dump_all(), indent=4))
    keys = r.keys('*')
    print(keys)
    print(len(keys))
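KEYS '*' blocks the server while it walks the whole keyspace, so on anything but a toy database the incremental SCAN command is preferred. A sketch of dump_all rewritten on redis-py's scan_iter (the same caveat about eval applies):

def dump_all_scan(redis=r):
    # SCAN-based variant: iterates the keyspace in cursor-sized chunks
    # instead of materializing every key at once with KEYS.
    pairs = {}
    for key in redis.scan_iter('*'):
        val = redis.get(key)
        try:
            pairs[key] = eval(val)  # caution: eval trusts whatever is stored
        except Exception as e:
            print(key, val, e)
    return pairs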
async def test_aredis(i):
    start = time.time()
    client = aredis.StrictRedis(host=HOST)
    res = None
    for _ in range(i):  # don't shadow the iteration count
        res = await client.keys('*')
    print(time.time() - start)
    return res
async def test_asyncio_redis(i):
    connection = await asyncio_redis.Connection.create(host=HOST, port=6379)
    start = time.time()
    res = None
    for _ in range(i):
        res = await connection.keys('*')
    print(time.time() - start)
    connection.close()
    return res
def test_conn(i):
start = time.time()
client = redis.StrictRedis(host=HOST)
res = None
    for _ in range(i):
res = client.keys('*')
print(time.time() - start)
return res
async def test_aioredis(i, loop):
    start = time.time()
    redis = await aioredis.create_redis((HOST, 6379), loop=loop)
    val = None
    for _ in range(i):
        val = await redis.keys('*')
    print(time.time() - start)
    redis.close()
    await redis.wait_closed()
    return val
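A sketch of a driver for the four benchmarks above, assuming they live in one module; the async variants need an event loop, and HOST is assumed (the snippets reference it but never define it):

import asyncio

HOST = 'localhost'  # assumption: not defined in the snippets above

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    test_conn(100)                                     # blocking redis-py client
    loop.run_until_complete(test_aredis(100))          # aredis
    loop.run_until_complete(test_asyncio_redis(100))   # asyncio_redis
    loop.run_until_complete(test_aioredis(100, loop))  # aioredis (v1-style API)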
def pull_redis_queue(self, host="localhost", port=6379, **kwargs):
    # Discover redis-backed queues already present on the server and register them.
    if not redis_enable:
        raise RedisImportException
    redis_conn = redis.Redis(host=host, port=port, **kwargs)  # don't shadow the redis module
    for key in redis_conn.keys():
        if isinstance(key, bytes):
            key = key.decode()  # redis-py returns bytes keys under Python 3
        if key[:11] == "redis_queue":  # only keys created by this factory's naming scheme
            self.queue_dict[key] = RedisQ(key, **kwargs)
            if "redis_queue" in self.queue_name_counter:
                self.queue_name_counter["redis_queue"] += 1
            else:
                self.queue_name_counter["redis_queue"] = 0
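A hedged usage sketch, continuing the assumed QueueManager from earlier; after pulling, Queue() auto-naming continues from the recovered counter:

manager = QueueManager()
manager.pull_redis_queue(host="localhost", port=6379)  # re-attach leftover queues
print(manager.queue_dict.keys())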
def remove(self, queue_object=None, name=None):
    # Remove one queue (by object or by name), or all queues if neither is given.
    if (not queue_object) and (not name):  # no argument: drop every queue
        self.queue_dict = dict()
    elif queue_object in self.queue_dict.values():  # remove by queue object
        del self.queue_dict[queue_object.name]
    elif name in self.queue_dict.keys():  # remove by queue name
        del self.queue_dict[name]
    else:
        raise KeyError("no such queue")
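A usage sketch for the three removal paths, again using the assumed QueueManager:

manager = QueueManager()
q = manager.Queue("python_queue", name="jobs")
manager.remove(queue_object=q)   # drop by object
manager.Queue("python_queue", name="jobs")
manager.remove(name="jobs")      # drop by name
manager.remove()                 # neither given: clear every queue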