def main():
try:
checkpoint = json.load(open("checkpoint2.json"))
except:
checkpoint = {}
starttime = time.time()
pool = multiprocessing.Pool(processes=10)
future = pool.apply_async(get_schedds)
schedd_ads = future.get(TIMEOUT_MINS*60)
print "There are %d schedds to query." % len(schedd_ads)
futures = []
for schedd_ad in schedd_ads:
name = schedd_ad["Name"]
last_completion = checkpoint.get(name, 0)
future = pool.apply_async(process_schedd, (starttime, last_completion, schedd_ad))
futures.append((name, future))
timed_out = False
for name, future in futures:
time_remaining = TIMEOUT_MINS*60+10 - (time.time() - starttime)
if time_remaining > 0:
try:
last_completion = future.get(time_remaining)
checkpoint["name"] = last_completion
except multiprocessing.TimeoutError:
print "Schedd %s timed out; ignoring progress." % name
else:
timed_out = True
break
if timed_out:
pool.terminate()
else:
pool.close()
pool.join()
fd = open("checkpoint2.json.new", "w")
json.dump(checkpoint, fd)
fd.close()
os.rename("checkpoint2.json.new", "checkpoint2.json")
print "Total processing time: %.2f mins" % ((time.time()-starttime)/60.)
评论列表
文章目录