def _commit(self, *, batch_size=50, timeout=3):
    """Drain ``self.elastic_bulk`` and index its items in batches.

    Items are read from ``self.elastic_bulk`` until ``batch_size`` items
    have been collected, a ``get`` call times out, or a ``None`` sentinel
    is received. Every non-empty batch is handed to
    ``self.helper.bulk(self.elastic, batch)``. The method returns once the
    items read before the ``None`` sentinel have been flushed.

    Indexing is best-effort: if the bulk call raises, the error is logged
    through ``LOGGER.exception`` and that batch is discarded.

    Args:
        batch_size: Maximum number of items passed to one bulk call.
        timeout: Seconds to wait for the next queue item before flushing
            whatever has been collected so far.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        # A non-positive size would never read from the queue and the
        # outer loop would spin forever.
        raise ValueError('batch_size must be at least 1')

    batch = []
    stop = False
    while not stop:
        # Collect until the batch is full, the queue goes quiet, or the
        # producer signals shutdown with a None sentinel.
        while len(batch) < batch_size:
            try:
                item = self.elastic_bulk.get(timeout=timeout)
            except queue.Empty:
                # Nothing arrived in time: flush the partial batch.
                break
            if item is None:
                stop = True
                break
            batch.append(item)

        if batch:
            try:
                self.helper.bulk(self.elastic, batch)
            except Exception as err:
                LOGGER.exception('es index error: %s', err)
            # Rebind instead of clearing in place so the list already
            # handed to the bulk helper is never mutated, and so a batch
            # is never sent twice regardless of success or failure.
            batch = []
评论列表
文章目录