def query_shards(self, query):
"""
Return the result of applying shard[query] for each shard in self.shards,
as a sequence.
If PARALLEL_SHARDS is set, the shards are queried in parallel, using
the multiprocessing module.
"""
args = zip([query] * len(self.shards), self.shards)
if PARALLEL_SHARDS and PARALLEL_SHARDS > 1:
logger.debug("spawning %i query processes" % PARALLEL_SHARDS)
pool = multiprocessing.Pool(PARALLEL_SHARDS)
result = pool.imap(query_shard, args, chunksize=1 + len(args) / PARALLEL_SHARDS)
else:
# serial processing, one shard after another
pool = None
result = imap(query_shard, args)
return pool, result
评论列表
文章目录