def shutdown_listener(cls):
    """Ask the queue listener to stop and wait briefly for a reply.

    Puts a ``None`` sentinel on ``cls._queue`` and then waits up to three
    seconds for an item to appear on the same queue. Does nothing when
    ``cls._queue`` is falsy.

    The wait is best-effort: an expired wait is swallowed silently. Standard
    ``queue.Queue`` / ``multiprocessing.Queue`` objects report an expired
    ``get(timeout=...)`` with ``queue.Empty`` rather than ``TimeoutError``,
    so both exception types are handled.
    """
    # Local import keeps the block independent of file-level imports.
    try:
        from queue import Empty
    except ImportError:  # Python 2 spelling of the module
        from Queue import Empty

    if not cls._queue:
        return

    cls._queue.put(None)
    try:
        # NOTE(review): presumably the listener answers the sentinel on this
        # same queue; confirm against the listener implementation.
        cls._queue.get(timeout=3)
    except (TimeoutError, Empty):
        pass
# Example source code using the Python class TimeoutError()
def run(self, result):
    """
    Run every subsuite in a pool of worker processes and replay the outcome.

    Each finished job hands back the index of its subsuite together with the
    events recorded while running it. Jobs are consumed via imap_unordered
    so outcomes can be reported as soon as any worker is done.

    To keep pickling problems to a minimum:

    - only numeric indexes into self.subsuites are passed back, never tests
    - tblib, when available, is installed so tracebacks can be pickled

    Even with tblib, exceptions of dynamically created classes such as
    Model.DoesNotExist may still fail to unpickle.

    Returns the ``result`` object that was passed in.
    """
    if tblib is not None:
        tblib.pickling_support.install()

    shared_counter = multiprocessing.Value(ctypes.c_int, 0)
    worker_pool = multiprocessing.Pool(
        processes=self.processes,
        initializer=self.init_worker.__func__,
        initargs=[shared_counter])
    jobs = [
        (position, suite, self.failfast)
        for position, suite in enumerate(self.subsuites)
    ]
    pending = worker_pool.imap_unordered(self.run_subsuite.__func__, jobs)

    # Poll with a short timeout so a stop request is noticed promptly.
    while not result.shouldStop:
        try:
            subsuite_index, events = pending.next(timeout=0.1)
        except multiprocessing.TimeoutError:
            continue
        except StopIteration:
            # Every job has been consumed.
            worker_pool.close()
            break

        suite_tests = list(self.subsuites[subsuite_index])
        for event in events:
            # event = (handler name, test index, *handler arguments)
            callback = getattr(result, event[0], None)
            if callback is not None:
                callback(suite_tests[event[1]], *event[2:])
    else:
        # The loop ended because a stop was requested: abandon the workers.
        worker_pool.terminate()

    worker_pool.join()
    return result
def shard_download(self, pointer, shard_index, bucket_id, file_id):
    """Download one shard, retrying with a fresh pointer on timeout.

    The shard URL is built from the farmer address/port, hash and token
    found in ``pointer`` and fetched through ``self.retrieve_shard_file`` on
    a one-thread pool, so the wait is bounded by ``self.client.timeout``.

    :param pointer: mapping with ``farmer`` (``address``, ``port``,
        ``nodeID``), ``hash`` and ``token`` entries.
    :param shard_index: index of the shard; also sent as ``skip`` when a
        replacement pointer is requested.
    :param bucket_id: bucket identifier passed to ``client.file_pointers``.
    :param file_id: file identifier passed to ``client.file_pointers``.
    :return: whatever ``retrieve_shard_file`` returns, or ``None`` when an
        ``IOError`` or an unexpected exception was logged instead.
    """
    import errno

    self.__logger.debug('Beginning download proccess...')
    try:
        self.__logger.debug('Starting download threads...')
        self.__logger.debug('Downloading shard at index %s ...',
                            shard_index)

        url = 'http://{address}:{port}/shards/{hash}?token={token}'.format(
            address=pointer.get('farmer')['address'],
            port=str(pointer.get('farmer')['port']),
            hash=pointer['hash'],
            token=pointer['token'])
        self.__logger.debug(url)

        tp = ThreadPool(processes=1)
        try:
            async_result = tp.apply_async(
                self.retrieve_shard_file,
                (url, shard_index))
            # Blocks for at most client.timeout before raising.
            shard = async_result.get(self.client.timeout)
        finally:
            # Always release the helper pool, whatever the outcome.
            tp.terminate()

        self.__logger.debug('Shard downloaded')
        self.__logger.debug('Shard at index %s downloaded successfully',
                            shard_index)
        return shard

    except IOError as e:
        self.__logger.error('Perm error %s', e)
        # Compare the errno itself: str(e) is the full message
        # ("[Errno 13] ..."), so it never equals "13".
        if e.errno == errno.EACCES:
            self.__logger.error(
                "Error while saving or reading file or\n"
                "temporary file.\n"
                "Probably this is caused by insufficient permisions.\n"
                "Please check if you have permissions to write or\n"
                "read from selected directories.")

    except TimeoutError:
        # NOTE(review): AsyncResult.get raises multiprocessing.TimeoutError;
        # confirm that is the TimeoutError bound at file level.
        self.__logger.warning('Aborting shard %s download due to timeout' %
                              shard_index)
        self.__logger.warning('Try with a new pointer')
        new_pointer = self.client.file_pointers(
            bucket_id=bucket_id,
            file_id=file_id,
            limit='1',
            skip=str(shard_index),
            exclude=str([pointer['farmer']['nodeID']]))
        self.__logger.debug('Found new pointer')
        return self.shard_download(new_pointer[0], shard_index,
                                   bucket_id, file_id)

    except Exception as e:
        self.__logger.error(e)
        self.__logger.error('Unhandled')
