Python多处理和共享计数器

发布于 2021-01-29 14:55:43

我在多处理模块上遇到了麻烦。我正在使用具有其map方法的工作人员池从大量文件中加载数据,对于每个文件,我都使用自定义函数来分析数据。每次处理文件时,我都希望更新一个计数器,以便可以跟踪还有多少文件需要处理。这是示例代码:

def analyze_data( args ):
    # do something 
    counter += 1
    print counter


if __name__ == '__main__':

    list_of_files = os.listdir(some_directory)

    global counter
    counter = 0

    p = Pool()
    p.map(analyze_data, list_of_files)

我找不到解决方案。

关注者
0
被浏览
131
1 个回答
  • 面试哥
    面试哥 2021-01-29
    为面试而生,有面试问题,就找面试哥。

    问题在于该counter变量未在您的进程之间共享:每个单独的进程都在创建它自己的本地实例并对其进行递增。

    有关可用于在进程之间共享状态的某些技术,请参阅文档的本部分。在您的情况下,您可能希望Value在工作人员之间共享一个实例

    这是示例的工作版本(带有一些虚拟输入数据)。请注意,它使用的是全局值,实际上我会尽量避免使用这些值:

    from multiprocessing import Pool, Value
    from time import sleep
    
    counter = None
    
    def init(args):
        ''' store the counter for later use '''
        global counter
        counter = args
    
    def analyze_data(args):
        ''' increment the global counter, do something with the input '''
        global counter
        # += operation is not atomic, so we need to get a lock:
        with counter.get_lock():
            counter.value += 1
        print counter.value
        return args * 10
    
    if __name__ == '__main__':
        #inputs = os.listdir(some_directory)
    
        #
        # initialize a cross-process counter and the input lists
        #
        counter = Value('i', 0)
        inputs = [1, 2, 3, 4]
    
        #
        # create the pool of workers, ensuring each one receives the counter 
        # as it starts. 
        #
        p = Pool(initializer = init, initargs = (counter, ))
        i = p.map_async(analyze_data, inputs, chunksize = 1)
        i.wait()
        print i.get()
    


知识点
面圈网VIP题库

面圈网VIP题库全新上线,海量真题题库资源。 90大类考试,超10万份考试真题开放下载啦

去下载看看