# discovery_wrappers.py
# Module-level imports needed by this method; upload_file_to_discovery_collection
# is assumed to be defined elsewhere in this module.
from collections import defaultdict
from multiprocessing import Manager, Process
import json
import logging
import sys


def upload_documents(self, collection_id, corpus, max_concurrent_child_processes=20):
    """
    :param str collection_id: collection to upload to
    :param Iterable corpus: an iterable which yields (doc_id, doc_as_json)
    :param int max_concurrent_child_processes: the maximum number of concurrent processes
        that are spawned to help parallelize the document upload requests
    """
    stats = defaultdict(int)
    # Set up a manager so child processes can report results back through a
    # shared dict while we parallelize the uploads.
    file_processors = []
    manager = Manager()
    response_from_processors = manager.dict()
    for doc_id, body in corpus:
        stats['num_docs'] += 1
        self._wait_for_processors_to_free_up(max_concurrent_child_processes)
        file_processors.append(Process(target=upload_file_to_discovery_collection,
                                       args=(self.config, self.environment_id, collection_id,
                                             doc_id, body, response_from_processors)))
        file_processors[-1].start()
        # Log progress on every request when debug logging is enabled,
        # otherwise once every 1000 requests.
        if self.logger.isEnabledFor(logging.DEBUG) or stats['num_docs'] % 1000 == 0:
            self.logger.info('Submitted %d upload requests' % stats['num_docs'])
        stats['num_requests_submitted'] += 1
    self.logger.info('Done submitting requests, checking on the status of the requests')
    # Join the children and tally their responses so failures are visible.
    stats['counts_by_status'] = self._check_file_processes(file_processors, response_from_processors)
    self.logger.info('Processed %d docs' % stats['num_docs'])
    json.dump(stats, sys.stdout, sort_keys=True, indent=4)
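
# The two helper methods called above are not part of this snippet. Below is a
# minimal sketch of what they might look like on the same class, under two
# assumptions: the only child processes are the upload workers (so
# multiprocessing.active_children() can serve as the bookkeeping, and it reaps
# finished children as a side effect), and each worker records an HTTP status
# code in the shared dict keyed by doc_id. The names match the calls above,
# but the bodies are illustrative, not the original implementation.
import multiprocessing
import time


def _wait_for_processors_to_free_up(self, max_concurrent_child_processes):
    # Block until the number of live upload workers drops below the limit;
    # active_children() also joins any children that have already exited.
    while len(multiprocessing.active_children()) >= max_concurrent_child_processes:
        time.sleep(0.1)


def _check_file_processes(self, file_processors, response_from_processors):
    # Wait for every worker to finish, then tally the uploads by the status
    # each worker recorded, so failed documents are visible at a glance.
    for process in file_processors:
        process.join()
    counts_by_status = defaultdict(int)
    for status in response_from_processors.values():
        counts_by_status[str(status)] += 1
    return dict(counts_by_status)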
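
# A minimal usage sketch. DiscoveryClient and config are hypothetical names
# for the wrapper class that owns upload_documents and its credentials; the
# only contract taken from the method above is that the corpus yields
# (doc_id, doc_as_json) pairs.
import os


def corpus_from_directory(directory):
    # Yield (doc_id, doc_as_json) pairs, using each file name as the doc id.
    for name in sorted(os.listdir(directory)):
        if name.endswith('.json'):
            with open(os.path.join(directory, name)) as f:
                yield name[:-len('.json')], json.load(f)


if __name__ == '__main__':
    client = DiscoveryClient(config)  # hypothetical wrapper and config
    client.upload_documents('my-collection-id',
                            corpus_from_directory('docs/'),
                            max_concurrent_child_processes=10)

# The __main__ guard matters here: on platforms where multiprocessing uses
# the spawn start method, each Process re-imports this module, and unguarded
# top-level calls would recurse.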