def parse_async(self, structure_files, timeout, model=None,
                parser=RegularStructureParser):
    """
    Parse several structure files concurrently on the object's worker pool.

    One C{_parse_async} job is submitted per file; the outcomes are then
    collected in submission order, each wait being limited to C{timeout}
    seconds.

    @note: Avoid passing a large list of structures. A job that exceeds
           C{timeout} is recorded as failed, and C{self._recycle()} is
           called only once, after B{all} outcomes have been collected.
           Presumably the timed-out worker stays busy (and unusable) until
           then, so many timeouts can render the whole pool unusable for
           the remainder of the call.

    @param structure_files: a list of structure files
    @type structure_files: tuple of str
    @param timeout: seconds to wait for each job; on expiry a
                    multiprocessing.TimeoutError is stored in that result
    @type timeout: int
    @param model: passed through to C{_parse_async}
    @param parser: any implementing L{AbstractStructureParser} class
    @type parser: type

    @return: a list of L{AsyncParseResult} objects
    @rtype: list
    """
    pool = self._pool

    # Submit everything first so the jobs can run side by side.
    pending = [pool.apply_async(_parse_async, [parser, path, model])
               for path in list(structure_files)]

    outcomes = []
    timed_out = False

    for job in pending:
        outcome = AsyncParseResult(None, None)
        try:
            outcome.result = job.get(timeout=timeout)
        except KeyboardInterrupt as ki:
            pool.terminate()
            raise ki
        except Exception as ex:
            outcome.exception = ex
            if isinstance(ex, multiprocessing.TimeoutError):
                timed_out = True
        outcomes.append(outcome)

    if timed_out:
        self._recycle()

    return outcomes
def modify_without_breaking(bytez, actions=None, seed=None):
    """Apply a sequence of manipulations to a binary, each in a child process.

    Every entry of ``actions`` is looked up in ``ACTION_TABLE`` and executed
    in a separate process so that parser crashes or hangs cannot take down
    the caller. A child that raises one of the handled exceptions, or that
    is still running after five seconds, leaves the bytes unchanged for
    that step.

    :param bytez: contents of the binary to modify.
    :param actions: iterable of ``ACTION_TABLE`` keys; ``None`` (the default)
        or an empty iterable applies nothing.
    :param seed: value handed to every action callable.
    :return: the resulting bytes; their SHA-256 digest is printed as well.
    """
    import hashlib

    for action in actions or ():
        _action = ACTION_TABLE[action]

        # The manipulation runs in a child process to shelter the caller
        # from rare parsing errors in LIEF that may segfault or time out.
        # NOTE(review): a nested function as Process target presumably
        # requires the "fork" start method — confirm for the target platform.
        def helper(_action, shared_list):
            # TODO: LIEF is chatty. redirect stdout and stderr to /dev/null

            # Turn a segfault of the child process into a RuntimeError.
            def sig_handler(signum, frame):
                raise RuntimeError

            signal.signal(signal.SIGSEGV, sig_handler)

            bytez = array.array('B', shared_list[:]).tobytes()
            if isinstance(_action, str):
                _action = getattr(MalwareManipulator(bytez), _action)
            else:
                _action = functools.partial(_action, bytez)

            try:
                shared_list[:] = _action(seed)
            except (RuntimeError, UnicodeDecodeError, TypeError,
                    lief.not_found) as e:
                # Exceptions not yet handled by the public LIEF release;
                # the shared list is left unchanged.
                print("==== exception in child process ===")
                print(e)

        # A managed list is used for the exchange because, unlike
        # multiprocessing.Array, the child may change its size.
        manager = multiprocessing.Manager()
        try:
            shared_list = manager.list()
            shared_list[:] = bytez  # copy bytez to the shared list

            p = multiprocessing.Process(target=helper,
                                        args=(_action, shared_list))
            p.start()
            p.join(5)  # allow this to take up to 5 seconds...
            # join() returns silently on timeout, so liveness is checked
            # explicitly instead of waiting for an exception.
            if p.is_alive():
                print('==== timeouterror ')
                p.terminate()
                p.join()

            # copy the result back from the child process
            bytez = array.array('B', shared_list[:]).tobytes()
        finally:
            # Stop the manager's server process created for this step.
            manager.shutdown()

    m = hashlib.sha256()
    m.update(bytez)
    print("new hash: {}".format(m.hexdigest()))
    return bytez
