def parallel_bulk(client, actions, thread_count=4, chunk_size=500,
                  max_chunk_bytes=100 * 1024 * 1024,
                  expand_action_callback=es_helpers.expand_action,
                  **kwargs):
    """Run bulk actions against ``client`` through a bounded thread fan-out.

    Variant of ``es_helpers.parallel_bulk`` built on
    ``imap_fixed_output_buffer`` rather than ``Pool.imap``: the latter
    consumed unbounded memory if the action generator outruns the upload
    (which usually happens).

    Args:
        client: Client object; its ``transport.serializer`` is handed to
            ``es_helpers._chunk_actions`` and the client itself to
            ``es_helpers._process_bulk_chunk``.
        actions: Iterable of actions; each one is passed through
            ``expand_action_callback`` before chunking.
        thread_count: Forwarded as ``threads`` to
            ``imap_fixed_output_buffer``.
        chunk_size: Forwarded to ``es_helpers._chunk_actions``.
        max_chunk_bytes: Forwarded to ``es_helpers._chunk_actions``.
        expand_action_callback: Callable applied to every action.
        **kwargs: Forwarded unchanged to ``es_helpers._process_bulk_chunk``.

    Yields:
        Every item of every per-chunk result, flattened, in the order the
        chunk results are produced by ``imap_fixed_output_buffer``.
    """
    def upload_chunk(chunk):
        # list() drains whatever _process_bulk_chunk returns inside this
        # call, so a complete result list is handed back per chunk.
        return list(es_helpers._process_bulk_chunk(client, chunk, **kwargs))

    expanded_actions = map(expand_action_callback, actions)
    chunks = es_helpers._chunk_actions(
        expanded_actions, chunk_size, max_chunk_bytes,
        client.transport.serializer)

    chunk_results = imap_fixed_output_buffer(
        upload_chunk, chunks, threads=thread_count)
    for chunk_result in chunk_results:
        # Flatten: callers see individual items, not per-chunk lists.
        for outcome in chunk_result:
            yield outcome
评论列表
文章目录