Process.join()和队列不适用于大量

发布于 2021-01-29 18:28:07

我试图分裂循环即

N = 1000000
for i in xrange(N):
    #do something

使用multiprocessing.Process,它对于较小的N值效果很好。当我使用较大的N值时,就会出现问题。在p.join()之前或期间发生了一些奇怪的事情,并且程序没有响应。如果我将print
i放在函数f的定义中,而不是q.put(i),则一切正常。

我将不胜感激任何帮助。这是代码。

from multiprocessing import Process, Queue

def f(q,nMin, nMax): # function for multiprocessing
    for i in xrange(nMin,nMax):
        q.put(i)

if __name__ == '__main__':

    nEntries = 1000000

    nCpu = 10
    nEventsPerCpu = nEntries/nCpu
    processes = []

    q = Queue()

    for i in xrange(nCpu):
        processes.append( Process( target=f, args=(q,i*nEventsPerCpu,(i+1)*nEventsPerCpu) ) )

    for p in processes:
        p.start()

    for p in processes:
        p.join()

    print q.qsize()
关注者
0
被浏览
47
1 个回答
  • 面试哥
    面试哥 2021-01-29
    为面试而生,有面试问题,就找面试哥。

    您正在尝试无限制地增加队列,并且您正在加入一个等待队列中有空间的子进程,因此您的主进程被拖延以等待该进程完成,而永远不会。

    如果在连接之前将数据从队列中拉出,它将可以正常工作。

    您可以使用的一种技术是这样的:

    while 1:
        running = any(p.is_alive() for p in processes)
        while not queue.empty():
           process_queue_data()
        if not running:
            break
    

    根据文档,p.is_alive()应该执行隐式连接,但它似乎还暗示最佳实践可能是在此之后在所有线程上显式执行连接。

    编辑:尽管这很清楚,但可能不是全部。如何使其性能更好取决于任务和计算机的具体情况(通常,一次不应该一次创建那么多进程,除非某些进程将在I / O上被阻塞)。

    除了将进程数量减少到CPU数量之外,一些简单的修复方法也可以使其更快一些(再次取决于具体情况),如下所示:

    liveprocs = list(processes)
    while liveprocs:
        try:
            while 1:
                process_queue_data(q.get(False))
        except Queue.Empty:
            pass
    
        time.sleep(0.5)    # Give tasks a chance to put more data in
        if not q.empty():
            continue
        liveprocs = [p for p in liveprocs if p.is_alive()]
    


知识点
面圈网VIP题库

面圈网VIP题库全新上线,海量真题题库资源。 90大类考试,超10万份考试真题开放下载啦

去下载看看