def check_requirements(self):
    """
    Run ``self.callback(self)`` and return its message, preferably from a
    child process.

    If PyQt4/PyQt5 is already imported, importing PyQt5/PyQt4 will fail,
    so the check is attempted in a subprocess first (as for Gtk3). When no
    process pool can be created, the callback runs in-process instead.

    Raises CheckFailed when the pooled check gives no answer within ten
    seconds, or when the in-process fallback raises RuntimeError.
    """
    try:
        pool = multiprocessing.Pool()
    except:
        # No multiprocessing: fall back to the in-process approach (this
        # will fail if importing both PyQt4 and PyQt5).
        try:
            return self.callback(self)
        except RuntimeError:
            raise CheckFailed("Could not import: are PyQt4 & PyQt5 both installed?")

    # Multiprocessing is available.
    try:
        pending = pool.map_async(self.callback, [self])
        msg = pending.get(timeout=10)[0]
    except multiprocessing.TimeoutError:
        # No result came back; the worker is probably hanging, so kill it.
        pool.terminate()
        raise CheckFailed("Check timed out")
    except:
        # Any other error: shut down cleanly and let it propagate.
        pool.close()
        raise
    else:
        pool.close()
    finally:
        # Tidy up the worker processes in every case.
        pool.join()
    return msg
def run(self, result):
    """
    Run every subsuite in a pool of worker processes and replay the outcome.

    Each finished job hands back the index of its subsuite together with the
    events recorded while running it. Jobs are consumed via imap_unordered
    so outcomes can be reported as soon as any worker is done.

    Only numeric indexes into self.subsuites are passed back, never the
    tests themselves, to keep pickling problems to a minimum. Exceptions of
    dynamically created classes such as Model.DoesNotExist may still fail
    to unpickle.

    NOTE(review): making tracebacks picklable with tblib is not set up in
    this method; presumably that happens elsewhere — confirm.

    Returns the ``result`` object that was passed in.
    """
    shared_counter = multiprocessing.Value(ctypes.c_int, 0)
    worker_pool = multiprocessing.Pool(
        processes=self.processes,
        initializer=self.init_worker.__func__,
        initargs=[shared_counter])
    jobs = [
        (self.runner_class, position, suite, self.failfast)
        for position, suite in enumerate(self.subsuites)
    ]
    pending = worker_pool.imap_unordered(self.run_subsuite.__func__, jobs)

    # Poll with a short timeout so a stop request is noticed promptly.
    while not result.shouldStop:
        try:
            subsuite_index, events = pending.next(timeout=0.1)
        except multiprocessing.TimeoutError:
            continue
        except StopIteration:
            # Every job has been consumed.
            worker_pool.close()
            break

        suite_tests = list(self.subsuites[subsuite_index])
        for event in events:
            # event = (handler name, test index, *handler arguments)
            callback = getattr(result, event[0], None)
            if callback is not None:
                callback(suite_tests[event[1]], *event[2:])
    else:
        # The loop ended because a stop was requested: abandon the workers.
        worker_pool.terminate()

    worker_pool.join()
    return result
