多处理-管道与队列

发布于 2021-01-29 19:23:11

Python的多处理程序包中的队列和管道之间的根本区别是什么?

在什么情况下应该选择一种?什么时候使用比较有利Pipe()?什么时候使用比较有利Queue()

关注者
0
被浏览
43
1 个回答
  • 面试哥
    面试哥 2021-01-29
    为面试而生,有面试问题,就找面试哥。
    • APipe()只能有两个端点。

    • 一个Queue()可以有多个生产者和消费者。

    何时使用它们

    如果您需要两个以上的交流点,请使用Queue()

    如果您需要绝对的性能,那么aPipe()会更快,因为Queue()它建立在之上Pipe()

    绩效基准

    假设您要生成两个进程并在它们之间尽快发送消息。这些是使用Pipe()和进行类似测试之间的拖动竞赛的计时结果Queue()。这是在运行Ubuntu
    11.10和Python 2.7.2的ThinkpadT61上进行的。

    仅供参考,我把成绩JoinableQueue()作为奖金;JoinableQueue()queue.task_done()调用时说明任务(它甚至不知道特定任务,它只计算队列中未完成的任务),以便queue.join()知道工作已完成。

    此答案底部的每个代码…

    mpenning@mpenning-T61:~$ python multi_pipe.py 
    Sending 10000 numbers to Pipe() took 0.0369849205017 seconds
    Sending 100000 numbers to Pipe() took 0.328398942947 seconds
    Sending 1000000 numbers to Pipe() took 3.17266988754 seconds
    mpenning@mpenning-T61:~$ python multi_queue.py 
    Sending 10000 numbers to Queue() took 0.105256080627 seconds
    Sending 100000 numbers to Queue() took 0.980564117432 seconds
    Sending 1000000 numbers to Queue() took 10.1611330509 seconds
    mpnening@mpenning-T61:~$ python multi_joinablequeue.py 
    Sending 10000 numbers to JoinableQueue() took 0.172781944275 seconds
    Sending 100000 numbers to JoinableQueue() took 1.5714070797 seconds
    Sending 1000000 numbers to JoinableQueue() took 15.8527247906 seconds
    mpenning@mpenning-T61:~$
    

    概括而言,Pipe()它的速度比速度快3倍Queue()JoinableQueue()除非您确实必须有好处,否则请不要考虑。

    奖励材料2

    除非您知道一些捷径,否则多处理会在信息流中引入微妙的变化,使调试变得困难。例如,在许多情况下,当您通过字典建立索引时,您的脚本可能工作正常,但是某些输入很少会失败。

    通常,当整个python进程崩溃时,我们会获得有关失败的线索;但是,如果多处理功能崩溃,则不会在控制台上打印未经请求的崩溃回溯。很难找到未知的多处理崩溃,而又不知道导致进程崩溃的线索。

    我发现跟踪多处理崩溃信息的最简单方法是将整个多处理功能包装在try/中except并使用traceback.print_exc()

    import traceback
    def run(self, args):
        try:
            # Insert stuff to be multiprocessed here
            return args[0]['that']
        except:
            print "FATAL: reader({0}) exited while multiprocessing".format(args) 
            traceback.print_exc()
    

    现在,当您发现崩溃时,您会看到类似以下内容的信息:

    FATAL: reader([{'crash': 'this'}]) exited while multiprocessing
    Traceback (most recent call last):
      File "foo.py", line 19, in __init__
        self.run(args)
      File "foo.py", line 46, in run
        KeyError: 'that'
    

    源代码:


    """
    multi_pipe.py
    """
    from multiprocessing import Process, Pipe
    import time
    
    def reader_proc(pipe):
        ## Read from the pipe; this will be spawned as a separate Process
        p_output, p_input = pipe
        p_input.close()    # We are only reading
        while True:
            msg = p_output.recv()    # Read from the output pipe and do nothing
            if msg=='DONE':
                break
    
    def writer(count, p_input):
        for ii in xrange(0, count):
            p_input.send(ii)             # Write 'count' numbers into the input pipe
        p_input.send('DONE')
    
    if __name__=='__main__':
        for count in [10**4, 10**5, 10**6]:
            # Pipes are unidirectional with two endpoints:  p_input ------> p_output
            p_output, p_input = Pipe()  # writer() writes to p_input from _this_ process
            reader_p = Process(target=reader_proc, args=((p_output, p_input),))
            reader_p.daemon = True
            reader_p.start()     # Launch the reader process
    
            p_output.close()       # We no longer need this part of the Pipe()
            _start = time.time()
            writer(count, p_input) # Send a lot of stuff to reader_proc()
            p_input.close()
            reader_p.join()
            print("Sending {0} numbers to Pipe() took {1} seconds".format(count,
                (time.time() - _start)))
    

    """
    multi_queue.py
    """
    
    from multiprocessing import Process, Queue
    import time
    import sys
    
    def reader_proc(queue):
        ## Read from the queue; this will be spawned as a separate Process
        while True:
            msg = queue.get()         # Read from the queue and do nothing
            if (msg == 'DONE'):
                break
    
    def writer(count, queue):
        ## Write to the queue
        for ii in range(0, count):
            queue.put(ii)             # Write 'count' numbers into the queue
        queue.put('DONE')
    
    if __name__=='__main__':
        pqueue = Queue() # writer() writes to pqueue from _this_ process
        for count in [10**4, 10**5, 10**6]:             
            ### reader_proc() reads from pqueue as a separate process
            reader_p = Process(target=reader_proc, args=((pqueue),))
            reader_p.daemon = True
            reader_p.start()        # Launch reader_proc() as a separate python process
    
            _start = time.time()
            writer(count, pqueue)    # Send a lot of stuff to reader()
            reader_p.join()         # Wait for the reader to finish
            print("Sending {0} numbers to Queue() took {1} seconds".format(count, 
                (time.time() - _start)))
    

    """
    multi_joinablequeue.py
    """
    from multiprocessing import Process, JoinableQueue
    import time
    
    def reader_proc(queue):
        ## Read from the queue; this will be spawned as a separate Process
        while True:
            msg = queue.get()         # Read from the queue and do nothing
            queue.task_done()
    
    def writer(count, queue):
        for ii in xrange(0, count):
            queue.put(ii)             # Write 'count' numbers into the queue
    
    if __name__=='__main__':
        for count in [10**4, 10**5, 10**6]:
            jqueue = JoinableQueue() # writer() writes to jqueue from _this_ process
            # reader_proc() reads from jqueue as a different process...
            reader_p = Process(target=reader_proc, args=((jqueue),))
            reader_p.daemon = True
            reader_p.start()     # Launch the reader process
            _start = time.time()
            writer(count, jqueue) # Send a lot of stuff to reader_proc() (in different process)
            jqueue.join()         # Wait for the reader to finish
            print("Sending {0} numbers to JoinableQueue() took {1} seconds".format(count, 
                (time.time() - _start)))
    


知识点
面圈网VIP题库

面圈网VIP题库全新上线,海量真题题库资源。 90大类考试,超10万份考试真题开放下载啦

去下载看看