parallel.py 文件源码

python
阅读 35 收藏 0 点赞 0 评论 0

项目:baiji 作者: bodylabs 项目源码 文件源码
def parallel_for(a, cls, args=None, kwargs=None, num_processes=None):
    """Distribute the items of ``a`` over a pool of worker processes.

    Every worker process runs ``_parallel_for(q, cls, pipe, *args, **kwargs)``,
    where ``q`` is a shared ``JoinableQueue`` holding the items of ``a`` and
    ``pipe`` is the send end of a one-way pipe private to that worker. The
    receive ends are handed to a ``MultiPipeWatcher``, whose ``merged``
    attribute is returned once all workers have finished.

    Args:
        a: The work items. Must support both ``len()`` and iteration.
            NOTE(review): ``None`` is used as the end-of-work marker on the
            queue, so ``None`` items in ``a`` are presumably not supported --
            confirm against ``_parallel_for``.
        cls: Passed through unchanged to ``_parallel_for``; presumably the
            worker class that processes each item -- confirm against
            ``_parallel_for``.
        args: Optional sequence of extra positional arguments forwarded to
            ``_parallel_for`` after ``(q, cls, pipe)``. Defaults to none.
        kwargs: Optional dict of keyword arguments forwarded to
            ``_parallel_for``. Defaults to none.
        num_processes: Number of worker processes to start. Defaults to
            ``multiprocessing.cpu_count()``.

    Returns:
        ``output_watcher.merged``, i.e. whatever the ``MultiPipeWatcher``
        assembled from the workers' pipes.

    Raises:
        KeyboardInterrupt: Re-raised after all worker processes have been
            terminated and joined.
    """
    from multiprocessing import Process, JoinableQueue, cpu_count, Pipe

    # Use None sentinels instead of mutable defaults so that no list/dict
    # object is shared between calls.
    args = () if args is None else tuple(args)
    kwargs = {} if kwargs is None else kwargs
    if num_processes is None:
        num_processes = cpu_count()

    # Note that JoinableQueue uses an integer for tracking locations in the queue.
    # Because it's using shared memory it's not terribly flexible and gives annoyingly
    # unclear errors if you go over the limit. We'd like the queue to be as large as
    # possible so that we can avoid contention, but without allocating a max possible
    # size queue unless we need it, thus the calculation below. 32767 is a hard limit.
    # The "+ num_processes" leaves room for one end marker per worker.
    q = JoinableQueue(maxsize=min(len(a) + num_processes, 2**15 - 1))

    # Pipe(duplex=False) returns (receive_end, send_end): each worker gets a
    # send end, the watcher in this process gets the matching receive end.
    output_pipes = [Pipe(duplex=False) for _ in range(num_processes)]
    send_pipes = [send_end for _, send_end in output_pipes]
    recv_pipes = [recv_end for recv_end, _ in output_pipes]
    pool = [Process(target=_parallel_for, args=(q, cls, pipe) + args, kwargs=kwargs)
            for pipe in send_pipes]
    output_watcher = MultiPipeWatcher(recv_pipes)
    try:
        for p in pool:
            p.start()
        output_watcher.start()
        for x in a:
            q.put(x)
        for _ in range(num_processes):
            q.put(None)  # End markers, one per worker
        # No more items will be put from this process: flush the feeder
        # thread, then wait until every queued item has been marked done.
        q.close()
        q.join_thread()
        q.join()
        for p in pool:
            p.join()
        output_watcher.flush()
        output_watcher.join()
        return output_watcher.merged
    except KeyboardInterrupt:
        # Parenthesized single-argument form prints the same text under the
        # Python 2 print statement and the Python 3 print function.
        print("Interrupted -- terminating worker processes")
        for p in pool:
            p.terminate()
        for p in pool:
            p.join()
        raise
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号