def single_bulk_to_es(bulk, config, attempt_retry):
    """Send one bulk to Elasticsearch, optionally retrying on failure.

    The incoming bulk is first passed through ``bulk_builder`` and the
    result is handed to ``helpers.bulk``. When ``attempt_retry`` is truthy,
    a failed send is retried up to three more times (four attempts in
    total), sleeping ``attempt * 3`` seconds after each failed attempt
    (3s, 6s, 9s). Without retries a single attempt is made.

    Args:
        bulk: Raw bulk payload; forwarded to ``bulk_builder``.
        config: Subscriptable configuration. ``config['es_conn']`` is passed
            as the client to ``helpers.bulk`` and ``config['bulk_size']`` as
            its ``chunk_size``. The whole object is also given to
            ``bulk_builder``.
        attempt_retry: If truthy, allow retries; otherwise fail on the
            first exception.

    Raises:
        Exception: Whatever the final attempt raised, re-raised unchanged
            once every allowed attempt has failed.
    """
    actions = bulk_builder(bulk, config)
    # NOTE(review): every retry re-sends this same `actions` object. If
    # bulk_builder returns a one-shot iterator (e.g. a generator), a retry
    # would only see the items not yet consumed -- confirm it is re-iterable.

    max_attempt = 4 if attempt_retry else 1

    for attempt in range(1, max_attempt + 1):
        try:
            helpers.bulk(config['es_conn'], actions, chunk_size=config['bulk_size'])
        except Exception:
            if attempt >= max_attempt:
                log('error', 'attempt [%s/%s] got exception, it is a permanent data loss, no retry any more' % (attempt, max_attempt))
                # Bare raise re-raises the active exception with its
                # original traceback intact.
                raise
            # Linear back-off: wait longer after each failed attempt.
            wait_seconds = attempt * 3
            log('warn', 'attempt [%s/%s] got exception, will retry after %s seconds' % (attempt, max_attempt, wait_seconds))
            time.sleep(wait_seconds)
        else:
            if attempt > 1:
                log('info', 'attempt [%s/%s] succeed. we just get recovered from previous error' % (attempt, max_attempt))
            # Completed successfully; no further attempts needed.
            return
评论列表
文章目录