def run(self, funcs):
"""Run a set of functions in parallel, returning their results.
Make sure any function you pass exits with a reasonable timeout. If it
doesn't return within the timeout or the result is ignored due an exception
in a separate thread it will continue to stick around until it finishes,
including blocking process exit.
Args:
funcs: An iterable of functions or iterable of args to functools.partial.
Returns:
A list of return values with the values matching the order in funcs.
Raises:
Propagates the first exception encountered in one of the functions.
"""
funcs = [f if callable(f) else functools.partial(*f) for f in funcs]
if len(funcs) == 1: # Ignore threads if it's not needed.
return [funcs[0]()]
if len(funcs) > self._workers: # Lazy init and grow as needed.
self.shutdown()
self._workers = len(funcs)
self._executor = futures.ThreadPoolExecutor(self._workers)
futs = [self._executor.submit(f) for f in funcs]
done, not_done = futures.wait(futs, self._timeout, futures.FIRST_EXCEPTION)
# Make sure to propagate any exceptions.
for f in done:
if not f.cancelled() and f.exception() is not None:
if not_done:
# If there are some calls that haven't finished, cancel and recreate
# the thread pool. Otherwise we may have a thread running forever
# blocking parallel calls.
for nd in not_done:
nd.cancel()
self.shutdown(False) # Don't wait, they may be deadlocked.
raise f.exception()
# Either done or timed out, so don't wait again.
return [f.result(timeout=0) for f in futs]
评论列表
文章目录