def run(self, funcs):
    """Run a set of functions in parallel, returning their results.

    Make sure any function you pass exits with a reasonable timeout. If it
    doesn't return within the timeout, or its result is ignored due to an
    exception in a separate thread, it will continue to stick around until it
    finishes, including blocking process exit.

    Args:
        funcs: An iterable of functions or iterable of args to
            functools.partial.

    Returns:
        A list of return values with the values matching the order in funcs.

    Raises:
        Propagates the first exception encountered in one of the functions,
        checking the functions in the order they were given. If a function
        has not finished when the wait ends, the error raised by its
        future's ``result(timeout=0)`` propagates instead.
    """
    funcs = [f if callable(f) else functools.partial(*f) for f in funcs]
    if len(funcs) == 1:  # Ignore threads if it's not needed.
        return [funcs[0]()]
    if len(funcs) > self._workers:  # Lazy init and grow as needed.
        self.shutdown()
        self._workers = len(funcs)
        self._executor = futures.ThreadPoolExecutor(self._workers)
    futs = [self._executor.submit(f) for f in funcs]
    done, not_done = futures.wait(futs, self._timeout, futures.FIRST_EXCEPTION)
    # Make sure to propagate any exceptions. Walk `futs` (submission order)
    # instead of the unordered `done` set so that the same failure is picked
    # every time when more than one call has already failed.
    for fut in futs:
        if fut not in done or fut.cancelled():
            continue
        error = fut.exception()
        if error is None:
            continue
        if not_done:
            # If there are some calls that haven't finished, cancel and
            # recreate the thread pool. Otherwise we may have a thread
            # running forever blocking parallel calls.
            for unfinished in not_done:
                unfinished.cancel()
            self.shutdown(False)  # Don't wait, they may be deadlocked.
        raise error
    # Either done or timed out, so don't wait again.
    # NOTE(review): on a plain timeout (no exception) the unfinished futures
    # are neither cancelled nor is the pool shut down here -- confirm that
    # this is intended.
    return [fut.result(timeout=0) for fut in futs]
python类FIRST_EXCEPTION的实例源码
def test_first_exception(self):
    """Check futures.wait(..., FIRST_EXCEPTION) on three fresh submissions.

    Submits mul(2, 21), sleep_and_raise(1.5) and time.sleep(3) to
    self.executor and expects the first two futures in the finished set
    while the third is still pending.
    """
    product_future = self.executor.submit(mul, 2, 21)
    raising_future = self.executor.submit(sleep_and_raise, 1.5)
    sleeping_future = self.executor.submit(time.sleep, 3)

    watched = [product_future, raising_future, sleeping_future]
    completed, outstanding = futures.wait(
        watched, return_when=futures.FIRST_EXCEPTION)

    self.assertEqual({product_future, raising_future}, completed)
    self.assertEqual({sleeping_future}, outstanding)
def test_first_exception_some_already_complete(self):
    """Check FIRST_EXCEPTION when pre-built futures are mixed in.

    Waits on the module-level SUCCESSFUL_FUTURE, CANCELLED_FUTURE and
    CANCELLED_AND_NOTIFIED_FUTURE together with a divmod(21, 0) submission
    (which raises ZeroDivisionError) and a time.sleep(1.5) submission.
    """
    failing_future = self.executor.submit(divmod, 21, 0)
    sleeping_future = self.executor.submit(time.sleep, 1.5)

    watched = [
        SUCCESSFUL_FUTURE,
        CANCELLED_FUTURE,
        CANCELLED_AND_NOTIFIED_FUTURE,
        failing_future,
        sleeping_future,
    ]
    completed, outstanding = futures.wait(
        watched, return_when=futures.FIRST_EXCEPTION)

    expected_done = {
        SUCCESSFUL_FUTURE,
        CANCELLED_AND_NOTIFIED_FUTURE,
        failing_future,
    }
    expected_pending = {CANCELLED_FUTURE, sleeping_future}
    self.assertEqual(expected_done, completed)
    self.assertEqual(expected_pending, outstanding)
def test_first_exception_one_already_failed(self):
    """Check FIRST_EXCEPTION when one watched future is EXCEPTION_FUTURE.

    Expects only EXCEPTION_FUTURE in the finished set and the
    time.sleep(2) submission to be reported as pending.
    """
    sleeping_future = self.executor.submit(time.sleep, 2)

    watched = [EXCEPTION_FUTURE, sleeping_future]
    completed, outstanding = futures.wait(
        watched, return_when=futures.FIRST_EXCEPTION)

    self.assertEqual({EXCEPTION_FUTURE}, completed)
    self.assertEqual({sleeping_future}, outstanding)
