def commit_logs(self):
"""
Periodically called (commit_period), this method prepares a bunch of queued logs (commit_colume) to insert them in the index
"""
if not self.logs_cache:
return
if not self.is_connected == CONNECTED:
if not self.open():
logger.warning("[elastic-logs] log commiting failed")
logger.warning("[elastic-logs] %d lines to insert in the index", len(self.logs_cache))
return
logger.debug("[elastic-logs] commiting ...")
logger.debug("[elastic-logs] %d lines to insert in the index (max insertion is %d lines)", len(self.logs_cache), self.commit_volume)
# Flush all the stored log lines
logs_to_commit = 1
now = time.time()
some_logs = []
while True:
try:
# result = self.db[self.logs_collection].insert_one(self.logs_cache.popleft())
some_logs.append(self.logs_cache.popleft())
logs_to_commit = logs_to_commit + 1
if logs_to_commit >= self.commit_volume:
break
except IndexError:
logger.debug("[elastic-logs] prepared all available logs for commit")
break
except Exception, exp:
logger.error("[elastic-logs] exception: %s", str(exp))
logger.debug("[elastic-logs] time to prepare %s logs for commit (%2.4f)", logs_to_commit, time.time() - now)
now = time.time()
try:
# Insert lines to commit
result = helpers.bulk(self.es,some_logs,self.commit_volume)
logger.debug("[elastic-logs] inserted %d logs.", result)
except ElasticsearchException, exp:
self.close()
logger.error("[elastic-logs] Error occurred when commiting: %s", exp)
logger.debug("[elastic-logs] time to insert %s logs (%2.4f)", logs_to_commit, time.time() - now)
评论列表
文章目录