def get_hits_terms(self, rule, starttime, endtime, index, key, qk=None, size=None):
    """Run a terms aggregation for ``key`` over a time window and return its buckets.

    :param rule: Rule dict. Reads ``filter``, ``timestamp_field``, ``dt_to_ts``,
        ``doc_type`` and ``name``; reads ``query_key`` when ``qk`` is given;
        optionally reads ``raw_count_keys``, ``terms_size`` and ``use_local_time``.
    :param starttime: Start of the window, forwarded to ``self.get_query``.
    :param endtime: End of the window; also used as the key of the returned dict.
    :param index: Index (or index pattern) handed to the search call.
    :param key: Field handed to ``self.get_terms_query`` as the aggregation key.
    :param qk: Optional query-key value. When truthy, an extra ``term`` filter
        restricts the search to documents whose query key equals this value.
    :param size: Bucket count for the terms query. Defaults to the rule's
        ``terms_size`` setting, or 50 when that is absent.
    :return: ``None`` if the search raised ``ElasticsearchException`` (the error
        is reported through ``self.handle_error``), ``{}`` if the response has
        no ``aggregations`` section, otherwise ``{endtime: buckets}``.
    """
    # Shallow-copy the filter list so the per-call term filter added below
    # never leaks into the rule's own configuration.
    rule_filter = copy.copy(rule['filter'])

    # NOTE(review): a falsy qk (0, '') is treated the same as "no query key";
    # confirm callers never need to filter on such values.
    if qk:
        filter_key = rule['query_key']
        if rule.get('raw_count_keys', True) and not rule['query_key'].endswith('.raw'):
            filter_key = add_raw_postfix(filter_key)
        rule_filter.append({'term': {filter_key: qk}})

    base_query = self.get_query(
        rule_filter,
        starttime,
        endtime,
        timestamp_field=rule['timestamp_field'],
        sort=False,
        to_ts_func=rule['dt_to_ts'],
    )
    if size is None:
        size = rule.get('terms_size', 50)
    query = self.get_terms_query(base_query, size, key)

    try:
        # NOTE(review): search_type='count' is not accepted by newer
        # Elasticsearch releases - confirm against the target cluster version.
        res = self.current_es.search(
            index=index,
            doc_type=rule['doc_type'],
            body=query,
            search_type='count',
            ignore_unavailable=True,
        )
    except ElasticsearchException as e:
        # Elasticsearch sometimes gives us GIGANTIC error messages
        # (so big that they will fill the entire terminal buffer),
        # so cap what gets reported at 1024 characters.
        message = str(e)
        if len(message) > 1024:
            message = message[:1024] + '... (%d characters removed)' % (len(message) - 1024)
        self.handle_error('Error running query: %s' % message, {'rule': rule['name']})
        return None

    if 'aggregations' not in res:
        return {}

    buckets = res['aggregations']['filtered']['counts']['buckets']
    self.num_hits += len(buckets)
    lt = rule.get('use_local_time')
    elastalert_logger.info('Queried rule %s from %s to %s: %s buckets' % (rule['name'], pretty_ts(starttime, lt), pretty_ts(endtime, lt), len(buckets)))
    return {endtime: buckets}
评论列表
文章目录