def test_first_exception(self):
    """Check futures.wait(..., FIRST_EXCEPTION) on three fresh submissions.

    Submits mul(2, 21), sleep_and_raise(1.5) and time.sleep(3) to
    self.executor and expects the first two futures in the finished set
    while the third is still pending.
    """
    product_future = self.executor.submit(mul, 2, 21)
    raising_future = self.executor.submit(sleep_and_raise, 1.5)
    sleeping_future = self.executor.submit(time.sleep, 3)

    watched = [product_future, raising_future, sleeping_future]
    completed, outstanding = futures.wait(
        watched, return_when=futures.FIRST_EXCEPTION)

    self.assertEqual({product_future, raising_future}, completed)
    self.assertEqual({sleeping_future}, outstanding)
def test_first_exception_some_already_complete(self):
    """Check FIRST_EXCEPTION when pre-built futures are mixed in.

    Waits on the module-level SUCCESSFUL_FUTURE, CANCELLED_FUTURE and
    CANCELLED_AND_NOTIFIED_FUTURE together with a divmod(21, 0) submission
    (which raises ZeroDivisionError) and a time.sleep(1.5) submission.
    """
    failing_future = self.executor.submit(divmod, 21, 0)
    sleeping_future = self.executor.submit(time.sleep, 1.5)

    watched = [
        SUCCESSFUL_FUTURE,
        CANCELLED_FUTURE,
        CANCELLED_AND_NOTIFIED_FUTURE,
        failing_future,
        sleeping_future,
    ]
    completed, outstanding = futures.wait(
        watched, return_when=futures.FIRST_EXCEPTION)

    expected_done = {
        SUCCESSFUL_FUTURE,
        CANCELLED_AND_NOTIFIED_FUTURE,
        failing_future,
    }
    expected_pending = {CANCELLED_FUTURE, sleeping_future}
    self.assertEqual(expected_done, completed)
    self.assertEqual(expected_pending, outstanding)
def test_first_exception_one_already_failed(self):
    """Check FIRST_EXCEPTION when one watched future is EXCEPTION_FUTURE.

    Expects only EXCEPTION_FUTURE in the finished set and the
    time.sleep(2) submission to be reported as pending.
    """
    sleeping_future = self.executor.submit(time.sleep, 2)

    watched = [EXCEPTION_FUTURE, sleeping_future]
    completed, outstanding = futures.wait(
        watched, return_when=futures.FIRST_EXCEPTION)

    self.assertEqual({EXCEPTION_FUTURE}, completed)
    self.assertEqual({sleeping_future}, outstanding)
def test_first_exception(self):
    """Check futures.wait(..., FIRST_EXCEPTION) on three fresh submissions.

    Submits mul(2, 21), sleep_and_raise(1.5) and time.sleep(3) to
    self.executor and expects the first two futures in the finished set
    while the third is still pending.
    """
    product_future = self.executor.submit(mul, 2, 21)
    raising_future = self.executor.submit(sleep_and_raise, 1.5)
    sleeping_future = self.executor.submit(time.sleep, 3)

    watched = [product_future, raising_future, sleeping_future]
    completed, outstanding = futures.wait(
        watched, return_when=futures.FIRST_EXCEPTION)

    self.assertEqual({product_future, raising_future}, completed)
    self.assertEqual({sleeping_future}, outstanding)
def test_first_exception_some_already_complete(self):
    """Check FIRST_EXCEPTION when pre-built futures are mixed in.

    Waits on the module-level SUCCESSFUL_FUTURE, CANCELLED_FUTURE and
    CANCELLED_AND_NOTIFIED_FUTURE together with a divmod(21, 0) submission
    (which raises ZeroDivisionError) and a time.sleep(1.5) submission.
    """
    failing_future = self.executor.submit(divmod, 21, 0)
    sleeping_future = self.executor.submit(time.sleep, 1.5)

    watched = [
        SUCCESSFUL_FUTURE,
        CANCELLED_FUTURE,
        CANCELLED_AND_NOTIFIED_FUTURE,
        failing_future,
        sleeping_future,
    ]
    completed, outstanding = futures.wait(
        watched, return_when=futures.FIRST_EXCEPTION)

    expected_done = {
        SUCCESSFUL_FUTURE,
        CANCELLED_AND_NOTIFIED_FUTURE,
        failing_future,
    }
    expected_pending = {CANCELLED_FUTURE, sleeping_future}
    self.assertEqual(expected_done, completed)
    self.assertEqual(expected_pending, outstanding)