def _consume_record(self, record):
    """De-serialize the message and execute the incoming job.

    The record value is loaded with ``dill``; unloadable or malformed jobs
    are logged and skipped. A valid job is run directly when no timeout
    applies, otherwise on ``self._pool`` with the wait limited to the
    effective timeout (``self._timeout`` if set, else ``job.timeout``).
    The outcome is reported through ``self._exec_callback`` as
    ``'success'``, ``'failure'`` or ``'timeout'``.

    :param record: Record fetched from the Kafka topic.
    :type record: kafka.consumer.fetcher.ConsumerRecord
    """
    # The ABCs live in collections.abc on Python 3; the aliases in
    # ``collections`` were removed in Python 3.10.
    try:
        from collections.abc import Iterable, Mapping
    except ImportError:  # Python 2
        from collections import Iterable, Mapping

    rec = rec_repr(record)
    self._logger.info('Processing {} ...'.format(rec))

    # NOTE(review): dill.loads executes arbitrary code from the payload;
    # only consume topics whose producers are trusted.
    # noinspection PyBroadException
    try:
        job = dill.loads(record.value)
    except Exception:
        self._logger.warning('{} unloadable. Skipping ...'.format(rec))
        return

    # Simple check for job validity
    if not (isinstance(job, Job)
            and isinstance(job.args, Iterable)
            and isinstance(job.kwargs, Mapping)
            and callable(job.func)):
        self._logger.warning('{} malformed. Skipping ...'.format(rec))
        return

    func, args, kwargs = job.func, job.args, job.kwargs
    self._logger.info('Running Job {}: {} ...'.format(
        job.id, func_repr(func, args, kwargs)
    ))
    try:
        timeout = self._timeout or job.timeout
        if timeout is None:
            res = func(*args, **kwargs)
        else:
            run = self._pool.apply_async(func, args, kwargs)
            res = run.get(timeout)
    except mp.TimeoutError:
        # Report the timeout actually applied, which may come from the
        # consumer rather than from the job.
        self._logger.error('Job {} timed out after {} seconds.'
                           .format(job.id, timeout))
        self._exec_callback('timeout', job, None, None, None)
    except Exception as e:
        self._logger.exception('Job {} failed: {}'.format(job.id, e))
        self._exec_callback('failure', job, None, e, tb.format_exc())
    else:
        self._logger.info('Job {} returned: {}'.format(job.id, res))
        self._exec_callback('success', job, res, None, None)
def pidwrapper(num):
    """Run ``dowork(num)``, announcing the process id before and after.

    Returns whatever ``dowork`` returns.
    """
    start_note = "Process {} starting".format(os.getpid())
    print(start_note)

    outcome = dowork(num)

    end_note = "Process {} ending".format(os.getpid())
    print(end_note)
    return outcome
if __name__ == "__main__":
    # Inputs for the Fibonacci workers: the numbers 0..29.
    myList = range(30)

    # A pool of 30 worker processes, one per input.
    myPool = multiprocessing.Pool(processes=30)

    # Submit every input up front; each AsyncResult is polled below so the
    # results can be reported as they arrive.
    results = [myPool.apply_async(pidwrapper, (num,)) for num in myList]

    visited = [0] * len(myList)
    finalList = [0] * len(myList)
    start = time.time()
    done = False

    # get() raises TimeoutError while a result is not ready; the sweep over
    # the unfinished entries is then simply started again.
    while not done:
        try:
            for i, pending in enumerate(results):
                if visited[i]:
                    continue
                # The elapsed time is sampled before blocking on the result.
                elapsed = time.time() - start
                value = pending.get(timeout=1)
                print("Fibonacci number: {}\n\tfinished in: {} seconds\n\tResult: {}".format(i, elapsed, value))
                visited[i] = 1
                finalList[i] = pending.get()
            done = True
        except multiprocessing.TimeoutError:
            # The result is still being computed; try again.
            pass

    print(finalList)
def get_cl_statuses(changes, fine_grained, max_processes=None):
    """Returns a blocking iterable of (cl, status) for given branches.

    If fine_grained is true, this will fetch CL statuses from the server.
    Otherwise, simply indicate if there's a matching url for the given
    branches.

    If max_processes is specified, it is used as the maximum number of
    threads used to fetch CL status from the server (at least one is always
    used). Otherwise one thread per remaining branch is spawned.

    A branch whose status has not arrived when a 5 second wait for the next
    result expires is reported with the status 'error'. An exception raised
    by GetStatus() is logged and propagates to the consumer.

    See GetStatus() for a list of possible statuses.
    """
    # Silence upload.py otherwise it becomes unwieldy.
    upload.verbosity = 0

    if not fine_grained:
        # Do not use GetApprovingReviewers(), since it requires an HTTP
        # request.
        for cl in changes:
            yield (cl, 'waiting' if cl.GetIssueURL() else 'error')
        return

    if not changes:
        return

    def fetch(cl):
        try:
            return (cl, cl.GetStatus())
        except:
            # See http://crbug.com/629863.
            logging.exception('failed to fetch status for %s:', cl)
            raise

    # Process one branch synchronously to work through authentication, then
    # process all the other branches in parallel.
    yield fetch(changes[0])

    changes_to_fetch = changes[1:]
    if not changes_to_fetch:
        # Exit early if there was only one branch to fetch.
        return

    if max_processes is not None:
        # Clamp to at least one thread; a pool of size 0 cannot be created.
        threads_count = max(1, min(max_processes, len(changes_to_fetch)))
    else:
        threads_count = max(len(changes_to_fetch), 1)

    pool = ThreadPool(threads_count)
    fetched_cls = set()
    try:
        it = pool.imap_unordered(fetch, changes_to_fetch).__iter__()
        while True:
            try:
                row = it.next(timeout=5)
            except (multiprocessing.TimeoutError, StopIteration):
                # StopIteration must not escape a generator body: under
                # PEP 479 it would be turned into a RuntimeError.
                break
            fetched_cls.add(row[0])
            yield row
    finally:
        pool.close()

    # Add any branches that failed to fetch.
    for cl in set(changes_to_fetch) - fetched_cls:
        yield (cl, 'error')
