def parallel_for(a, cls, args=None, kwargs=None, num_processes=None):
    """Process every item of *a* in parallel worker processes.

    Items are fed through a JoinableQueue to ``num_processes`` workers
    (each running ``_parallel_for``), and per-worker output is collected
    over one-way pipes by a ``MultiPipeWatcher``.

    Parameters:
        a             -- iterable of work items; one ``None`` sentinel per
                         worker is appended as an end-of-stream marker.
        cls           -- forwarded to ``_parallel_for``; presumably the
                         worker class/callable applied to each item
                         (defined elsewhere in this file).
        args, kwargs  -- extra positional/keyword arguments forwarded to
                         ``_parallel_for``. Default to empty.
        num_processes -- worker count; defaults to ``cpu_count()``.

    Returns the merged output gathered by the pipe watcher.
    Re-raises KeyboardInterrupt after terminating the workers.
    """
    from multiprocessing import Process, JoinableQueue, cpu_count, Pipe
    # Fix: the original used mutable defaults (args=[], kwargs={}), which
    # are shared across all calls and a classic source of state leakage.
    if args is None:
        args = []
    if kwargs is None:
        kwargs = {}
    if num_processes is None:
        num_processes = cpu_count()
    # Note that JoinableQueue uses an integer for tracking locations in the queue.
    # Because it's using shared memory it's not terribly flexible and gives annoyingly
    # unclear errors if you go over the limit. We'd like the queue to be as large as
    # possible so that we can avoid contention, but without allocating a max possible
    # size queue unless we need it, thus the calculation below. 32767 is a hard limit.
    q = JoinableQueue(maxsize=min(len(a) + num_processes, 2**15 - 1))
    # One simplex pipe per worker: worker writes on the send end, the
    # watcher reads on the receive end.
    output_pipes = [Pipe(duplex=False) for _ in range(num_processes)]
    send_pipes = [p for _, p in output_pipes]
    recv_pipes = [p for p, _ in output_pipes]
    pool = [Process(target=_parallel_for, args=(q, cls, pipe) + tuple(args), kwargs=kwargs)
            for pipe in send_pipes]
    output_watcher = MultiPipeWatcher(recv_pipes)
    try:
        for p in pool:
            p.start()
        output_watcher.start()
        for x in a:
            q.put(x)
        for _ in range(num_processes):
            q.put(None)  # End markers, one per worker
        q.close()
        q.join_thread()  # wait for the feeder thread to flush buffered items
        q.join()         # wait until workers have task_done()'d every item
        for p in pool:
            p.join()
        output_watcher.flush()
        output_watcher.join()
        combined_output = output_watcher.merged
        return combined_output
    except KeyboardInterrupt:
        # Fix: Python-2-only `print` statement replaced with the function
        # form, which behaves identically here under both Python 2 and 3.
        print("Interrupted -- terminating worker processes")
        for p in pool:
            p.terminate()
        for p in pool:
            p.join()
        raise