def test_first_exception_one_already_failed(self):
    """Check FIRST_EXCEPTION when one watched future is EXCEPTION_FUTURE.

    Expects only EXCEPTION_FUTURE in the finished set and the
    time.sleep(2) submission to be reported as pending.
    """
    sleeping_future = self.executor.submit(time.sleep, 2)

    watched = [EXCEPTION_FUTURE, sleeping_future]
    completed, outstanding = futures.wait(
        watched, return_when=futures.FIRST_EXCEPTION)

    self.assertEqual({EXCEPTION_FUTURE}, completed)
    self.assertEqual({sleeping_future}, outstanding)
def test_first_exception(self):
    """Check futures.wait(..., FIRST_EXCEPTION) on three fresh submissions.

    Submits mul(2, 21), sleep_and_raise(1.5) and time.sleep(3) to
    self.executor and expects the first two futures in the finished set
    while the third is still pending.
    """
    product_future = self.executor.submit(mul, 2, 21)
    raising_future = self.executor.submit(sleep_and_raise, 1.5)
    sleeping_future = self.executor.submit(time.sleep, 3)

    watched = [product_future, raising_future, sleeping_future]
    completed, outstanding = futures.wait(
        watched, return_when=futures.FIRST_EXCEPTION)

    self.assertEqual({product_future, raising_future}, completed)
    self.assertEqual({sleeping_future}, outstanding)
def test_first_exception_some_already_complete(self):
    """Check FIRST_EXCEPTION when pre-built futures are mixed in.

    Waits on the module-level SUCCESSFUL_FUTURE, CANCELLED_FUTURE and
    CANCELLED_AND_NOTIFIED_FUTURE together with a divmod(21, 0) submission
    (which raises ZeroDivisionError) and a time.sleep(1.5) submission.
    """
    failing_future = self.executor.submit(divmod, 21, 0)
    sleeping_future = self.executor.submit(time.sleep, 1.5)

    watched = [
        SUCCESSFUL_FUTURE,
        CANCELLED_FUTURE,
        CANCELLED_AND_NOTIFIED_FUTURE,
        failing_future,
        sleeping_future,
    ]
    completed, outstanding = futures.wait(
        watched, return_when=futures.FIRST_EXCEPTION)

    expected_done = {
        SUCCESSFUL_FUTURE,
        CANCELLED_AND_NOTIFIED_FUTURE,
        failing_future,
    }
    expected_pending = {CANCELLED_FUTURE, sleeping_future}
    self.assertEqual(expected_done, completed)
    self.assertEqual(expected_pending, outstanding)
def test_first_exception_one_already_failed(self):
    """Check FIRST_EXCEPTION when one watched future is EXCEPTION_FUTURE.

    Expects only EXCEPTION_FUTURE in the finished set and the
    time.sleep(2) submission to be reported as pending.
    """
    sleeping_future = self.executor.submit(time.sleep, 2)

    watched = [EXCEPTION_FUTURE, sleeping_future]
    completed, outstanding = futures.wait(
        watched, return_when=futures.FIRST_EXCEPTION)

    self.assertEqual({EXCEPTION_FUTURE}, completed)
    self.assertEqual({sleeping_future}, outstanding)
