def send_to_es(self, path, method="GET", payload={}):
"""Low-level POST data to Amazon Elasticsearch Service generating a Sigv4 signed request
Args:
path (str): path to send to ES
method (str, optional): HTTP method default:GET
payload (dict, optional): additional payload used during POST or PUT
Returns:
dict: json answer converted in dict
Raises:
#: Error during ES communication
ES_Exception: Description
"""
if not path.startswith("/"):
path = "/" + path
es_region = self.cfg["es_endpoint"].split(".")[1]
# send to ES with exponential backoff
retries = 0
while retries < int(self.cfg["es_max_retry"]):
if retries > 0:
seconds = (2**retries) * .1
# print('Waiting for %.1f seconds', seconds)
time.sleep(seconds)
req = AWSRequest(
method=method,
url="https://%s%s?pretty&format=json" % (self.cfg["es_endpoint"], quote(path)),
data=payload,
headers={'Host': self.cfg["es_endpoint"]})
credential_resolver = create_credential_resolver(get_session())
credentials = credential_resolver.load_credentials()
SigV4Auth(credentials, 'es', es_region).add_auth(req)
try:
preq = req.prepare()
session = Session()
res = session.send(preq)
if res.status_code >= 200 and res.status_code <= 299:
# print("%s %s" % (res.status_code, res.content))
return json.loads(res.content)
else:
raise ES_Exception(res.status_code, res._content)
except ES_Exception as e:
if (e.status_code >= 500) and (e.status_code <= 599):
retries += 1 # Candidate for retry
else:
raise # Stop retrying, re-raise exception
评论列表
文章目录