spider_gfactory.py 文件源码

python
阅读 19 收藏 0 点赞 0 评论 0

项目:cms-htcondor-es 作者: bbockelm 项目源码 文件源码
def _load_checkpoint(path):
    """Return the JSON checkpoint stored at *path*, or {} if it cannot be read."""
    try:
        with open(path) as checkpoint_file:
            return json.load(checkpoint_file)
    except (IOError, OSError, ValueError):
        # Missing, unreadable, or malformed checkpoint: start from scratch.
        return {}


def main():
    """Query all schedds in a worker pool and update the checkpoint file.

    The checkpoint file ("checkpoint.factory.json") maps a schedd's "Name"
    to the value returned by ``process_schedd`` for that schedd.  Each
    schedd is processed in a pool worker, seeded with its previously
    recorded value (0 when none is recorded).  Results that arrive before
    the overall deadline (``TIMEOUT_MINS`` minutes plus 10 seconds after
    start) are merged into the checkpoint, which is then rewritten via a
    temporary file and rename.

    Raises:
        multiprocessing.TimeoutError: if the initial ``get_schedds`` call
            does not finish within ``TIMEOUT_MINS`` minutes.
        Exception: anything raised inside ``process_schedd`` is re-raised
            here by ``future.get``.
    """
    checkpoint_path = "checkpoint.factory.json"
    checkpoint = _load_checkpoint(checkpoint_path)

    starttime = time.time()

    pool = multiprocessing.Pool(processes=10)
    future = pool.apply_async(get_schedds)
    schedd_ads = future.get(TIMEOUT_MINS*60)
    print("There are %d schedds to query." % len(schedd_ads))

    futures = []

    for schedd_ad in schedd_ads:
        name = schedd_ad["Name"]
        last_completion = checkpoint.get(name, 0)
        future = pool.apply_async(process_schedd, (starttime, last_completion, schedd_ad))
        futures.append((name, future))

    # No further tasks will be submitted; workers exit once the queue drains.
    pool.close()

    timed_out = False
    for name, future in futures:
        # All schedds share one deadline measured from starttime.
        time_remaining = TIMEOUT_MINS*60+10 - (time.time() - starttime)
        if time_remaining <= 0:
            timed_out = True
            break
        try:
            last_completion = future.get(time_remaining)
        except multiprocessing.TimeoutError:
            print("Schedd %s timed out; ignoring progress." % name)
            # The worker is still busy past the deadline; make sure the pool
            # is terminated below instead of join() waiting on it forever.
            timed_out = True
            continue
        if name:
            # Record the result under the schedd it belongs to (the loop's
            # own name), not under a variable left over from the submit loop.
            checkpoint[name] = last_completion
    if timed_out:
        pool.terminate()
    pool.join()

    # Re-read the file and keep the larger value per key, so entries that
    # appeared on disk while we were running are not overwritten.
    # NOTE(review): presumably guards against a concurrent run - confirm.
    checkpoint_new = _load_checkpoint(checkpoint_path)

    for key, val in checkpoint.items():
        if (key not in checkpoint_new) or (val > checkpoint_new[key]):
            checkpoint_new[key] = val

    # Write to a temp file in the same directory, then rename over the
    # checkpoint so readers never observe a partially written file.
    fd, tmpname = tempfile.mkstemp(dir=".", prefix="checkpoint.factory.json.new")
    with os.fdopen(fd, "w") as tmpfile:
        json.dump(checkpoint_new, tmpfile)
    os.rename(tmpname, checkpoint_path)

    print("Total processing time: %.2f mins" % ((time.time()-starttime)/60.))
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号