def invoke_mappers(n_mappers, batches):
    """Run ``invoke_lambda`` once per mapper id, in bursts, and wait for all.

    Mapper ids run from 1 to ``n_mappers``. They are dispatched through a
    thread pool in consecutive slices of at most ``PARALLEL_LAMBDAS`` ids;
    ``pool.map`` blocks, so each slice finishes before the next one starts.
    Every id is handled by a call equivalent to
    ``invoke_lambda(batches, mapper_outputs, mapper_lambda_name, mapper_id)``.

    Args:
        n_mappers: Number of mappers to invoke. Nothing is invoked when this
            is zero or negative.
        batches: Passed through unchanged as the first argument of
            ``invoke_lambda``.

    Raises:
        ValueError: If ``PARALLEL_LAMBDAS`` is smaller than 1.
    """
    # NOTE(review): this list is handed to every invoke_lambda call but is
    # never read or returned here -- presumably invoke_lambda fills it; confirm
    # whether the caller expects it back.
    mapper_outputs = []
    logger.info("# of Mappers {}".format(n_mappers))

    # A thread pool cannot be created with fewer than one worker, and there
    # is nothing to run anyway.
    if n_mappers <= 0:
        return

    # A non-positive burst size would never make progress through the ids.
    if PARALLEL_LAMBDAS < 1:
        raise ValueError(
            "PARALLEL_LAMBDAS must be at least 1, got {}".format(
                PARALLEL_LAMBDAS))

    burst_size = min(PARALLEL_LAMBDAS, n_mappers)
    mapper_ids = list(range(1, n_mappers + 1))
    invoke_lambda_partial = partial(invoke_lambda,
                                    batches,
                                    mapper_outputs,
                                    mapper_lambda_name)

    # Only one burst is in flight at a time, so the pool never needs more
    # workers than a single burst contains.
    pool = ThreadPool(burst_size)
    try:
        for start in range(0, n_mappers, burst_size):
            pool.map(invoke_lambda_partial,
                     mapper_ids[start:start + burst_size])
    finally:
        # Always release the worker threads, even when a mapper call raised.
        pool.close()
        pool.join()

    logger.info("All the mappers finished")
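# (Web-page residue from the source article, not part of the code:
# "Comments" / "Table of contents" headings.)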