Python多重处理-跟踪pool.map操作的过程

发布于 2021-01-29 18:38:44

我有一个执行一些模拟并以字符串格式返回数组的函数。

我想运行模拟(函数)以更改输入参数值(超过10000个可能的输入值),并将结果写入单个文件。

我正在使用多重处理,特别是pool.map函数来并行运行模拟。

由于运行仿真功能超过10000次的整个过程需要很长时间,因此我真的很想跟踪整个操作的过程。

我认为下面我当前代码中的问题是,pool.map运行该函数10000次,在这些操作过程中没有任何进程跟踪。一旦并行处理完成了10000个模拟的运行(可能是数小时到数天),那么我将继续跟踪10000个模拟结果何时保存到文件中。因此,这实际上并不是在跟踪pool.map操作的处理。

我的代码是否有简单的修复程序,可以进行流程跟踪?

def simFunction(input):
    # Does some simulation and outputs simResult
    return str(simResult)

# Parallel processing

inputs = np.arange(0,10000,1)

if __name__ == "__main__":
    numCores = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes = numCores)
    t = pool.map(simFunction, inputs) 
    with open('results.txt','w') as out:
        print("Starting to simulate " + str(len(inputs)) + " input values...")
        counter = 0
        for i in t:
            out.write(i + '\n')
            counter = counter + 1
            if counter%100==0:
                print(str(counter) + " of " + str(len(inputs)) + " input values simulated")
    print('Finished!!!!')
关注者
0
被浏览
180
1 个回答
  • 面试哥
    面试哥 2021-01-29
    为面试而生,有面试问题,就找面试哥。

    如果使用迭代map函数,则跟踪进度非常容易。

    >>> from pathos.multiprocessing import ProcessingPool as Pool
    >>> def simFunction(x,y):
    ...   import time
    ...   time.sleep(2)
    ...   return x**2 + y
    ... 
    >>> x,y = range(100),range(-100,100,2)
    >>> res = Pool().imap(simFunction, x,y)
    >>> with open('results.txt', 'w') as out:
    ...   for i in x:
    ...     out.write("%s\n" % res.next())
    ...     if i%10 is 0:
    ...       print "%s of %s simulated" % (i, len(x))
    ... 
    0 of 100 simulated
    10 of 100 simulated
    20 of 100 simulated
    30 of 100 simulated
    40 of 100 simulated
    50 of 100 simulated
    60 of 100 simulated
    70 of 100 simulated
    80 of 100 simulated
    90 of 100 simulated
    

    或者,您可以使用异步map。在这里,我将做些不同的事情,只是将其混合在一起。

    >>> import time
    >>> res = Pool().amap(simFunction, x,y)
    >>> while not res.ready():
    ...   print "waiting..."
    ...   time.sleep(5)
    ... 
    waiting...
    waiting...
    waiting...
    waiting...
    >>> res.get()
    [-100, -97, -92, -85, -76, -65, -52, -37, -20, -1, 20, 43, 68, 95, 124, 155, 188, 223, 260, 299, 340, 383, 428, 475, 524, 575, 628, 683, 740, 799, 860, 923, 988, 1055, 1124, 1195, 1268, 1343, 1420, 1499, 1580, 1663, 1748, 1835, 1924, 2015, 2108, 2203, 2300, 2399, 2500, 2603, 2708, 2815, 2924, 3035, 3148, 3263, 3380, 3499, 3620, 3743, 3868, 3995, 4124, 4255, 4388, 4523, 4660, 4799, 4940, 5083, 5228, 5375, 5524, 5675, 5828, 5983, 6140, 6299, 6460, 6623, 6788, 6955, 7124, 7295, 7468, 7643, 7820, 7999, 8180, 8363, 8548, 8735, 8924, 9115, 9308, 9503, 9700, 9899]
    

    请注意,我使用pathos.multiprocessing而不是multiprocessing。这仅仅是一个分支multiprocessing,使您能够map使用多个输入来执行函数,具有更好的序列化,并允许您在map任何地方(不仅是in
    __main__)执行调用。您也可以使用multiprocessing上面的方法,但是代码会稍有不同。

    迭代的或异步的map都可以使您编写任何代码,以进行更好的过程跟踪。例如,将唯一的“
    id”传递给每个作业,并观察返回的作业,或者让每个作业返回其进程ID。有很多跟踪进度和过程的方法……但是以上内容应该为您提供一个开始。

    你可以在pathos这里找到:https :
    //github.com/uqfoundation



知识点
面圈网VIP题库

面圈网VIP题库全新上线,海量真题题库资源。 90大类考试,超10万份考试真题开放下载啦

去下载看看