def parallel_bulk(client, actions, thread_count=4, chunk_size=500,
max_chunk_bytes=100 * 1024 * 1024,
expand_action_callback=es_helpers.expand_action,
**kwargs):
""" es_helpers.parallel_bulk rewritten with imap_fixed_output_buffer
instead of Pool.imap, which consumed unbounded memory if the generator
outruns the upload (which usually happens).
"""
actions = map(expand_action_callback, actions)
for result in imap_fixed_output_buffer(
lambda chunk: list(
es_helpers._process_bulk_chunk(client, chunk, **kwargs)),
es_helpers._chunk_actions(actions, chunk_size, max_chunk_bytes,
client.transport.serializer),
threads=thread_count,
):
for item in result:
yield item
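The imap_fixed_output_buffer helper referenced in the docstring is not shown in this snippet. A minimal sketch of one way such a bounded parallel map could look (the name comes from the code above; the body below is an assumption, not the original implementation):

from collections import deque
from concurrent.futures import ThreadPoolExecutor

def imap_fixed_output_buffer(func, iterable, threads=4):
    # Keep at most `threads` chunks in flight so the producing generator
    # cannot run arbitrarily far ahead of the uploads (bounded memory).
    with ThreadPoolExecutor(max_workers=threads) as executor:
        in_flight = deque()
        for item in iterable:
            in_flight.append(executor.submit(func, item))
            if len(in_flight) >= threads:
                yield in_flight.popleft().result()   # preserves input order
        while in_flight:
            yield in_flight.popleft().result()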
def bulk(self, client, actions, stats_only=False, **kwargs):
success, failed = 0, 0
    # list of errors to be collected if stats_only is False
errors = []
for ok, item in parallel_bulk(client, actions, **kwargs):
        # go through request-response pairs and detect failures
if not ok:
if not stats_only:
errors.append(item)
failed += 1
else:
success += 1
    return success, (failed if stats_only else errors)
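A usage sketch for the wrapper above, assuming `indexer` is an instance of the class this method belongs to and using hypothetical index and document values; each action follows the standard elasticsearch.helpers bulk action format:

from elasticsearch import Elasticsearch

es = Elasticsearch()                      # assumes a local cluster for the example
actions = (
    {"_op_type": "index", "_index": "my-index", "_id": i, "_source": {"value": i}}
    for i in range(1000)
)
# stats_only=True returns (success_count, failed_count); otherwise the second
# value is the list of error items collected above.
success, failed = indexer.bulk(es, actions, stats_only=True, thread_count=4)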
def parallel_bulk(self,
                  the_iter,
                  *args,
                  **kw):
    # Use the custom (serial) bulk implementation when the flag is set;
    # otherwise delegate to elasticsearch.helpers.parallel_bulk.
    if self.use_custom_parallel_bulk:
        return self._non_parallel_bulk(the_iter,
                                       *args,
                                       **kw)
    else:
        return es_parallel_bulk(self.es,
                                the_iter,
                                *args,
                                **kw)
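The _non_parallel_bulk fallback is not shown in this snippet; a hypothetical sketch of what it might look like, using elasticsearch.helpers.streaming_bulk, which yields the same (ok, item) tuples as parallel_bulk but runs in the calling thread:

from elasticsearch import helpers

def _non_parallel_bulk(self, the_iter, *args, **kw):
    # thread_count only makes sense for the parallel path; drop it if present.
    kw.pop("thread_count", None)
    return helpers.streaming_bulk(self.es, the_iter, *args, **kw)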
def _perform_index_sync(self, sql_table_cls, es_doc_cls, id_logger):
es_doc = es_doc_cls()
elasticsearch_conn = connections.get_connection()
sync_timestamp = current_server_timestamp()
pending_insertions = self._compute_dirty_documents(
sql_table_cls, es_doc.doc_type)
bulk_op = self._synchronisation_op(es_doc, pending_insertions)
self._logging(logging.INFO, 'Performing synchronization.')
for ok, info in parallel_bulk(elasticsearch_conn, bulk_op):
obj_id = info['index']['_id'] \
if 'index' in info else info['update']['_id']
if ok:
            # Mark the task as handled so we don't re-process it next time
self._logging(logging.INFO,
'Document %s has been synced successfully.'
% obj_id)
sql_table_cls.update_last_sync(obj_id, sync_timestamp)
else:
id_logger(obj_id, logging.ERROR,
'Error while syncing document %s index.' % obj_id)
    # Refresh the index so the freshly synced documents become searchable
elasticsearch_dsl.Index(es_doc.index).refresh()
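The _synchronisation_op generator is not shown here; since the loop above looks for either an 'index' or an 'update' key in each per-item response, it presumably yields actions of roughly this shape (the row attributes used below are assumptions):

def _synchronisation_op(self, es_doc, pending_insertions):
    for row in pending_insertions:
        if row.is_new:                                   # assumed flag on the SQL row
            yield {"_op_type": "index", "_index": es_doc.index,
                   "_id": row.id, "_source": row.to_dict()}
        else:
            yield {"_op_type": "update", "_index": es_doc.index,
                   "_id": row.id, "doc": row.to_dict()}  # 'doc' is the partial update body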
def _geocomplete_index_batch(self, elasticsearch_conn, to_index):
log_msg = 'Indexing documents.'
self._logging(logging.INFO, log_msg)
for ok, info in parallel_bulk(elasticsearch_conn, to_index):
if not ok:
doc_id = info['create']['_id']
doc_type = info['create']['_type']
doc_index = info['create']['_index']
logging_level = logging.ERROR
err_msg = "Couldn't index document: '%s', of type: %s, " \
"under index: %s." % (doc_id, doc_type, doc_index)
self._logging(logging_level, err_msg)
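The error handler above reads info['create'], so the to_index iterable presumably carries 'create' actions; a hypothetical example element (all values are made up, and '_type' only applies to pre-7.x clusters):

example_geocomplete_action = {
    "_op_type": "create",        # create fails instead of overwriting an existing id
    "_index": "geocomplete",     # assumed index name
    "_type": "geoname",          # assumed mapping type, matching info['create']['_type']
    "_id": "fr-paris-75000",
    "_source": {"name": "Paris", "postal_code": "75000"},
}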
def run(self):
    # Check whether the index exists; create it if it does not
if not self.elastic.indices.exists(settings.TEST_INDEX):
self.elastic.indices.create(settings.TEST_INDEX,
body=INDEX_CREATION_BODY)
for interval in COLLAPSE_INTERVALS:
self.init_series(interval)
for success, info in parallel_bulk(self.elastic, self.bulk_items):
if not success:
print("ERROR:", info)
def run(self, distributions=None):
"""Indexa en Elasticsearch todos los datos de las
distribuciones guardadas en la base de datos, o las
especificadas por el iterable 'distributions'
"""
self.init_index()
    # Optimization: disable index refresh while indexing
self.elastic.indices.put_settings(
index=self.index,
body=constants.DEACTIVATE_REFRESH_BODY
)
logger.info(strings.INDEX_START)
for distribution in distributions:
fields = distribution.field_set.all()
fields = {field.title: field.series_id for field in fields}
df = self.init_df(distribution, fields)
self.generate_properties(df, fields)
logger.info(strings.BULK_REQUEST_START)
for success, info in parallel_bulk(self.elastic, self.bulk_actions):
if not success:
logger.warn(strings.BULK_REQUEST_ERROR, info)
logger.info(strings.BULK_REQUEST_END)
    # Re-enable refresh once indexing has finished
self.elastic.indices.put_settings(
index=self.index,
body=constants.REACTIVATE_REFRESH_BODY
)
segments = constants.FORCE_MERGE_SEGMENTS
self.elastic.indices.forcemerge(index=self.index,
max_num_segments=segments)
logger.info(strings.INDEX_END)
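The refresh-related constants are not shown; the usual bodies for pausing and restoring refresh around a large bulk load look like this (the values are assumptions about what the constants contain):

DEACTIVATE_REFRESH_BODY = {"index": {"refresh_interval": "-1"}}  # no periodic refresh while indexing
REACTIVATE_REFRESH_BODY = {"index": {"refresh_interval": "1s"}}  # back to the default interval
FORCE_MERGE_SEGMENTS = 5                                         # assumed target segment count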
def pump_it(es, bulk_accumulator):
rows_pumped = 0
# TODO: make threads and chunks configurable
for success, info in parallel_bulk(es, bulk_accumulator, thread_count=16, chunk_size=300):
if success:
rows_pumped += 1
else:
logger.warning('Pumping documents failed: {}'.format(info))
return rows_pumped
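A usage sketch for pump_it with hypothetical data; with thread_count=16 and chunk_size=300, sixteen worker threads each send chunks of 300 actions:

from elasticsearch import Elasticsearch

es = Elasticsearch()                       # assumes a local cluster for the example
bulk_accumulator = [
    {"_op_type": "index", "_index": "pump-test", "_id": i, "_source": {"row": i}}
    for i in range(10000)
]
print("rows pumped:", pump_it(es, bulk_accumulator))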
def flush_cache(self):
if len(self.cache) == 0:
return True
retry = 2
for i in range(retry):
try:
to_upload = helpers.parallel_bulk(self.es,
self.cache_insertable_iterable())
counter = 0
num_items = len(self.cache)
            for item in to_upload:
                counter = counter + 1
                self.logger.debug("{} of {} Elastic objects uploaded".format(counter,
                                                                             num_items))
output = "Pushed {} items to Elasticsearch to index {}".format(num_items,
self.index)
output += " and browbeat UUID {}".format(str(browbeat_uuid))
self.logger.info(output)
self.cache = deque()
self.last_upload = datetime.datetime.utcnow()
return True
except Exception as Err:
self.logger.error(
"Error pushing data to Elasticsearch, going to retry"
" in 10 seconds")
self.logger.error("Exception: {}".format(Err))
time.sleep(10)
if i == (retry - 1):
self.logger.error("Pushing Data to Elasticsearch failed in spite of retry,"
" dumping JSON for {} cached items".format(len(self.cache)))
for item in self.cache:
filename = item['test_name'] + '-' + item['identifier']
filename += '-elastic' + '.' + 'json'
elastic_file = os.path.join(item['result_dir'],
filename)
with open(elastic_file, 'w') as result_file:
json.dump(item['result'],
result_file,
indent=4,
sort_keys=True)
self.logger.info("Saved Elasticsearch consumable result JSON to {}".
format(elastic_file))
self.cache = deque()
self.last_upload = datetime.datetime.utcnow()
return False
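cache_insertable_iterable is not shown in this snippet; given that the failure path dumps item['result'] and the success message names self.index, a hypothetical sketch could be:

def cache_insertable_iterable(self):
    # Turn each cached result into a bulk 'index' action for self.index.
    for item in self.cache:
        yield {
            "_op_type": "index",
            "_index": self.index,
            "_source": item["result"],   # same payload that gets dumped to JSON on failure
        }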