def get_index_start(self, index, timestamp_field='@timestamp'):
    """ Query for one result sorted by timestamp to find the beginning of the index.

    :param index: The index in which to find the earliest event.
    :return: Timestamp of the earliest event.
    """
    query = {'sort': {timestamp_field: {'order': 'asc'}}}
    try:
        res = self.current_es.search(index=index, size=1, body=query, _source_include=[timestamp_field], ignore_unavailable=True)
    except ElasticsearchException as e:
        self.handle_error("Elasticsearch query error: %s" % (e), {'index': index})
        return '1969-12-30T00:00:00Z'
    if len(res['hits']['hits']) == 0:
        # Index is completely empty, return a date before the epoch
        return '1969-12-30T00:00:00Z'
    # The timestamp lives under _source because of the _source_include filter above
    return res['hits']['hits'][0]['_source'][timestamp_field]
Python ElasticsearchException() usage examples
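Every snippet collected below follows the same shape: make an elasticsearch-py call inside a try block and catch ElasticsearchException, the base class of the client's errors. Here is a minimal, self-contained sketch of that shape; the host, index name, and query are invented for illustration and are not taken from any of the projects below.

from elasticsearch import Elasticsearch
from elasticsearch.exceptions import ElasticsearchException

es = Elasticsearch(hosts=['localhost:9200'])
query = {'query': {'match_all': {}}, 'size': 1}

try:
    res = es.search(index='example-index', body=query)
    print('Got %d hits back' % len(res['hits']['hits']))
except ElasticsearchException as e:
    # Covers connection, transport, and serialization errors alike
    print('Elasticsearch call failed: %s' % e)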
def get_starttime(self, rule):
    """ Query ES for the last time we ran this rule.

    :param rule: The rule configuration.
    :return: A timestamp or None.
    """
    query = {'filter': {'term': {'rule_name': '%s' % (rule['name'])}},
             'sort': {'@timestamp': {'order': 'desc'}}}
    try:
        if self.writeback_es:
            res = self.writeback_es.search(index=self.writeback_index, doc_type='elastalert_status',
                                           size=1, body=query, _source_include=['endtime', 'rule_name'])
            if res['hits']['hits']:
                endtime = ts_to_dt(res['hits']['hits'][0]['_source']['endtime'])
                if ts_now() - endtime < self.old_query_limit:
                    return endtime
                else:
                    elastalert_logger.info("Found expired previous run for %s at %s" % (rule['name'], endtime))
                    return None
    except (ElasticsearchException, KeyError) as e:
        self.handle_error('Error querying for last run: %s' % (e), {'rule': rule['name']})
        self.writeback_es = None
def get_aggregated_matches(self, _id):
    """ Removes and returns all matches from writeback_es that have aggregate_id == _id """
    # XXX if there are more than self.max_aggregation matches, you have big alerts and we will leave entries in ES.
    query = {'query': {'query_string': {'query': 'aggregate_id:%s' % (_id)}}, 'sort': {'@timestamp': 'asc'}}
    matches = []
    if self.writeback_es:
        try:
            res = self.writeback_es.search(index=self.writeback_index,
                                           doc_type='elastalert',
                                           body=query,
                                           size=self.max_aggregation)
            for match in res['hits']['hits']:
                matches.append(match['_source'])
                self.writeback_es.delete(index=self.writeback_index,
                                         doc_type='elastalert',
                                         id=match['_id'])
        except (KeyError, ElasticsearchException) as e:
            self.handle_error("Error fetching aggregated matches: %s" % (e), {'id': _id})
    return matches
def find_pending_aggregate_alert(self, rule, aggregation_key_value=None):
    """ Search writeback_es for a pending (not yet sent) aggregate alert for this rule,
    optionally restricted to a specific aggregation key value. """
    query = {'filter': {'bool': {'must': [{'term': {'rule_name': rule['name']}},
                                          {'range': {'alert_time': {'gt': ts_now()}}},
                                          {'not': {'exists': {'field': 'aggregate_id'}}},
                                          {'term': {'alert_sent': 'false'}}]}},
             'sort': {'alert_time': {'order': 'desc'}}}
    if aggregation_key_value:
        query['filter']['bool']['must'].append({'term': {'aggregate_key': aggregation_key_value}})
    if not self.writeback_es:
        self.writeback_es = elasticsearch_client(self.conf)
    try:
        res = self.writeback_es.search(index=self.writeback_index,
                                       doc_type='elastalert',
                                       body=query,
                                       size=1)
        if len(res['hits']['hits']) == 0:
            return None
    except (KeyError, ElasticsearchException) as e:
        self.handle_error("Error searching for pending aggregated matches: %s" % (e), {'rule_name': rule['name']})
        return None
    return res['hits']['hits'][0]
Source: elastic2_doc_manager.py (project: elastic2-doc-manager, author: mongodb-labs)
def send_buffered_operations(self):
    """Send buffered operations to Elasticsearch.

    This method is periodically called by the AutoCommitThread.
    """
    with self.lock:
        try:
            action_buffer = self.BulkBuffer.get_buffer()
            if action_buffer:
                successes, errors = bulk(self.elastic, action_buffer)
                LOG.debug("Bulk request finished, successfully sent %d "
                          "operations", successes)
                if errors:
                    LOG.error(
                        "Bulk request finished with errors: %r", errors)
        except es_exceptions.ElasticsearchException:
            LOG.exception("Bulk request failed with exception")
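For reference, here is a standalone sketch of the helpers.bulk call used above, outside the doc-manager class. The index name and documents are made up; with raise_on_error=False the helper returns a (success_count, errors) pair instead of raising on per-document failures, while transport-level problems still surface as ElasticsearchException subclasses.

from elasticsearch import Elasticsearch
from elasticsearch import exceptions as es_exceptions
from elasticsearch.helpers import bulk

es = Elasticsearch(hosts=['localhost:9200'])
actions = [
    {'_op_type': 'index', '_index': 'example-index', '_type': 'doc', '_id': i, '_source': {'n': i}}
    for i in range(10)
]

try:
    successes, errors = bulk(es, actions, raise_on_error=False)
    print('indexed %d documents, %d errors' % (successes, len(errors)))
except es_exceptions.ElasticsearchException:
    print('bulk request failed at the transport level')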
def _create_index_if_missing(self):
    try:
        if not self._es.indices.exists(self._settings['INDEX_NAME']):
            self._es.indices.create(self._settings['INDEX_NAME'])
    except ElasticsearchException as e:
        self._log_error(e)
def _es_call(self, cmd, *args, **kwargs):
    try:
        return getattr(self._es, cmd)(*args, **kwargs)
    except ElasticsearchException as e:
        self._log_error(e)
        return None
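The same dispatch-by-name idea can be written as a plain function; a sketch with a placeholder client and index:

from elasticsearch import Elasticsearch
from elasticsearch.exceptions import ElasticsearchException

def safe_es_call(es, cmd, *args, **kwargs):
    """Call an arbitrary client method by name, returning None on any ES error."""
    try:
        return getattr(es, cmd)(*args, **kwargs)
    except ElasticsearchException as e:
        print('call %r failed: %s' % (cmd, e))
        return None

es = Elasticsearch(hosts=['localhost:9200'])
info = safe_es_call(es, 'info')                            # es.info()
count = safe_es_call(es, 'count', index='example-index')   # es.count(index='example-index')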
def save_availability(results_queue):
    """Send availability data to storage backend.

    :param results_queue: queue.Queue which provides data to save
    :rtype: None
    """
    results = []
    timeout = 3
    while True:
        try:
            data = results_queue.get(True, timeout=timeout)
        except queue.Empty:
            break
        results.append(data)

    body = []
    indices = set()
    for data in results:
        index = "ms_availability_%(region)s" % data
        metadata = {"index": {"_index": index,
                              "_type": "service_availability",
                              "_id": str(uuid.uuid1())}}
        body.append(json.dumps(metadata, indent=0).replace("\n", ""))
        body.append("\n")
        body.append(json.dumps(data, indent=0).replace("\n", ""))
        body.append("\n")
        if index not in indices:
            storage.ensure_es_index_exists(index)
            indices.add(index)
    body = "".join(body)

    es = storage.get_elasticsearch()
    LOG.debug("Saving availability:\n%s" % body)
    try:
        es.bulk(body=body)
    except es_exceptions.ElasticsearchException as e:
        LOG.error("Failed to save availability to Elastic:\n"
                  "Body: %s\nError: %s" % (body, e))
def get_hits_count(self, rule, starttime, endtime, index):
    """ Query Elasticsearch for the count of results and return a list of timestamps
    equal to the endtime. This allows the results to be passed to rules which expect
    an object for each hit.

    :param rule: The rule configuration dictionary.
    :param starttime: The earliest time to query.
    :param endtime: The latest time to query.
    :return: A dictionary mapping timestamps to number of hits for that time period.
    """
    query = self.get_query(rule['filter'], starttime, endtime, timestamp_field=rule['timestamp_field'], sort=False, to_ts_func=rule['dt_to_ts'])
    try:
        res = self.current_es.count(index=index, doc_type=rule['doc_type'], body=query, ignore_unavailable=True)
    except ElasticsearchException as e:
        # Elasticsearch sometimes gives us GIGANTIC error messages
        # (so big that they will fill the entire terminal buffer)
        if len(str(e)) > 1024:
            e = str(e)[:1024] + '... (%d characters removed)' % (len(str(e)) - 1024)
        self.handle_error('Error running count query: %s' % (e), {'rule': rule['name']})
        return None

    self.num_hits += res['count']
    lt = rule.get('use_local_time')
    elastalert_logger.info("Queried rule %s from %s to %s: %s hits" % (rule['name'], pretty_ts(starttime, lt), pretty_ts(endtime, lt), res['count']))
    return {endtime: res['count']}
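A plain count query with the same error-message truncation looks roughly like this; the index and filter are placeholders:

from elasticsearch import Elasticsearch
from elasticsearch.exceptions import ElasticsearchException

es = Elasticsearch(hosts=['localhost:9200'])
query = {'query': {'term': {'status': 'error'}}}

try:
    res = es.count(index='example-index', body=query, ignore_unavailable=True)
except ElasticsearchException as e:
    msg = str(e)
    if len(msg) > 1024:  # Elasticsearch errors can be enormous; keep logs readable
        msg = msg[:1024] + '... (%d characters removed)' % (len(msg) - 1024)
    print('count query failed: %s' % msg)
else:
    print('%d matching documents' % res['count'])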
def get_hits_terms(self, rule, starttime, endtime, index, key, qk=None, size=None):
    """ Query Elasticsearch for the top terms of `key` between starttime and endtime,
    optionally filtered by the rule's query_key value `qk`. """
    rule_filter = copy.copy(rule['filter'])
    if qk:
        filter_key = rule['query_key']
        if rule.get('raw_count_keys', True) and not rule['query_key'].endswith('.raw'):
            filter_key = add_raw_postfix(filter_key)
        rule_filter.extend([{'term': {filter_key: qk}}])
    base_query = self.get_query(rule_filter, starttime, endtime, timestamp_field=rule['timestamp_field'], sort=False, to_ts_func=rule['dt_to_ts'])
    if size is None:
        size = rule.get('terms_size', 50)
    query = self.get_terms_query(base_query, size, key)

    try:
        res = self.current_es.search(index=index, doc_type=rule['doc_type'], body=query, search_type='count', ignore_unavailable=True)
    except ElasticsearchException as e:
        # Elasticsearch sometimes gives us GIGANTIC error messages
        # (so big that they will fill the entire terminal buffer)
        if len(str(e)) > 1024:
            e = str(e)[:1024] + '... (%d characters removed)' % (len(str(e)) - 1024)
        self.handle_error('Error running query: %s' % (e), {'rule': rule['name']})
        return None

    if 'aggregations' not in res:
        return {}
    buckets = res['aggregations']['filtered']['counts']['buckets']
    self.num_hits += len(buckets)
    lt = rule.get('use_local_time')
    elastalert_logger.info('Queried rule %s from %s to %s: %s buckets' % (rule['name'], pretty_ts(starttime, lt), pretty_ts(endtime, lt), len(buckets)))
    return {endtime: buckets}
def writeback(self, doc_type, body):
    # ES 2.0 - 2.3 does not support dots in field names.
    if self.replace_dots_in_field_names:
        writeback_body = replace_dots_in_field_names(body)
    else:
        writeback_body = body

    for key in writeback_body.keys():
        # Convert any datetime objects to timestamps
        if isinstance(writeback_body[key], datetime.datetime):
            writeback_body[key] = dt_to_ts(writeback_body[key])

    if self.debug:
        elastalert_logger.info("Skipping writing to ES: %s" % (writeback_body))
        return None

    if '@timestamp' not in writeback_body:
        writeback_body['@timestamp'] = dt_to_ts(ts_now())

    if self.writeback_es:
        try:
            res = self.writeback_es.create(index=self.writeback_index,
                                           doc_type=doc_type, body=writeback_body)
            return res
        except ElasticsearchException as e:
            logging.exception("Error writing alert info to Elasticsearch: %s" % (e))
            self.writeback_es = None
def is_silenced(self, rule_name):
    """ Checks if rule_name is currently silenced. Returns false on exception. """
    if rule_name in self.silence_cache:
        if ts_now() < self.silence_cache[rule_name][0]:
            return True
        else:
            return False

    if self.debug:
        return False

    query = {'filter': {'term': {'rule_name': rule_name}},
             'sort': {'until': {'order': 'desc'}}}

    if self.writeback_es:
        try:
            res = self.writeback_es.search(index=self.writeback_index, doc_type='silence',
                                           size=1, body=query, _source_include=['until', 'exponent'])
        except ElasticsearchException as e:
            self.handle_error("Error while querying for alert silence status: %s" % (e), {'rule': rule_name})
            return False

        if res['hits']['hits']:
            until_ts = res['hits']['hits'][0]['_source']['until']
            exponent = res['hits']['hits'][0]['_source'].get('exponent', 0)
            self.silence_cache[rule_name] = (ts_to_dt(until_ts), exponent)
            if ts_now() < ts_to_dt(until_ts):
                return True
    return False
def healthcheck(request):
    """ Raise FailedHealthcheck unless the Elasticsearch cluster backing the configured
    index is reachable and at least yellow. """
    index = request.registry.settings['elasticsearch_index']
    try:
        status = request.es.cluster.health(index=index)['status']
    except exceptions.ElasticsearchException as exc:
        raise FailedHealthcheck('elasticsearch exception') from exc
    if status not in ('yellow', 'green'):
        raise FailedHealthcheck('cluster status was {!r}'.format(status))
    return {'status': 'ok', 'version': bouncer_version}
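Outside the web-framework wiring, the same check is just a cluster.health call. A sketch against a placeholder index; calling cluster.health() without an index reports on the whole cluster instead:

from elasticsearch import Elasticsearch
from elasticsearch import exceptions

es = Elasticsearch(hosts=['localhost:9200'])

try:
    status = es.cluster.health(index='example-index')['status']
except exceptions.ElasticsearchException as exc:
    raise RuntimeError('elasticsearch health check failed') from exc

if status not in ('yellow', 'green'):
    raise RuntimeError('cluster status was {!r}'.format(status))
print('cluster status: %s' % status)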
Source: twitter_to_es.py (project: ankaracloudmeetup-bigdata-demo, author: serkan-ozal)
def save(tweets, es_host, es_port):
    es = Elasticsearch(host=es_host, port=es_port)
    print('Saving tweets into ElasticSearch on {}...'.format(es_host))

    if es.indices.exists(index_name):
        print('Index {} already exists'.format(index_name))
        try:
            es.indices.put_mapping(doc_type, tweet_mapping, index_name)
        except ElasticsearchException as e:
            print('Error while putting mapping:\n' + str(e))
            print('Deleting index {}...'.format(index_name))
            es.indices.delete(index_name)
            print('Creating index {}...'.format(index_name))
            es.indices.create(index_name, body={'mappings': mapping})
    else:
        print('Index {} does not exist'.format(index_name))
        print('Creating index {}...'.format(index_name))
        es.indices.create(index_name, body={'mappings': mapping})

    counter = 0
    bulk_data = []
    list_size = len(tweets)
    for doc in tweets:
        tweet = analyze_and_get_tweet(doc)
        bulk_doc = {
            "_index": index_name,
            "_type": doc_type,
            "_id": tweet[id_field],
            "_source": tweet
        }
        bulk_data.append(bulk_doc)
        counter += 1
        if counter % bulk_chunk_size == 0 or counter == list_size:
            print('ElasticSearch bulk index (index: {INDEX}, type: {TYPE})...'.format(INDEX=index_name, TYPE=doc_type))
            success, _ = bulk(es, bulk_data)
            print('ElasticSearch has indexed %d documents' % success)
            bulk_data = []
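The manual counter above flushes a batch every bulk_chunk_size documents; the bulk helper can also do the chunking itself. A sketch with an invented index and documents:

from elasticsearch import Elasticsearch
from elasticsearch.exceptions import ElasticsearchException
from elasticsearch.helpers import bulk

es = Elasticsearch(hosts=['localhost:9200'])
tweets = [{'id': i, 'text': 'tweet %d' % i} for i in range(2500)]

actions = [
    {'_index': 'tweets', '_type': 'tweet', '_id': t['id'], '_source': t}
    for t in tweets
]

try:
    success, _ = bulk(es, actions, chunk_size=1000)  # sent in batches of 1000 actions
    print('ElasticSearch has indexed %d documents' % success)
except ElasticsearchException as e:
    print('bulk indexing failed: %s' % e)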
def get_hits(self, rule, starttime, endtime, index, scroll=False):
    """ Query Elasticsearch for the given rule and return the results.

    :param rule: The rule configuration.
    :param starttime: The earliest time to query.
    :param endtime: The latest time to query.
    :return: A list of hits, bounded by rule['max_query_size'].
    """
    query = self.get_query(rule['filter'], starttime, endtime, timestamp_field=rule['timestamp_field'], to_ts_func=rule['dt_to_ts'])
    extra_args = {'_source_include': rule['include']}
    scroll_keepalive = rule.get('scroll_keepalive', self.scroll_keepalive)
    if not rule.get('_source_enabled'):
        query['fields'] = rule['include']
        extra_args = {}

    try:
        if scroll:
            res = self.current_es.scroll(scroll_id=rule['scroll_id'], scroll=scroll_keepalive)
        else:
            res = self.current_es.search(scroll=scroll_keepalive, index=index, size=rule['max_query_size'], body=query, ignore_unavailable=True, **extra_args)
            self.total_hits = int(res['hits']['total'])
        logging.debug(str(res))
    except ElasticsearchException as e:
        # Elasticsearch sometimes gives us GIGANTIC error messages
        # (so big that they will fill the entire terminal buffer)
        if len(str(e)) > 1024:
            e = str(e)[:1024] + '... (%d characters removed)' % (len(str(e)) - 1024)
        self.handle_error('Error running query: %s' % (e), {'rule': rule['name']})
        return None

    hits = res['hits']['hits']
    self.num_hits += len(hits)
    lt = rule.get('use_local_time')
    status_log = "Queried rule %s from %s to %s: %s / %s hits" % (rule['name'], pretty_ts(starttime, lt), pretty_ts(endtime, lt), self.num_hits, len(hits))
    if self.total_hits > rule.get('max_query_size', self.max_query_size):
        elastalert_logger.info("%s (scrolling..)" % status_log)
        rule['scroll_id'] = res['_scroll_id']
    else:
        elastalert_logger.info(status_log)

    hits = self.process_hits(rule, hits)

    # Record doc_type for use in get_top_counts
    if 'doc_type' not in rule and len(hits):
        rule['doc_type'] = hits[0]['_type']
    return hits
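The scroll branch above continues a query started by an earlier search call. Stripped of the rule bookkeeping, the search/scroll round-trip looks roughly like this; the keep-alive ('30s') and page size are arbitrary, and the index is a placeholder:

from elasticsearch import Elasticsearch
from elasticsearch.exceptions import ElasticsearchException

es = Elasticsearch(hosts=['localhost:9200'])
query = {'query': {'match_all': {}}}
scroll_id = None

try:
    res = es.search(index='example-index', body=query, size=100, scroll='30s')
    scroll_id = res['_scroll_id']
    hits = res['hits']['hits']
    while hits:
        # process this page of hits, then pull the next one with the scroll id
        res = es.scroll(scroll_id=scroll_id, scroll='30s')
        scroll_id = res['_scroll_id']
        hits = res['hits']['hits']
except ElasticsearchException as e:
    print('scrolling query failed: %s' % e)
finally:
    if scroll_id is not None:
        es.clear_scroll(scroll_id=scroll_id)  # free the scroll context early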
def post(json_request_body: dict,
         replica: str,
         per_page: int,
         output_format: str,
         _scroll_id: typing.Optional[str] = None) -> dict:
    es_query = json_request_body['es_query']
    per_page = PerPageBounds.check(per_page)
    replica_enum = Replica[replica] if replica is not None else Replica.aws

    get_logger().debug("Received posted query. Replica: %s Query: %s Per_page: %i Scroll_id: %s",
                       replica_enum.name, json.dumps(es_query, indent=4), per_page, _scroll_id)

    # TODO: (tsmith12) determine if a search operation timeout limit is needed
    # TODO: (tsmith12) allow users to retrieve previous search results
    # TODO: (tsmith12) if page returns 0 hits, then all results have been found. delete search id
    try:
        page = _es_search_page(es_query, replica_enum, per_page, _scroll_id, output_format)
        request_dict = _format_request_body(page, es_query, replica_enum, output_format)
        request_body = jsonify(request_dict)

        if len(request_dict['results']) < per_page:
            response = make_response(request_body, requests.codes.ok)
        else:
            response = make_response(request_body, requests.codes.partial)
            next_url = _build_scroll_url(page['_scroll_id'], per_page, replica_enum, output_format)
            response.headers['Link'] = _build_link_header({next_url: {"rel": "next"}})
        return response
    except TransportError as ex:
        if ex.status_code == requests.codes.bad_request:
            get_logger().debug("%s", f"Invalid Query Received. Exception: {ex}")
            raise DSSException(requests.codes.bad_request,
                               "elasticsearch_bad_request",
                               f"Invalid Elasticsearch query was received: {str(ex)}")
        elif ex.status_code == requests.codes.not_found:
            get_logger().debug("%s", f"Search Context Error. Exception: {ex}")
            raise DSSException(requests.codes.not_found,
                               "elasticsearch_context_not_found",
                               "Elasticsearch context has returned all results or timeout has expired.")
        elif ex.status_code == 'N/A':
            get_logger().error("%s", f"Elasticsearch Invalid Endpoint. Exception: {ex}")
            raise DSSException(requests.codes.service_unavailable,
                               "service_unavailable",
                               "Elasticsearch reached an invalid endpoint. Try again later.")
        else:
            get_logger().error("%s", f"Elasticsearch Internal Server Error. Exception: {ex}")
            raise DSSException(requests.codes.internal_server_error,
                               "internal_server_error",
                               "Elasticsearch Internal Server Error")
    except ElasticsearchException as ex:
        get_logger().error("%s", f"Elasticsearch Internal Server Error. Exception: {ex}")
        raise DSSException(requests.codes.internal_server_error,
                           "internal_server_error",
                           "Elasticsearch Internal Server Error")