def save_availability(results_queue):
"""Send availability data to storage backend.
:param results_queue: queue.Queue which provides data to save
:rtype: None
"""
results = []
timeout = 3
while True:
try:
data = results_queue.get(True, timeout=timeout)
except queue.Empty:
break
results.append(data)
body = []
indices = set()
for data in results:
index = "ms_availability_%(region)s" % data
metadata = {"index": {"_index": index,
"_type": "service_availability",
"_id": str(uuid.uuid1())}}
body.append(json.dumps(metadata, indent=0).replace("\n", ""))
body.append("\n")
body.append(json.dumps(data, indent=0).replace("\n", ""))
body.append("\n")
if index not in indices:
storage.ensure_es_index_exists(index)
indices.add(index)
body = "".join(body)
es = storage.get_elasticsearch()
LOG.debug("Saving availability:\n%s" % body)
try:
es.bulk(body=body)
except es_exceptions.ElasticsearchException as e:
LOG.error("Failed to save availability to Elastic:\n"
"Body: %s\nError: %s" % (body, e))
评论列表
文章目录