def get_hits(self, rule, starttime, endtime, index, scroll=False):
    """Run the rule's query against Elasticsearch and return matching hits.

    :param rule: The rule configuration dictionary.
    :param starttime: The earliest time to query.
    :param endtime: The latest time to query.
    :param index: The index (pattern) to search against.
    :param scroll: If True, continue a previous paged query through the
        scroll API using rule['scroll_id'] instead of issuing a new search.
    :return: A list of processed hits, bounded by rule['max_query_size'],
        or None if the query raised an Elasticsearch error.
    """
    query = self.get_query(
        rule['filter'],
        starttime,
        endtime,
        timestamp_field=rule['timestamp_field'],
        to_ts_func=rule['dt_to_ts'],
    )
    scroll_keepalive = rule.get('scroll_keepalive', self.scroll_keepalive)

    # When _source is disabled we must request stored fields explicitly;
    # otherwise restrict the returned _source to the configured includes.
    if rule.get('_source_enabled'):
        extra_args = {'_source_include': rule['include']}
    else:
        query['fields'] = rule['include']
        extra_args = {}

    try:
        if scroll:
            res = self.current_es.scroll(scroll_id=rule['scroll_id'], scroll=scroll_keepalive)
        else:
            res = self.current_es.search(
                scroll=scroll_keepalive,
                index=index,
                size=rule['max_query_size'],
                body=query,
                ignore_unavailable=True,
                **extra_args
            )
        self.total_hits = int(res['hits']['total'])
        logging.debug(str(res))
    except ElasticsearchException as e:
        # Elasticsearch sometimes gives us GIGANTIC error messages
        # (so big that they will fill the entire terminal buffer)
        if len(str(e)) > 1024:
            e = str(e)[:1024] + '... (%d characters removed)' % (len(str(e)) - 1024)
        self.handle_error('Error running query: %s' % (e), {'rule': rule['name']})
        return None

    hits = res['hits']['hits']
    self.num_hits += len(hits)
    lt = rule.get('use_local_time')
    status_log = "Queried rule %s from %s to %s: %s / %s hits" % (
        rule['name'],
        pretty_ts(starttime, lt),
        pretty_ts(endtime, lt),
        self.num_hits,
        len(hits),
    )
    if self.total_hits > rule.get('max_query_size', self.max_query_size):
        # More results remain on the server; keep the scroll id so the
        # caller can page through the rest.
        elastalert_logger.info("%s (scrolling..)" % status_log)
        rule['scroll_id'] = res['_scroll_id']
    else:
        elastalert_logger.info(status_log)

    hits = self.process_hits(rule, hits)

    # Record doc_type for use in get_top_counts
    if 'doc_type' not in rule and len(hits):
        rule['doc_type'] = hits[0]['_type']
    return hits
# NOTE(review): the two lines below this point in the original paste were
# stray web-page artifacts ("评论列表" = comment list, "文章目录" = article
# table of contents) from the blog this snippet was copied from; they are
# not code and would break the module, so they are neutralized here.