def iter_parallel_report(func, # type: Callable[..., Any]
args_lists, # type: Sequence[CallArgs]
ccmode=CC_PROCESSES):
# type: (...) -> Iterator[Union[ExeResult, ExcInfo]]
if ccmode == CC_OFF or len(args_lists) <= 1 or not multiprocessing:
for args, kwargs in args_lists:
yield func(*args, **kwargs)
return
processes = min(len(args_lists), multiprocessing.cpu_count())
if ccmode == CC_THREADS:
pool = multiprocessing.pool.ThreadPool(processes=processes)
else:
pool = multiprocessing.Pool(processes=processes, initializer=per_process_init)
try:
async_results = [pool.apply_async(func, args=args, kwds=kwargs)
for args, kwargs in args_lists]
pool.close()
while async_results:
try:
asyncres = async_results.pop(0)
yield asyncres.get()
except (KeyboardInterrupt, GeneratorExit):
raise
except Exception as e:
t, v, tb = sys.exc_info()
try:
# Report the textual traceback of the subprocess rather
# than this local exception which was triggered
# by the other side.
tb = e.traceback # type: ignore
except AttributeError:
pass
yield ExcInfo((t, v, tb))
except GeneratorExit:
pool.terminate()
except KeyboardInterrupt:
pool.terminate()
raise
finally:
pool.join()
评论列表
文章目录