01_cancelling_coroutines.py 文件源码
项目:asyncio-coroutine-patterns
作者: yeraydiazdiaz
项目源码
文件源码
阅读 26
收藏 0
点赞 0
评论 0
async def get_comments_of_top_stories(loop, session, limit, iteration):
    """Fetch the top stories and print a comment count for the first ones.

    Fetches TOP_STORIES_URL with a fresh URLFetcher, starts one
    post_number_of_comments task for each of the first ``limit`` entries of
    the response, waits until all are done or one raises, cancels whatever
    is still pending and prints the result of every finished task.

    Args:
        loop: passed through unchanged to post_number_of_comments.
        session: passed to fetcher.fetch and post_number_of_comments.
        limit: maximum number of response entries to process.
        iteration: value included in every printed result line.

    Returns:
        The fetch_counter of the fetcher created for this call, or None
        when fetching the top stories failed.
    """
    fetcher = URLFetcher()  # create a new fetcher for this task
    try:
        response = await fetcher.fetch(session, TOP_STORIES_URL)
    except BoomException as e:
        log.error("Error retrieving top stories: {}".format(e))
        # return instead of re-raising as it will go unnoticed
        return None
    except Exception as e:  # catch generic exceptions
        log.error("Unexpected exception: {}".format(e))
        return None

    # asyncio.wait() no longer accepts bare coroutine objects (Python 3.11+),
    # so schedule each coroutine as a Task first.
    tasks = [
        asyncio.ensure_future(
            post_number_of_comments(loop, session, fetcher, post_id))
        for post_id in response[:limit]
    ]
    if not tasks:
        # asyncio.wait() raises ValueError on an empty collection.
        return fetcher.fetch_counter

    # return on first exception to cancel any pending tasks
    done, pending = await asyncio.wait(tasks, return_when=FIRST_EXCEPTION)

    # cancel any pending tasks, the set could be empty so it's safe
    for pending_task in pending:
        pending_task.cancel()

    # process the done tasks
    for done_task in done:
        # one of the Tasks could raise an exception
        try:
            print("Post ??? has {} comments ({})".format(
                done_task.result(), iteration))
        except BoomException as e:
            print("Error retrieving comments for top stories: {}".format(e))

    return fetcher.fetch_counter
03_cancelling_coroutines.py 文件源码
项目:asyncio-coroutine-patterns
作者: yeraydiazdiaz
项目源码
文件源码
阅读 25
收藏 0
点赞 0
评论 0
async def get_comments_of_top_stories(loop, session, limit, iteration):
    """Fetch the top stories and print a comment count for the first ones.

    Fetches TOP_STORIES_URL with a fresh URLFetcher, starts one
    post_number_of_comments task for each of the first ``limit`` entries of
    the response (remembering which post id each task belongs to), waits
    until all are done or one raises, cancels whatever is still pending and
    prints the result of every finished task.

    Args:
        loop: passed through unchanged to post_number_of_comments.
        session: passed to fetcher.fetch and post_number_of_comments.
        limit: maximum number of response entries to process.
        iteration: value included in every printed result line.

    Returns:
        The fetch_counter of the fetcher created for this call, or None
        when fetching the top stories failed.
    """
    fetcher = URLFetcher()  # create a new fetcher for this task
    try:
        response = await fetcher.fetch(session, TOP_STORIES_URL)
    except BoomException as e:
        log.error("Error retrieving top stories: {}".format(e))
        # return instead of re-raising as it will go unnoticed
        return None
    except Exception as e:  # catch generic exceptions
        log.error("Unexpected exception: {}".format(e))
        return None

    # Map each scheduled task to its post id so results can be labelled.
    tasks = {
        asyncio.ensure_future(
            post_number_of_comments(loop, session, fetcher, post_id)
        ): post_id for post_id in response[:limit]}
    if not tasks:
        # asyncio.wait() raises ValueError on an empty collection.
        return fetcher.fetch_counter

    # return on first exception to cancel any pending tasks
    done, pending = await asyncio.wait(
        tasks.keys(), return_when=FIRST_EXCEPTION)

    # if there are pending tasks it is because there was an exception;
    # cancel any pending tasks
    for pending_task in pending:
        pending_task.cancel()

    # process the done tasks
    for done_task in done:
        # if an exception was raised, one of the Tasks will re-raise it here
        try:
            print("Post {} has {} comments ({})".format(
                tasks[done_task], done_task.result(), iteration))
        except BoomException as e:
            print("Error retrieving comments for top stories: {}".format(e))

    return fetcher.fetch_counter
