def resubmit_jobs():
'''
Examines the fetch and gather queues for items that are suspiciously old.
These are removed from the queues and placed back on them afresh, to ensure
the fetch & gather consumers are triggered to process it.
'''
if config.get('ckan.harvest.mq.type') != 'redis':
return
redis = get_connection()
# fetch queue
harvest_object_pending = redis.keys(get_fetch_routing_key() + ':*')
for key in harvest_object_pending:
date_of_key = datetime.datetime.strptime(redis.get(key),
"%Y-%m-%d %H:%M:%S.%f")
# 3 minutes for fetch and import max
if (datetime.datetime.now() - date_of_key).seconds > 180:
redis.rpush(get_fetch_routing_key(),
json.dumps({'harvest_object_id': key.split(':')[-1]})
)
redis.delete(key)
# gather queue
harvest_jobs_pending = redis.keys(get_gather_routing_key() + ':*')
for key in harvest_jobs_pending:
date_of_key = datetime.datetime.strptime(redis.get(key),
"%Y-%m-%d %H:%M:%S.%f")
# 3 hours for a gather
if (datetime.datetime.now() - date_of_key).seconds > 7200:
redis.rpush(get_gather_routing_key(),
json.dumps({'harvest_job_id': key.split(':')[-1]})
)
redis.delete(key)
评论列表
文章目录