def flush_cache(self):
    """Push every cached result to Elasticsearch, falling back to JSON files.

    The upload is attempted up to ``retry`` times, waiting 10 seconds between
    attempts. If every attempt fails, each cached item is written to
    ``<result_dir>/<test_name>-<identifier>-elastic.json`` so the data is not
    lost. The cache is emptied and ``last_upload`` is refreshed on both the
    success path and the fallback path.

    Each cached item is indexed like a mapping with the keys ``test_name``,
    ``identifier``, ``result_dir`` and ``result`` (those are the keys read by
    the fallback below).

    Returns:
        True if the cache was already empty or the upload succeeded;
        False if the upload failed and the items were dumped to disk.
    """
    if not self.cache:
        return True

    retry = 2
    for attempt in range(1, retry + 1):
        try:
            to_upload = helpers.parallel_bulk(self.es,
                                              self.cache_insertable_iterable())
            num_items = len(self.cache)
            # NOTE(review): parallel_bulk is presumably a lazy generator (as in
            # elasticsearch-py), so iterating it is what drives the upload.
            # Progress is reported as "<done> of <total>", counting from 1.
            for counter, _ in enumerate(to_upload, start=1):
                self.logger.debug(
                    "{} of {} Elastic objects uploaded".format(counter, num_items))
            output = "Pushed {} items to Elasticsearch to index {}".format(num_items,
                                                                           self.index)
            output += " and browbeat UUID {}".format(str(browbeat_uuid))
            self.logger.info(output)
            self.cache = deque()
            # NOTE(review): utcnow() yields a naive datetime; kept as-is because
            # code outside this block may compare against last_upload.
            self.last_upload = datetime.datetime.utcnow()
            return True
        except Exception as err:
            # Best-effort upload: any failure triggers a retry or the fallback.
            if attempt < retry:
                self.logger.error(
                    "Error pushing data to Elasticsearch, going to retry"
                    " in 10 seconds")
                self.logger.error("Exception: {}".format(err))
                # Only wait when another attempt actually follows.
                time.sleep(10)
            else:
                self.logger.error("Exception: {}".format(err))

    # Every attempt failed: persist the cached items locally instead.
    self.logger.error("Pushing Data to Elasticsearch failed in spite of retry,"
                      " dumping JSON for {} cached items".format(len(self.cache)))
    for item in self.cache:
        filename = item['test_name'] + '-' + item['identifier'] + '-elastic.json'
        elastic_file = os.path.join(item['result_dir'], filename)
        with open(elastic_file, 'w') as result_file:
            json.dump(item['result'],
                      result_file,
                      indent=4,
                      sort_keys=True)
        self.logger.info("Saved Elasticsearch consumable result JSON to {}".
                         format(elastic_file))
    self.cache = deque()
    self.last_upload = datetime.datetime.utcnow()
    return False
评论列表
文章目录