04_cancelling_coroutines.py 文件源码
项目:asyncio-coroutine-patterns
作者: yeraydiazdiaz
项目源码
文件源码
阅读 26
收藏 0
点赞 0
评论 0
async def get_comments_of_top_stories(loop, session, limit, iteration):
    """Fetch the top stories and print a comment count for the first ones.

    Fetches TOP_STORIES_URL with a fresh URLFetcher, starts one
    post_number_of_comments task for each of the first ``limit`` entries of
    the response (remembering which post id each task belongs to), waits
    inside asyncio.shield until all are done or one raises, cancels
    whatever is still pending and prints the result of every finished task.

    Args:
        loop: passed through unchanged to post_number_of_comments.
        session: passed to fetcher.fetch and post_number_of_comments.
        limit: maximum number of response entries to process.
        iteration: value included in every printed result line.

    Returns:
        The fetch_counter of the fetcher created for this call, or None
        when fetching the top stories failed.
    """
    fetcher = URLFetcher()  # create a new fetcher for this task
    try:
        response = await fetcher.fetch(session, TOP_STORIES_URL)
    except BoomException as e:
        log.error("Error retrieving top stories: {}".format(e))
        # return instead of re-raising as it will go unnoticed
        return None
    except Exception as e:  # catch generic exceptions
        log.error("Unexpected exception: {}".format(e))
        return None

    # Map each scheduled task to its post id so results can be labelled.
    tasks = {
        asyncio.ensure_future(
            post_number_of_comments(loop, session, fetcher, post_id)
        ): post_id for post_id in response[:limit]}
    if not tasks:
        # asyncio.wait() raises ValueError on an empty collection.
        return fetcher.fetch_counter

    # return on first exception to cancel any pending tasks; the wait is
    # wrapped in shield() so cancelling this coroutine does not cancel it
    done, pending = await asyncio.shield(asyncio.wait(
        tasks.keys(), return_when=FIRST_EXCEPTION))

    # if there are pending tasks it is because there was an exception;
    # cancel any pending tasks
    for pending_task in pending:
        pending_task.cancel()

    # process the done tasks
    for done_task in done:
        # if an exception was raised, one of the Tasks will re-raise it here
        try:
            print("Post {} has {} comments ({})".format(
                tasks[done_task], done_task.result(), iteration))
        except BoomException as e:
            print("Error retrieving comments for top stories: {}".format(e))

    return fetcher.fetch_counter
02_cancelling_coroutines.py 文件源码
项目:asyncio-coroutine-patterns
作者: yeraydiazdiaz
项目源码
文件源码
阅读 23
收藏 0
点赞 0
评论 0
async def get_comments_of_top_stories(loop, session, limit, iteration):
    """Fetch the top stories and print a comment count for the first ones.

    Fetches TOP_STORIES_URL with a fresh URLFetcher, starts one
    post_number_of_comments task for each of the first ``limit`` entries of
    the response (remembering which post id each task belongs to), waits
    until all are done or one raises, cancels whatever is still pending and
    prints the result of every finished task.

    Args:
        loop: passed through unchanged to post_number_of_comments.
        session: passed to fetcher.fetch and post_number_of_comments.
        limit: maximum number of response entries to process.
        iteration: value included in every printed result line.

    Returns:
        The fetch_counter of the fetcher created for this call, or None
        when fetching the top stories failed.
    """
    fetcher = URLFetcher()  # create a new fetcher for this task
    try:
        response = await fetcher.fetch(session, TOP_STORIES_URL)
    except BoomException as e:
        log.error("Error retrieving top stories: {}".format(e))
        # return instead of re-raising as it will go unnoticed
        return None
    except Exception as e:  # catch generic exceptions
        log.error("Unexpected exception: {}".format(e))
        return None

    # Map each scheduled task to its post id so results can be labelled.
    tasks = {
        asyncio.ensure_future(
            post_number_of_comments(loop, session, fetcher, post_id)
        ): post_id for post_id in response[:limit]}
    if not tasks:
        # asyncio.wait() raises ValueError on an empty collection.
        return fetcher.fetch_counter

    # return on first exception to cancel any pending tasks
    done, pending = await asyncio.wait(
        tasks.keys(), return_when=FIRST_EXCEPTION)

    # if there are pending tasks it is because there was an exception;
    # cancel any pending tasks
    for pending_task in pending:
        pending_task.cancel()

    # process the done tasks
    for done_task in done:
        # if an exception was raised, one of the Tasks will re-raise it here
        try:
            print("Post {} has {} comments ({})".format(
                tasks[done_task], done_task.result(), iteration))
        except BoomException as e:
            print("Error retrieving comments for top stories: {}".format(e))

    return fetcher.fetch_counter
def test_first_exception(self):
    """Check futures.wait(..., FIRST_EXCEPTION) on three fresh submissions.

    Submits mul(2, 21), sleep_and_raise(1.5) and time.sleep(3) to
    self.executor and expects the first two futures in the finished set
    while the third is still pending.
    """
    product_future = self.executor.submit(mul, 2, 21)
    raising_future = self.executor.submit(sleep_and_raise, 1.5)
    sleeping_future = self.executor.submit(time.sleep, 3)

    watched = [product_future, raising_future, sleeping_future]
    completed, outstanding = futures.wait(
        watched, return_when=futures.FIRST_EXCEPTION)

    self.assertEqual({product_future, raising_future}, completed)
    self.assertEqual({sleeping_future}, outstanding)
