Python多重处理-跟踪pool.map操作的过程
我有一个执行一些模拟并以字符串格式返回数组的函数。
我想运行模拟(函数)以更改输入参数值(超过10000个可能的输入值),并将结果写入单个文件。
我正在使用多重处理,特别是pool.map函数来并行运行模拟。
由于运行仿真功能超过10000次的整个过程需要很长时间,因此我真的很想跟踪整个操作的过程。
我认为下面我当前代码中的问题是,pool.map运行该函数10000次,在这些操作过程中没有任何进程跟踪。一旦并行处理完成了10000个模拟的运行(可能是数小时到数天),那么我将继续跟踪10000个模拟结果何时保存到文件中。因此,这实际上并不是在跟踪pool.map操作的处理。
我的代码是否有简单的修复程序,可以进行流程跟踪?
def simFunction(input):
# Does some simulation and outputs simResult
return str(simResult)
# Parallel processing
inputs = np.arange(0,10000,1)
if __name__ == "__main__":
numCores = multiprocessing.cpu_count()
pool = multiprocessing.Pool(processes = numCores)
t = pool.map(simFunction, inputs)
with open('results.txt','w') as out:
print("Starting to simulate " + str(len(inputs)) + " input values...")
counter = 0
for i in t:
out.write(i + '\n')
counter = counter + 1
if counter%100==0:
print(str(counter) + " of " + str(len(inputs)) + " input values simulated")
print('Finished!!!!')
-
如果使用迭代
map
函数,则跟踪进度非常容易。>>> from pathos.multiprocessing import ProcessingPool as Pool >>> def simFunction(x,y): ... import time ... time.sleep(2) ... return x**2 + y ... >>> x,y = range(100),range(-100,100,2) >>> res = Pool().imap(simFunction, x,y) >>> with open('results.txt', 'w') as out: ... for i in x: ... out.write("%s\n" % res.next()) ... if i%10 is 0: ... print "%s of %s simulated" % (i, len(x)) ... 0 of 100 simulated 10 of 100 simulated 20 of 100 simulated 30 of 100 simulated 40 of 100 simulated 50 of 100 simulated 60 of 100 simulated 70 of 100 simulated 80 of 100 simulated 90 of 100 simulated
或者,您可以使用异步
map
。在这里,我将做些不同的事情,只是将其混合在一起。>>> import time >>> res = Pool().amap(simFunction, x,y) >>> while not res.ready(): ... print "waiting..." ... time.sleep(5) ... waiting... waiting... waiting... waiting... >>> res.get() [-100, -97, -92, -85, -76, -65, -52, -37, -20, -1, 20, 43, 68, 95, 124, 155, 188, 223, 260, 299, 340, 383, 428, 475, 524, 575, 628, 683, 740, 799, 860, 923, 988, 1055, 1124, 1195, 1268, 1343, 1420, 1499, 1580, 1663, 1748, 1835, 1924, 2015, 2108, 2203, 2300, 2399, 2500, 2603, 2708, 2815, 2924, 3035, 3148, 3263, 3380, 3499, 3620, 3743, 3868, 3995, 4124, 4255, 4388, 4523, 4660, 4799, 4940, 5083, 5228, 5375, 5524, 5675, 5828, 5983, 6140, 6299, 6460, 6623, 6788, 6955, 7124, 7295, 7468, 7643, 7820, 7999, 8180, 8363, 8548, 8735, 8924, 9115, 9308, 9503, 9700, 9899]
请注意,我使用
pathos.multiprocessing
而不是multiprocessing
。这仅仅是一个分支multiprocessing
,使您能够map
使用多个输入来执行函数,具有更好的序列化,并允许您在map
任何地方(不仅是in
__main__
)执行调用。您也可以使用multiprocessing
上面的方法,但是代码会稍有不同。迭代的或异步的
map
都可以使您编写任何代码,以进行更好的过程跟踪。例如,将唯一的“
id”传递给每个作业,并观察返回的作业,或者让每个作业返回其进程ID。有很多跟踪进度和过程的方法……但是以上内容应该为您提供一个开始。你可以在
pathos
这里找到:https :
//github.com/uqfoundation