如何使用multiprocessing.Queue.get方法?
下面的代码将三个数字放在一个队列中。然后,它尝试从队列中取回数字。但是它从来没有。如何从队列中获取数据?
import multiprocessing
queue = multiprocessing.Queue()
for i in range(3):
queue.put(i)
while not queue.empty():
print queue.get()
-
在阅读@Martijn
Pieters的文章后,我最初删除了此答案,因为他更详细地描述了“为什么这不起作用”以及更早的版本。然后我意识到,OP的示例中的用例并不完全符合“如何使用multiprocessing.Queue.get方法”。
这不是因为没有用于演示的子进程,而是因为在实际的应用程序中,几乎没有预先填充队列,而是仅在队列之后才读出,而是在等待时间之间交错读写。Martijn演示的扩展演示代码在通常情况下不起作用,因为当入队不符合阅读要求时,while循环可能会中断得太早。因此,这里是重新加载的答案,它能够处理通常的交错提要和阅读方案:
不要依赖queue.empty检查同步。
将对象放在空队列上之后,在队列的empty()方法返回False且get_nowait()可以在不引发queue.Empty的情况下返回之前,可能会有无穷的延迟。…
空()
如果队列为空,则返回True,否则返回False。由于多线程/多处理语义,这是不可靠的。docs
从队列中使用
for msg in iter(queue.get, sentinel):
to.get()
,您都可以通过传递哨兵值来中断循环…
iter(callable,sentinel)?from multiprocessing import Queue SENTINEL = None if __name__ == '__main__': queue = Queue() for i in [*range(3), SENTINEL]: queue.put(i) for msg in iter(queue.get, SENTINEL): print(msg)
…或在需要非阻塞解决方案时使用
get_nowait()
并处理可能的queue.Empty
异常。from multiprocessing import Queue from queue import Empty import time SENTINEL = None if __name__ == '__main__': queue = Queue() for i in [*range(3), SENTINEL]: queue.put(i) while True: try: msg = queue.get_nowait() if msg == SENTINEL: break print(msg) except Empty: # do other stuff time.sleep(0.1)
如果只有一个进程并且该进程中只有一个线程正在读取队列,则还可以与以下交换最后一个代码片段:
while True: if not queue.empty(): # this is not an atomic operation ... msg = queue.get() # ... thread could be interrupted in between if msg == SENTINEL: break print(msg) else: # do other stuff time.sleep(0.1)
由于线程可能会在检查和之间删除GIL,因此这不适用于进程中的多线程队列读取。如果从队列中读取多个进程,则同样适用。
if not queue.empty()``queue.get()
对于单一生产者/单一消费者的方案,使用a
multiprocessing.Pipe
代替multiprocessing.Queue
将是足够的,并且性能更高。