def run(self, result):
    """
    Run every subsuite in a pool of worker processes and replay the outcome.

    Each finished job hands back the index of its subsuite together with the
    events recorded while running it. Jobs are consumed via imap_unordered
    so outcomes can be reported as soon as any worker is done.

    Only numeric indexes into self.subsuites are passed back, never the
    tests themselves, to keep pickling problems to a minimum. Exceptions of
    dynamically created classes such as Model.DoesNotExist may still fail
    to unpickle.

    NOTE(review): making tracebacks picklable with tblib is not set up in
    this method; presumably that happens elsewhere — confirm.

    Returns the ``result`` object that was passed in.
    """
    shared_counter = multiprocessing.Value(ctypes.c_int, 0)
    worker_pool = multiprocessing.Pool(
        processes=self.processes,
        initializer=self.init_worker.__func__,
        initargs=[shared_counter])
    jobs = [
        (self.runner_class, position, suite, self.failfast)
        for position, suite in enumerate(self.subsuites)
    ]
    pending = worker_pool.imap_unordered(self.run_subsuite.__func__, jobs)

    # Poll with a short timeout so a stop request is noticed promptly.
    while not result.shouldStop:
        try:
            subsuite_index, events = pending.next(timeout=0.1)
        except multiprocessing.TimeoutError:
            continue
        except StopIteration:
            # Every job has been consumed.
            worker_pool.close()
            break

        suite_tests = list(self.subsuites[subsuite_index])
        for event in events:
            # event = (handler name, test index, *handler arguments)
            callback = getattr(result, event[0], None)
            if callback is not None:
                callback(suite_tests[event[1]], *event[2:])
    else:
        # The loop ended because a stop was requested: abandon the workers.
        worker_pool.terminate()

    worker_pool.join()
    return result
def get_cl_statuses(changes, fine_grained, max_processes=None):
    """Returns a blocking iterable of (cl, status) for given branches.

    If fine_grained is true, this will fetch CL statuses from the server.
    Otherwise, simply indicate if there's a matching url for the given
    branches.

    If max_processes is specified, it is used as the maximum number of
    threads used to fetch CL status from the server. Otherwise one thread
    per branch is spawned.

    A branch whose status has not arrived when a 5 second wait for the next
    result expires is reported with the status 'error'. An exception raised
    by GetStatus() is logged and propagates to the consumer.

    See GetStatus() for a list of possible statuses.
    """
    # Silence upload.py otherwise it becomes unwieldy.
    upload.verbosity = 0

    if not changes:
        # A plain return ends a generator; raising StopIteration here would
        # be turned into a RuntimeError under PEP 479.
        return

    if not fine_grained:
        # Fast path which doesn't involve querying codereview servers.
        # Do not use GetApprovingReviewers(), since it requires an HTTP
        # request.
        for cl in changes:
            yield (cl, 'waiting' if cl.GetIssueURL() else 'error')
        return

    # First, sort out authentication issues.
    logging.debug('ensuring credentials exist')
    for cl in changes:
        cl.EnsureAuthenticated(force=False, refresh=True)

    def fetch(cl):
        try:
            return (cl, cl.GetStatus())
        except:
            # See http://crbug.com/629863.
            logging.exception('failed to fetch status for %s:', cl)
            raise

    threads_count = len(changes)
    if max_processes:
        threads_count = max(1, min(threads_count, max_processes))
    logging.debug('querying %d CLs using %d threads',
                  len(changes), threads_count)

    pool = ThreadPool(threads_count)
    fetched_cls = set()
    try:
        it = pool.imap_unordered(fetch, changes).__iter__()
        while True:
            try:
                cl, status = it.next(timeout=5)
            except (multiprocessing.TimeoutError, StopIteration):
                # Either the wait expired or every result was consumed;
                # StopIteration must not escape the generator body.
                break
            fetched_cls.add(cl)
            yield cl, status
    finally:
        pool.close()

    # Add any branches that failed to fetch.
    for cl in set(changes) - fetched_cls:
        yield (cl, 'error')
