def run_executables(execalls, cache=None, ccmode=CC_PROCESSES):
    # type: (List[ExeCall], Optional[Cache], str) -> Iterator[ExeResult]
    """Run executables in parallel, yielding results in original call order.

    Some of the results for the execalls may already be in the cache;
    those are put aside in ``cachedresults`` while the remaining calls
    are executed via ``iter_parallel``.  Each result is yielded as soon
    as available, re-merged into the original ``execalls`` order.

    execalls -- the executable invocations to perform.
    cache    -- optional result cache supporting ``mget`` and ``set``;
                when None every call is executed.
    ccmode   -- concurrency mode passed through to ``iter_parallel``
                (defaults to CC_PROCESSES).
    """
    def c2exeresult(value):
        # type: (bytes) -> ExeResult
        # Deserialize a cached byte string back into an ExeResult.
        returncode, stdout, stderr = unpack_exeresult(value)
        return make_exeresult(returncode, stdout, stderr)

    def exeresult2c(exeresult):
        # type: (ExeResult) -> bytes
        # Serialize an ExeResult into the byte string stored in the cache.
        return pack_exeresult(exeresult.returncode, exeresult.stdout, exeresult.stderr)

    # Package the execalls for eventual multiprocessing: each entry is
    # ((exe, cmdargs), {'stdindata': ...}), the argument shape call_popen expects.
    args_lists = [((ec.exe, ec.cmdargs), {'stdindata': ec.stdindata})
                  for ec in execalls]  # type: List[CallArgs]

    # (original-index, ExeResult) pairs satisfied from the cache; appended in
    # increasing index order, which heapq.merge below depends on.
    cachedresults = []
    # NOTE: jobs, keys and jobindices are built in lockstep — position i in
    # each refers to the same pending call.  jobresultiter relies on this.
    jobs = []  # type: List[CallArgs]
    keys = []  # type: List[str]
    jobindices = []  # type: List[int]
    if cache is not None:
        # Query the cache for all calls at once; cache misses become jobs.
        qkeys = [execall_hash(ec, cache) for ec in execalls]
        qresults = cache.mget(qkeys)
        for idx, (arg, key, cvalue) in enumerate(izip(args_lists, qkeys, qresults)):
            if cvalue is not None:
                cachedresults.append((idx, c2exeresult(cvalue)))
            else:
                keys.append(key)
                jobs.append(arg)
                jobindices.append(idx)
    else:
        # No cache: every call is a job; keys stays empty (never read below).
        jobs = args_lists
        jobindices = list(range(len(jobs)))

    # Lazy iterator over job results — presumably yields in submission order
    # so jobidx/result pairs stay aligned (TODO confirm in iter_parallel).
    jobiter = iter_parallel(call_popen, jobs, ccmode=ccmode)

    def jobresultiter():
        # type: () -> Iterator[Tuple[int, ExeResult]]
        # Consume job results lazily (izip keeps this streaming), caching and
        # logging each one, and tag it with its original execalls index.
        for idx, (jobidx, job, result) in enumerate(izip(jobindices, jobs, jobiter)):
            if cache is not None:
                # keys[idx] aligns with jobs[idx] by construction above.
                cache.set(keys[idx], exeresult2c(result))
            (executable, cmdargs), kwargs = job
            log_popen(executable, cmdargs, kwargs['stdindata'], result)
            yield jobidx, result

    # Both streams are sorted by original index, so heapq.merge restores the
    # caller's order while still yielding each job result as it completes.
    for idx, result in heapq.merge(iter(cachedresults), jobresultiter()):
        yield result
# (removed stray web-page extraction artifacts that were not part of the code)