def _sum_updates_and_errors(results):
    """Return ``(updated, errors)`` summed over a sequence of worker results.

    Each result is unpacked as a 4-tuple ``(item_type, path, update, error)``;
    ``update`` and ``error`` are summed, so they are expected to be bools or
    ints.
    """
    updated = sum(update for _item_type, _path, update, _error in results)
    errors = sum(error for _item_type, _path, _update, error in results)
    return updated, errors


def _result_item_type(result):
    """Return the item type (first field) of a worker result, or '' if falsy.

    Used as the sort/group key so that results with no item type still
    compare against string item types.
    """
    return result[0] or ''


def run(config_uri, app_name=None, username=None, types=(), batch_size=500, processes=None):
    """Process stored items in parallel batches and log update/error totals.

    Loads the app via ``internal_app`` to list item uuids from the registry's
    ``CONNECTION``, splits them into batches of ``batch_size`` and hands each
    batch to ``worker`` in a ``forkserver`` process pool whose processes are
    set up by ``initializer(config_uri, app_name, username)``. As batches
    finish, logs per-batch counts; afterwards logs totals per item type.

    Each ``worker`` return value is read as a dict whose ``'results'`` entry
    is a list of ``(item_type, path, update, error)`` tuples.

    :param config_uri: config passed to ``internal_app`` and each worker's
        initializer.
    :param app_name: app name forwarded to ``internal_app`` and the initializer.
    :param username: user forwarded to ``internal_app`` and the initializer.
    :param types: forwarded to ``connection.__iter__``; presumably restricts
        the uuids to these item types, with ``()`` meaning all (confirm).
    :param batch_size: number of uuids per batch handed to one worker call.
    :param processes: pool size; ``None`` lets ``multiprocessing`` choose.

    Any exception raised while consuming worker results propagates, and the
    pool is terminated and joined on both the success and failure paths.
    """
    # multiprocessing.get_context is Python 3 only.
    from multiprocessing import get_context
    from multiprocessing.pool import Pool

    # Loading the app configures logging from the config file, so set the
    # snovault logger level here, after that configuration.
    logging.getLogger('snovault').setLevel(logging.DEBUG)

    testapp = internal_app(config_uri, app_name, username)
    connection = testapp.app.registry[CONNECTION]
    uuids = [str(uuid) for uuid in connection.__iter__(*types)]
    # Discard the transaction used only for listing uuids in this process;
    # the batches are handled in the worker processes.
    transaction.abort()
    logger.info('Total items: %d', len(uuids))

    pool = Pool(
        processes=processes,
        initializer=initializer,
        initargs=(config_uri, app_name, username),
        context=get_context('forkserver'),
    )

    all_results = []
    try:
        # chunksize=1: each task is already a whole batch of uuids.
        for result in pool.imap_unordered(worker, batched(uuids, batch_size), chunksize=1):
            batch_results = result['results']
            updated, errors = _sum_updates_and_errors(batch_results)
            logger.info('Batch: Updated %d of %d (errors %d)',
                        updated, len(batch_results), errors)
            all_results.extend(batch_results)
    finally:
        # On success every result has already been consumed above, so
        # terminating only stops idle workers; on error it stops them early.
        pool.terminate()
        pool.join()

    # groupby only merges adjacent runs, so sort by the same key first.
    for item_type, group in itertools.groupby(
            sorted(all_results, key=_result_item_type), key=_result_item_type):
        group = list(group)
        updated, errors = _sum_updates_and_errors(group)
        logger.info('Collection %s: Updated %d of %d (errors %d)',
                    item_type, updated, len(group), errors)
评论列表
文章目录