elastic.py 文件源码

python
阅读 141 收藏 0 点赞 0 评论 0

项目:browbeat 作者: openstack 项目源码 文件源码
def flush_cache(self):
        if len(self.cache) == 0:
            return True
        retry = 2
        for i in range(retry):
            try:
                to_upload = helpers.parallel_bulk(self.es,
                                                  self.cache_insertable_iterable())
                counter = 0
                num_items = len(self.cache)
                for item in to_upload:
                    self.logger.debug("{} of {} Elastic objects uploaded".format(num_items,
                                                                                 counter))
                    counter = counter + 1
                output = "Pushed {} items to Elasticsearch to index {}".format(num_items,
                                                                               self.index)
                output += " and browbeat UUID {}".format(str(browbeat_uuid))
                self.logger.info(output)
                self.cache = deque()
                self.last_upload = datetime.datetime.utcnow()
                return True
            except Exception as Err:
                self.logger.error(
                    "Error pushing data to Elasticsearch, going to retry"
                    " in 10 seconds")
                self.logger.error("Exception: {}".format(Err))
                time.sleep(10)
                if i == (retry - 1):
                    self.logger.error("Pushing Data to Elasticsearch failed in spite of retry,"
                                      " dumping JSON for {} cached items".format(len(self.cache)))
                    for item in self.cache:
                        filename = item['test_name'] + '-' + item['identifier']
                        filename += '-elastic' + '.' + 'json'
                        elastic_file = os.path.join(item['result_dir'],
                                                    filename)

                        with open(elastic_file, 'w') as result_file:
                            json.dump(item['result'],
                                      result_file,
                                      indent=4,
                                      sort_keys=True)

                            self.logger.info("Saved Elasticsearch consumable result JSON to {}".
                                             format(elastic_file))
                    self.cache = deque()
                    self.last_upload = datetime.datetime.utcnow()
                    return False
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号