def clopure_iter_mp_split(self, fn, local_vars):
    """Build a generator function that fans work out to worker processes.

    The returned ``iter_split_generator(*g)`` zips its iterable arguments,
    tags every resulting tuple with its index, and feeds the pairs to
    ``self.procs`` processes running ``self.iter_split_evaluate_wrapper``.
    Results read back from the workers are yielded in input order, with at
    most ``self.queue_size`` items handed out but not yet yielded.

    Args:
        fn: Passed through unchanged to ``iter_split_evaluate_wrapper``.
        local_vars: Passed through unchanged to
            ``iter_split_evaluate_wrapper``.

    Returns:
        The ``iter_split_generator`` generator function.

    NOTE(review): this relies on each worker echoing the input index with
    its result and forwarding exactly one ``EOFMessage`` to the output queue
    when it receives one -- confirm against ``iter_split_evaluate_wrapper``.
    """
    def iter_split_generator(*g):
        q_in = Queue()
        q_out = Queue()
        # Read by input_thread through the closure; set once on shutdown.
        exit_input_thread = False
        # One permit per slot of the reordering window below.
        semaphore = Semaphore(self.queue_size)
        ps = [
            Process(
                target=self.iter_split_evaluate_wrapper,
                args=(fn, local_vars, len(g), q_in, q_out),
            )
            for _ in range(self.procs)
        ]
        for p in ps:
            p.start()

        def input_thread():
            try:
                for i, item in enumerate(zip(*g)):
                    semaphore.acquire()
                    if exit_input_thread:
                        # Stop feeding, but fall through to the sentinels
                        # below so the workers are still told to finish.
                        break
                    q_in.put((i, item))
            except BaseException:
                traceback.print_exc(file=sys.stdout)
            # One end-of-input sentinel per worker process.
            for _ in range(self.procs):
                q_in.put((0, EOFMessage))

        t = Thread(target=input_thread)
        t.start()
        cur = 0  # index of the next item to yield
        n_working_procs = self.procs
        # Reordering window: slot j holds the result for index cur + j.
        pending = [None] * self.queue_size
        try:
            while True:
                k, data = q_out.get()
                if data is EOFMessage:
                    n_working_procs -= 1
                    if n_working_procs == 0:
                        break
                    continue
                pending[k - cur] = (k, data)
                # Flush every result that is now contiguous with ``cur``.
                while pending[0]:
                    yield pending.pop(0)[1]
                    pending.append(None)
                    cur += 1
                    semaphore.release()
        finally:
            # Also runs when the consumer closes the generator early or an
            # error escapes the loop: wake the feeder thread so it can see
            # the flag instead of blocking on the semaphore forever.
            exit_input_thread = True
            semaphore.release()

    return iter_split_generator
评论列表
文章目录