def clopure_iter_mp_split_unord(self, fn, local_vars):
def iter_split_generator(*g):
q_in = Queue()
q_out = Queue()
exit_input_thread = False
semaphore = Semaphore(self.queue_size)
ps = [Process(target=self.iter_split_evaluate_wrapper, args=(fn, local_vars, len(g), q_in, q_out)) for i in range(self.procs)]
for p in ps:
p.start()
def input_thread():
try:
for i, item in enumerate(zip(*g)):
semaphore.acquire()
if exit_input_thread:
return
q_in.put((i, item))
except BaseException:
traceback.print_exc(file=sys.stdout)
for i in range(self.procs):
q_in.put((0, EOFMessage))
t = Thread(target=input_thread)
t.start()
n_working_procs = self.procs
while True:
k, data = q_out.get()
if data is EOFMessage:
n_working_procs -= 1
if n_working_procs == 0:
break
continue
yield data
semaphore.release()
for p in ps:
p.join()
exit_input_thread = True
semaphore.release()
return iter_split_generator
评论列表
文章目录