def run(self, result):
    """
    Run every subsuite in a pool of worker processes and replay the outcome.

    Each finished job hands back the index of its subsuite together with the
    events recorded while running it. Jobs are consumed via imap_unordered
    so outcomes can be reported as soon as any worker is done.

    To keep pickling problems to a minimum:

    - only numeric indexes into self.subsuites are passed back, never tests
    - tblib, when available, is installed so tracebacks can be pickled

    Even with tblib, exceptions of dynamically created classes such as
    Model.DoesNotExist may still fail to unpickle.

    Returns the ``result`` object that was passed in.
    """
    if tblib is not None:
        tblib.pickling_support.install()

    shared_counter = multiprocessing.Value(ctypes.c_int, 0)
    worker_pool = multiprocessing.Pool(
        processes=self.processes,
        initializer=self.init_worker.__func__,
        initargs=[shared_counter])
    jobs = [
        (position, suite, self.failfast)
        for position, suite in enumerate(self.subsuites)
    ]
    pending = worker_pool.imap_unordered(self.run_subsuite.__func__, jobs)

    # Poll with a short timeout so a stop request is noticed promptly.
    while not result.shouldStop:
        try:
            subsuite_index, events = pending.next(timeout=0.1)
        except multiprocessing.TimeoutError:
            continue
        except StopIteration:
            # Every job has been consumed.
            worker_pool.close()
            break

        suite_tests = list(self.subsuites[subsuite_index])
        for event in events:
            # event = (handler name, test index, *handler arguments)
            callback = getattr(result, event[0], None)
            if callback is not None:
                callback(suite_tests[event[1]], *event[2:])
    else:
        # The loop ended because a stop was requested: abandon the workers.
        worker_pool.terminate()

    worker_pool.join()
    return result
def main():
    """Query all schedds in parallel and update checkpoint.factory.json.

    The checkpoint maps a schedd name to the value returned by the last
    successful ``process_schedd`` run for it. Schedds are processed on a
    pool of ten worker processes within an overall budget of
    ``TIMEOUT_MINS`` minutes (plus ten seconds of slack); once the budget
    is used up the remaining workers are terminated. The new values are
    merged into whatever checkpoint is on disk at the end (the larger value
    wins per key) and written via a temporary file followed by a rename.
    """
    try:
        with open("checkpoint.factory.json") as fd:
            checkpoint = json.load(fd)
    except (IOError, OSError, ValueError):
        # Missing or unreadable checkpoint: start from scratch.
        checkpoint = {}

    starttime = time.time()
    pool = multiprocessing.Pool(processes=10)
    future = pool.apply_async(get_schedds)
    schedd_ads = future.get(TIMEOUT_MINS*60)
    print("There are %d schedds to query." % len(schedd_ads))

    futures = []
    for schedd_ad in schedd_ads:
        name = schedd_ad["Name"]
        last_completion = checkpoint.get(name, 0)
        future = pool.apply_async(process_schedd,
                                  (starttime, last_completion, schedd_ad))
        futures.append((name, future))
    pool.close()

    timed_out = False
    for name, future in futures:
        time_remaining = TIMEOUT_MINS*60+10 - (time.time() - starttime)
        if time_remaining > 0:
            try:
                last_completion = future.get(time_remaining)
                if name:
                    # Key by the schedd this future belongs to, i.e. the
                    # same key that was used to read the checkpoint.
                    checkpoint[name] = last_completion
            except multiprocessing.TimeoutError:
                print("Schedd %s timed out; ignoring progress." % name)
        else:
            timed_out = True
            break
    if timed_out:
        pool.terminate()
    pool.join()

    # Re-read the file so values written meanwhile are merged, not lost.
    try:
        with open("checkpoint.factory.json") as fd:
            checkpoint_new = json.load(fd)
    except (IOError, OSError, ValueError):
        checkpoint_new = {}
    for key, val in checkpoint.items():
        if (key not in checkpoint_new) or (val > checkpoint_new[key]):
            checkpoint_new[key] = val

    fd, tmpname = tempfile.mkstemp(dir=".",
                                   prefix="checkpoint.factory.json.new")
    with os.fdopen(fd, "w") as out:
        json.dump(checkpoint_new, out)
    os.rename(tmpname, "checkpoint.factory.json")

    print("Total processing time: %.2f mins" % ((time.time()-starttime)/60.))
