是否可以在Pool.imap调用的函数中使用多处理队列?

发布于 2021-01-29 19:32:39

我正在使用python
2.7,并尝试在自己的进程中运行一些CPU繁重的任务。我希望能够将消息发送回父流程,以使其随时了解流程的当前状态。为此,多处理队列似乎很完美,但我不知道如何使它工作。

因此,这是我的基本工作示例,减去了Queue的使用。

import multiprocessing as mp
import time

def f(x):
    return x*x

def main():
    pool = mp.Pool()
    results = pool.imap_unordered(f, range(1, 6))
    time.sleep(1)

    print str(results.next())

    pool.close()
    pool.join()

if __name__ == '__main__':
    main()

我尝试以几种方式传递队列,它们收到错误消息“
RuntimeError:队列对象仅应通过继承在进程之间共享”。这是我根据之前发现的答案尝试的一种方法。(尝试使用Pool.map_async和Pool.imap时遇到相同的问题)

import multiprocessing as mp
import time

def f(args):
    x = args[0]
    q = args[1]
    q.put(str(x))
    time.sleep(0.1)
    return x*x

def main():
    q = mp.Queue()
    pool = mp.Pool()
    results = pool.imap_unordered(f, ([i, q] for i in range(1, 6)))

    print str(q.get())

    pool.close()
    pool.join()

if __name__ == '__main__':
    main()

最后,“ 0适应度”方法(使其成为全局变量)不会生成任何消息,而只是将其锁定。

import multiprocessing as mp
import time

q = mp.Queue()

def f(x):
    q.put(str(x))
    return x*x

def main():
    pool = mp.Pool()
    results = pool.imap_unordered(f, range(1, 6))
    time.sleep(1)

    print q.get()

    pool.close()
    pool.join()

if __name__ == '__main__':
    main()

我知道它可能会直接与multiprocessing.Process一起使用,并且还有其他库可以完成此操作,但是我讨厌放弃非常适合的标准库函数,直到我确定不仅仅是我所缺少的知识使我无法利用它们。

谢谢。

关注者
0
被浏览
51
1 个回答
  • 面试哥
    面试哥 2021-01-29
    为面试而生,有面试问题,就找面试哥。

    诀窍是将Queue作为参数传递给初始化程序。似乎可以与所有Pool调度方法一起使用。

    import multiprocessing as mp
    
    def f(x):
        f.q.put('Doing: ' + str(x))
        return x*x
    
    def f_init(q):
        f.q = q
    
    def main():
        jobs = range(1,6)
    
        q = mp.Queue()
        p = mp.Pool(None, f_init, [q])
        results = p.imap(f, jobs)
        p.close()
    
        for i in range(len(jobs)):
            print q.get()
            print results.next()
    
    if __name__ == '__main__':
        main()
    


知识点
面圈网VIP题库

面圈网VIP题库全新上线,海量真题题库资源。 90大类考试,超10万份考试真题开放下载啦

去下载看看