def iter_split_evaluate_wrapper(self, fn, local_vars, in_size, q_in, q_out):
    """Stream items from ``q_in`` through ``self.evaluate`` onto ``q_out``.

    ``q_in`` is read as ``(index, payload)`` pairs until a payload that is
    ``EOFMessage`` arrives.  The payload stream is duplicated ``in_size``
    times with ``itertools.tee``; the k-th duplicate yields ``payload[k]``
    for every payload.  ``self.evaluate`` is called with
    ``(fn, stream_0, ..., stream_{in_size-1})`` and ``local_vars``, and every
    value it produces is written to ``q_out`` as ``(index, value)``, where
    ``index`` is the next one taken from the internal index queue.  When
    ``self.evaluate`` is exhausted, ``(0, EOFMessage)`` is written to
    ``q_out``.

    Args:
        fn: First element of the tuple handed to ``self.evaluate``.
        local_vars: Forwarded to ``self.evaluate`` as ``local_vars``.
        in_size: Number of per-field streams to build from each payload.
        q_in: Queue-like source of ``(index, payload)`` pairs.
        q_out: Queue-like sink for ``(index, result)`` pairs.
    """
    # The gate is taken before each read from q_in and released only after a
    # result has been written to q_out, so the next input is not fetched
    # until the previous output has been emitted.
    # NOTE(review): this presumably relies on self.evaluate yielding exactly
    # one result per input item -- confirm against evaluate().
    gate = Lock()
    pending_indices = Queue()

    def read_payloads():
        """Yield payloads from q_in, recording each item's index as it is read."""
        try:
            while True:
                gate.acquire()
                index, payload = q_in.get()
                # The index is recorded before the EOF check, so the EOF
                # item's index is queued as well.
                pending_indices.put(index)
                if payload is EOFMessage:
                    return
                yield payload
        except BaseException:
            # Any error raised here is reported on stdout and ends the input
            # stream instead of propagating to the consumer.
            traceback.print_exc(file=sys.stdout)

    def select_field(rows, position):
        """Return a lazy stream of ``row[position]`` for each row in *rows*."""
        return (row[position] for row in rows)

    branches = itertools.tee(read_payloads(), in_size)
    field_streams = tuple(
        select_field(rows, position) for position, rows in enumerate(branches)
    )
    results = self.evaluate((fn,) + field_streams, local_vars=local_vars)

    for data_out in results:
        q_out.put((pending_indices.get(), data_out))
        gate.release()

    q_out.put((0, EOFMessage))
# (Scraped web-page residue: "Comment list")
# (Scraped web-page residue: "Article table of contents")