def main():
    """Query all schedds in parallel and rewrite checkpoint2.json.

    The checkpoint maps a schedd name to the value returned by the last
    successful ``process_schedd`` run for it. Schedds are processed on a
    pool of ten worker processes within an overall budget of
    ``TIMEOUT_MINS`` minutes (plus ten seconds of slack); once the budget
    is used up the remaining workers are terminated. The updated checkpoint
    is written to ``checkpoint2.json.new`` and then renamed into place.
    """
    try:
        with open("checkpoint2.json") as fd:
            checkpoint = json.load(fd)
    except (IOError, OSError, ValueError):
        # Missing or unreadable checkpoint: start from scratch.
        checkpoint = {}

    starttime = time.time()
    pool = multiprocessing.Pool(processes=10)
    future = pool.apply_async(get_schedds)
    schedd_ads = future.get(TIMEOUT_MINS*60)
    print("There are %d schedds to query." % len(schedd_ads))

    futures = []
    for schedd_ad in schedd_ads:
        name = schedd_ad["Name"]
        last_completion = checkpoint.get(name, 0)
        future = pool.apply_async(process_schedd,
                                  (starttime, last_completion, schedd_ad))
        futures.append((name, future))

    timed_out = False
    for name, future in futures:
        time_remaining = TIMEOUT_MINS*60+10 - (time.time() - starttime)
        if time_remaining > 0:
            try:
                last_completion = future.get(time_remaining)
                # Key by the schedd name (not the literal "name"), matching
                # the key used to read the checkpoint.
                checkpoint[name] = last_completion
            except multiprocessing.TimeoutError:
                print("Schedd %s timed out; ignoring progress." % name)
        else:
            timed_out = True
            break
    if timed_out:
        pool.terminate()
    else:
        pool.close()
    pool.join()

    with open("checkpoint2.json.new", "w") as fd:
        json.dump(checkpoint, fd)
    os.rename("checkpoint2.json.new", "checkpoint2.json")

    print("Total processing time: %.2f mins" % ((time.time()-starttime)/60.))
def main():
    """Query all schedds in parallel and update checkpoint.json.

    The checkpoint maps a schedd name to the value returned by the last
    successful ``process_schedd`` run for it. Schedds are processed on a
    pool of ten worker processes within an overall budget of
    ``TIMEOUT_MINS`` minutes (plus ten seconds of slack); once the budget
    is used up the remaining workers are terminated. The new values are
    merged into whatever checkpoint is on disk at the end (the larger value
    wins per key), written to ``checkpoint.json.new`` and renamed into
    place.
    """
    try:
        with open("checkpoint.json") as fd:
            checkpoint = json.load(fd)
    except (IOError, OSError, ValueError):
        # Missing or unreadable checkpoint: start from scratch.
        checkpoint = {}

    starttime = time.time()
    pool = multiprocessing.Pool(processes=10)
    future = pool.apply_async(get_schedds)
    schedd_ads = future.get(TIMEOUT_MINS*60)
    print("There are %d schedds to query." % len(schedd_ads))

    futures = []
    for schedd_ad in schedd_ads:
        name = schedd_ad["Name"]
        last_completion = checkpoint.get(name, 0)
        future = pool.apply_async(process_schedd,
                                  (starttime, last_completion, schedd_ad))
        futures.append((name, future))
    pool.close()

    timed_out = False
    for name, future in futures:
        time_remaining = TIMEOUT_MINS*60+10 - (time.time() - starttime)
        if time_remaining > 0:
            try:
                last_completion = future.get(time_remaining)
                # Key by the schedd this future belongs to, i.e. the same
                # key that was used to read the checkpoint.
                checkpoint[name] = last_completion
            except multiprocessing.TimeoutError:
                print("Schedd %s timed out; ignoring progress." % name)
        else:
            timed_out = True
            break
    if timed_out:
        pool.terminate()
    pool.join()

    # Re-read the file so values written meanwhile are merged, not lost.
    try:
        with open("checkpoint.json") as fd:
            checkpoint_new = json.load(fd)
    except (IOError, OSError, ValueError):
        checkpoint_new = {}
    for key, val in checkpoint.items():
        if (key not in checkpoint_new) or (val > checkpoint_new[key]):
            checkpoint_new[key] = val

    with open("checkpoint.json.new", "w") as fd:
        json.dump(checkpoint_new, fd)
    os.rename("checkpoint.json.new", "checkpoint.json")

    print("Total processing time: %.2f mins" % ((time.time()-starttime)/60.))