def test_first_exception_some_already_complete(self):
    """Check FIRST_EXCEPTION when pre-built futures are mixed in.

    Waits on the module-level SUCCESSFUL_FUTURE, CANCELLED_FUTURE and
    CANCELLED_AND_NOTIFIED_FUTURE together with a divmod(21, 0) submission
    (which raises ZeroDivisionError) and a time.sleep(1.5) submission.
    """
    failing_future = self.executor.submit(divmod, 21, 0)
    sleeping_future = self.executor.submit(time.sleep, 1.5)

    watched = [
        SUCCESSFUL_FUTURE,
        CANCELLED_FUTURE,
        CANCELLED_AND_NOTIFIED_FUTURE,
        failing_future,
        sleeping_future,
    ]
    completed, outstanding = futures.wait(
        watched, return_when=futures.FIRST_EXCEPTION)

    expected_done = {
        SUCCESSFUL_FUTURE,
        CANCELLED_AND_NOTIFIED_FUTURE,
        failing_future,
    }
    expected_pending = {CANCELLED_FUTURE, sleeping_future}
    self.assertEqual(expected_done, completed)
    self.assertEqual(expected_pending, outstanding)
def test_first_exception_one_already_failed(self):
    """Check FIRST_EXCEPTION when one watched future is EXCEPTION_FUTURE.

    Expects only EXCEPTION_FUTURE in the finished set and the
    time.sleep(2) submission to be reported as pending.
    """
    sleeping_future = self.executor.submit(time.sleep, 2)

    watched = [EXCEPTION_FUTURE, sleeping_future]
    completed, outstanding = futures.wait(
        watched, return_when=futures.FIRST_EXCEPTION)

    self.assertEqual({EXCEPTION_FUTURE}, completed)
    self.assertEqual({sleeping_future}, outstanding)
def test_first_exception(self):
    """Check futures.wait(..., FIRST_EXCEPTION) on three fresh submissions.

    Submits mul(2, 21), sleep_and_raise(1.5) and time.sleep(3) to
    self.executor and expects the first two futures in the finished set
    while the third is still pending.
    """
    product_future = self.executor.submit(mul, 2, 21)
    raising_future = self.executor.submit(sleep_and_raise, 1.5)
    sleeping_future = self.executor.submit(time.sleep, 3)

    watched = [product_future, raising_future, sleeping_future]
    completed, outstanding = futures.wait(
        watched, return_when=futures.FIRST_EXCEPTION)

    self.assertEqual({product_future, raising_future}, completed)
    self.assertEqual({sleeping_future}, outstanding)
def test_first_exception_some_already_complete(self):
    """Check FIRST_EXCEPTION when pre-built futures are mixed in.

    Waits on the module-level SUCCESSFUL_FUTURE, CANCELLED_FUTURE and
    CANCELLED_AND_NOTIFIED_FUTURE together with a divmod(21, 0) submission
    (which raises ZeroDivisionError) and a time.sleep(1.5) submission.
    """
    failing_future = self.executor.submit(divmod, 21, 0)
    sleeping_future = self.executor.submit(time.sleep, 1.5)

    watched = [
        SUCCESSFUL_FUTURE,
        CANCELLED_FUTURE,
        CANCELLED_AND_NOTIFIED_FUTURE,
        failing_future,
        sleeping_future,
    ]
    completed, outstanding = futures.wait(
        watched, return_when=futures.FIRST_EXCEPTION)

    expected_done = {
        SUCCESSFUL_FUTURE,
        CANCELLED_AND_NOTIFIED_FUTURE,
        failing_future,
    }
    expected_pending = {CANCELLED_FUTURE, sleeping_future}
    self.assertEqual(expected_done, completed)
    self.assertEqual(expected_pending, outstanding)
def test_first_exception_one_already_failed(self):
    """Check FIRST_EXCEPTION when one watched future is EXCEPTION_FUTURE.

    Expects only EXCEPTION_FUTURE in the finished set and the
    time.sleep(2) submission to be reported as pending.
    """
    sleeping_future = self.executor.submit(time.sleep, 2)

    watched = [EXCEPTION_FUTURE, sleeping_future]
    completed, outstanding = futures.wait(
        watched, return_when=futures.FIRST_EXCEPTION)

    self.assertEqual({EXCEPTION_FUTURE}, completed)
    self.assertEqual({sleeping_future}, outstanding)