def main():
try:
checkpoint = json.load(open("checkpoint.factory.json"))
except:
checkpoint = {}
starttime = time.time()
pool = multiprocessing.Pool(processes=10)
future = pool.apply_async(get_schedds)
schedd_ads = future.get(TIMEOUT_MINS*60)
print "There are %d schedds to query." % len(schedd_ads)
futures = []
for schedd_ad in schedd_ads:
name = schedd_ad["Name"]
last_completion = checkpoint.get(name, 0)
future = pool.apply_async(process_schedd, (starttime, last_completion, schedd_ad))
futures.append((name, future))
pool.close()
timed_out = False
for name, future in futures:
time_remaining = TIMEOUT_MINS*60+10 - (time.time() - starttime)
if time_remaining > 0:
try:
last_completion = future.get(time_remaining)
if name:
checkpoint[schedd_ad["name"]] = last_completion
except multiprocessing.TimeoutError:
print "Schedd %s timed out; ignoring progress." % name
else:
timed_out = True
break
if timed_out:
pool.terminate()
pool.join()
try:
checkpoint_new = json.load(open("checkpoint.factory.json"))
except:
checkpoint_new = {}
for key, val in checkpoint.items():
if (key not in checkpoint_new) or (val > checkpoint_new[key]):
checkpoint_new[key] = val
fd, tmpname = tempfile.mkstemp(dir=".", prefix="checkpoint.factory.json.new")
fd = os.fdopen(fd, "w")
json.dump(checkpoint_new, fd)
fd.close()
os.rename(tmpname, "checkpoint.factory.json")
print "Total processing time: %.2f mins" % ((time.time()-starttime)/60.)
评论列表
文章目录