def run(self, result):
    """
    Run every subsuite in a pool of worker processes and replay the outcome.

    Each finished job hands back the index of its subsuite together with the
    events recorded while running it. Jobs are consumed via imap_unordered
    so outcomes can be reported as soon as any worker is done.

    To keep pickling problems to a minimum:

    - only numeric indexes into self.subsuites are passed back, never tests
    - tblib, when available, is installed so tracebacks can be pickled

    Even with tblib, exceptions of dynamically created classes such as
    Model.DoesNotExist may still fail to unpickle.

    Returns the ``result`` object that was passed in.
    """
    if tblib is not None:
        tblib.pickling_support.install()

    shared_counter = multiprocessing.Value(ctypes.c_int, 0)
    worker_pool = multiprocessing.Pool(
        processes=self.processes,
        initializer=self.init_worker.__func__,
        initargs=[shared_counter])
    jobs = [
        (position, suite, self.failfast)
        for position, suite in enumerate(self.subsuites)
    ]
    pending = worker_pool.imap_unordered(self.run_subsuite.__func__, jobs)

    # Poll with a short timeout so a stop request is noticed promptly.
    while not result.shouldStop:
        try:
            subsuite_index, events = pending.next(timeout=0.1)
        except multiprocessing.TimeoutError:
            continue
        except StopIteration:
            # Every job has been consumed.
            worker_pool.close()
            break

        suite_tests = list(self.subsuites[subsuite_index])
        for event in events:
            # event = (handler name, test index, *handler arguments)
            callback = getattr(result, event[0], None)
            if callback is not None:
                callback(suite_tests[event[1]], *event[2:])
    else:
        # The loop ended because a stop was requested: abandon the workers.
        worker_pool.terminate()

    worker_pool.join()
    return result
def get_cl_statuses(changes, fine_grained, max_processes=None):
    """Returns a blocking iterable of (cl, status) for given branches.

    If fine_grained is true, this will fetch CL statuses from the server.
    Otherwise, simply indicate if there's a matching url for the given
    branches.

    If max_processes is specified, it is used as the maximum number of
    threads used to fetch CL status from the server. Otherwise one thread
    per branch is spawned.

    A branch whose status has not arrived when a 5 second wait for the next
    result expires is reported with the status 'error'. An exception raised
    by GetStatus() is logged and propagates to the consumer.

    See GetStatus() for a list of possible statuses.
    """
    # Silence upload.py otherwise it becomes unwieldy.
    upload.verbosity = 0

    if not changes:
        # A plain return ends a generator; raising StopIteration here would
        # be turned into a RuntimeError under PEP 479.
        return

    if not fine_grained:
        # Fast path which doesn't involve querying codereview servers.
        # Do not use get_approving_reviewers(), since it requires an HTTP
        # request.
        for cl in changes:
            yield (cl, 'waiting' if cl.GetIssueURL() else 'error')
        return

    # First, sort out authentication issues.
    logging.debug('ensuring credentials exist')
    for cl in changes:
        cl.EnsureAuthenticated(force=False, refresh=True)

    def fetch(cl):
        try:
            return (cl, cl.GetStatus())
        except:
            # See http://crbug.com/629863.
            logging.exception('failed to fetch status for %s:', cl)
            raise

    threads_count = len(changes)
    if max_processes:
        threads_count = max(1, min(threads_count, max_processes))
    logging.debug('querying %d CLs using %d threads',
                  len(changes), threads_count)

    pool = ThreadPool(threads_count)
    fetched_cls = set()
    try:
        it = pool.imap_unordered(fetch, changes).__iter__()
        while True:
            try:
                cl, status = it.next(timeout=5)
            except (multiprocessing.TimeoutError, StopIteration):
                # Either the wait expired or every result was consumed;
                # StopIteration must not escape the generator body.
                break
            fetched_cls.add(cl)
            yield cl, status
    finally:
        pool.close()

    # Add any branches that failed to fetch.
    for cl in set(changes) - fetched_cls:
        yield (cl, 'error')
def map(self, func, iterable, chunksize=None, callback=None):
    """
    Blocking counterpart of ``map_async`` that stays responsive to
    ``KeyboardInterrupt``.

    Comparable to the built-in ``map()`` function and
    :meth:`multiprocessing.pool.Pool.map()`, except that a
    ``KeyboardInterrupt`` is not swallowed while waiting.

    Parameters
    ----------
    func : callable
        Forwarded to ``map_async`` to be applied to each element of
        ``iterable``.
    iterable : iterable
        The tasks; forwarded to ``map_async`` unchanged.
    chunksize : int, optional
        Forwarded to ``map_async`` unchanged.
    callback : callable, optional
        If given, it is wrapped in a ``CallbackWrapper`` and passed to
        ``map_async`` as its callback.

    Returns
    -------
    results : list
        Whatever ``get()`` on the asynchronous result returns.
    """
    wrapped = None if callback is None else CallbackWrapper(callback)

    pending = self.map_async(func, iterable, chunksize=chunksize,
                             callback=wrapped)

    # get() must always be called with a timeout: a Condition.wait()
    # without one swallows KeyboardInterrupts. Expired waits are simply
    # retried until the result is available.
    while True:
        try:
            return pending.get(self.wait_timeout)
        except multiprocessing.TimeoutError:
            continue
        except KeyboardInterrupt:
            self.terminate()
            self.join()
            raise