如何使用multiprocessing.Queue.get方法?

发布于 2021-01-29 18:06:25

下面的代码将三个数字放在一个队列中。然后,它尝试从队列中取回数字。但是它从来没有。如何从队列中获取数据?

import multiprocessing

queue = multiprocessing.Queue()

for i in range(3):
    queue.put(i)

while not queue.empty():
    print queue.get()
关注者
0
被浏览
53
1 个回答
  • 面试哥
    面试哥 2021-01-29
    为面试而生,有面试问题,就找面试哥。

    在阅读@Martijn
    Pieters的文章后,我最初删除了此答案,因为他更详细地描述了“为什么这不起作用”以及更早的版本。然后我意识到,OP的示例中的用例并不完全符合

    “如何使用multiprocessing.Queue.get方法”。

    这不是因为没有用于演示的子进程,而是因为在实际的应用程序中,几乎没有预先填充队列,而是仅在队列之后才读出,而是在等待时间之间交错读写。Martijn演示的扩展演示代码在通常情况下不起作用,因为当入队不符合阅读要求时,while循环可能会中断得太早。因此,这里是重新加载的答案,它能够处理通常的交错提要和阅读方案:


    不要依赖queue.empty检查同步。

    将对象放在空队列上之后,在队列的empty()方法返回False且get_nowait()可以在不引发queue.Empty的情况下返回之前,可能会有无穷的延迟。…

    空()

    如果队列为空,则返回True,否则返回False。由于多线程/多处理语义,这是不可靠的。docs

    从队列中使用for msg in iter(queue.get, sentinel):to .get(),您都可以通过传递哨兵值来中断循环…
    iter(callable,sentinel)?

    from multiprocessing import Queue
    
    SENTINEL = None
    
    if __name__ == '__main__':
    
        queue = Queue()
    
        for i in [*range(3), SENTINEL]:
            queue.put(i)
    
        for msg in iter(queue.get, SENTINEL):
            print(msg)
    

    …或在需要非阻塞解决方案时使用get_nowait()并处理可能的queue.Empty异常。

    from multiprocessing import Queue
    from queue import Empty
    import time
    
    SENTINEL = None
    
    if __name__ == '__main__':
    
        queue = Queue()
    
        for i in [*range(3), SENTINEL]:
            queue.put(i)
    
        while True:
            try:
                msg = queue.get_nowait()
                if msg == SENTINEL:
                    break
                print(msg)
            except Empty:
                # do other stuff
                time.sleep(0.1)
    

    如果只有一个进程并且该进程中只有一个线程正在读取队列,则还可以与以下交换最后一个代码片段:

    while True:
        if not queue.empty():  # this is not an atomic operation ...
            msg = queue.get()  # ... thread could be interrupted in between
            if msg == SENTINEL:
                break
            print(msg)
        else:
            # do other stuff
            time.sleep(0.1)
    

    由于线程可能会在检查和之间删除GIL,因此这不适用于进程中的多线程队列读取。如果从队列中读取多个进程,则同样适用。if not queue.empty()``queue.get()

    对于单一生产者/单一消费者的方案,使用amultiprocessing.Pipe代替multiprocessing.Queue将是足够的,并且性能更高。



知识点
面圈网VIP题库

面圈网VIP题库全新上线,海量真题题库资源。 90大类考试,超10万份考试真题开放下载啦

去下载看看