def execute_search(search_obj):
"""
Executes a search against ES after checking the connection
Args:
search_obj (Search): elasticsearch_dsl Search object
Returns:
elasticsearch_dsl.result.Response: ES response
"""
# make sure there is a live connection
if search_obj._index is None: # pylint: disable=protected-access
# If you're seeing this it means you're creating Search() without using
# create_search_obj which sets important fields like the index and doc_type.
raise ImproperlyConfigured("search object is missing an index")
get_conn()
return search_obj.execute()
def first_words(index='codetoname', language='python'):
es = elasticsearch.Elasticsearch()
# update first name
s = elasticsearch_dsl.Search(using=es, index=index, doc_type=language)\
.query('bool', filter=Q('exists', field='feature') & Q('missing', field='first_name'))
for hit in s.scan():
data = hit.to_dict()
feature = json.loads(data['feature'])
data['first_name'] = firstname(feature['name'], language)
es.index(index=index, doc_type=language, id=hit.meta.id, body=data)
es.indices.refresh(index=index)
# aggregation
s = elasticsearch_dsl.Search(using=es, index=index, doc_type=language)\
.query('bool', filter=Q('exists', field='feature'))
a = A('terms', field='first_name')
s.aggs.bucket('first_name_terms', a)
response = s.execute()
words = []
for item in response.aggregations.first_name_terms.buckets:
percentage = item.doc_count / float(response.hits.total) * 100
words.append({'word': item.key, 'percentage': percentage})
return words
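# Usage sketch (hypothetical helper, not part of the original project): assumes a
# local Elasticsearch node with the 'codetoname' index already populated, and
# prints the distribution of first identifier words returned by first_words().
def print_first_words(language='python'):
    for entry in first_words(language=language):
        print('%s: %.2f%%' % (entry['word'], entry['percentage']))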
def get_summary_statistics():
"""
    Obtains statistics about the current sums of flows, packets, and bytes.
    :return: JSON string with status "Ok" or "Error" and the requested data.
"""
try:
# Elastic query
client = elasticsearch.Elasticsearch([{'host': myconf.get('consumer.hostname'), 'port': myconf.get('consumer.port')}])
elastic_bool = []
elastic_bool.append({'range': {'@timestamp': {'gte': "now-5m", 'lte': "now"}}})
elastic_bool.append({'term': {'@type': 'protocols_statistics'}})
qx = Q({'bool': {'must': elastic_bool}})
s = Search(using=client, index='_all').query(qx)
s.aggs.bucket('sum_of_flows', 'sum', field='flows')
s.aggs.bucket('sum_of_packets', 'sum', field='packets')
s.aggs.bucket('sum_of_bytes', 'sum', field='bytes')
        s = s.sort('@timestamp')  # sort() returns a copy of the Search, so keep the result
result = s.execute()
        # Parse the result into a CSV-like string: timestamp, flows, packets, bytes
data = "Timestamp, Flows, Packets, Bytes;"
timestamp = "Last 5 Minutes"
data += timestamp + ', ' +\
str(int(result.aggregations.sum_of_flows['value'])) + ', ' +\
str(int(result.aggregations.sum_of_packets['value'])) + ', ' +\
str(int(result.aggregations.sum_of_bytes['value']))
json_response = '{"status": "Ok", "data": "' + data + '"}'
return json_response
except Exception as e:
json_response = '{"status": "Error", "data": "Elasticsearch query exception: ' + escape(str(e)) + '"}'
return json_response
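# Usage sketch (hypothetical helper): the "data" field returned by
# get_summary_statistics() is a quasi-CSV string in which ';' separates the header
# row from the values and ', ' separates columns.
import json

def print_summary_statistics():
    response = json.loads(get_summary_statistics())
    if response['status'] != 'Ok':
        print(response['data'])
        return
    header, values = response['data'].split(';')
    for name, value in zip(header.split(', '), values.split(', ')):
        print('%s: %s' % (name, value))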
def filter_by(self, query, conditions, order_criteria=None):
"""
:param query: Search object
    :type query: elasticsearch_dsl.Search
:param conditions: conditions dictionary
:type conditions: dict
:param order_criteria: optional order criteria
:type order_criteria: list
:return: modified query
    :rtype: elasticsearch_dsl.Search
"""
expressions = self._build_filter_expressions(conditions, None)
if expressions is None:
return query
if order_criteria and '_score' not in order_criteria and '-_score' not in order_criteria:
return query.update_from_dict({'query': {'constant_score': {'filter': expressions}}})
return query.update_from_dict({'query': expressions})
def setup_es(es_ip, es_port):
"""
Setup an Elasticsearch connection
Parameters
----------
es_ip: string
IP address for elasticsearch instance
es_port: string
Port for elasticsearch instance
Returns
-------
    S: an elasticsearch_dsl Search object bound to the "geonames" index.
"""
CLIENT = Elasticsearch([{'host' : es_ip, 'port' : es_port}])
S = Search(using=CLIENT, index="geonames")
return S
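# Usage sketch: setup_es() returns an unexecuted elasticsearch_dsl Search bound to
# the "geonames" index; callers chain queries onto it and then execute. The "name"
# field below is an assumption about the geonames mapping.
def lookup_place(es_ip, es_port, place_name):
    s = setup_es(es_ip, es_port)
    s = s.query('match', name=place_name)[0:5]
    return s.execute()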
def prepare_and_execute_search(user, search_param_dict=None, search_func=execute_search,
filter_on_email_optin=False):
"""
Prepares a Search object and executes the search against ES
Args:
user (User): User object
search_param_dict (dict): A dict representing the body of an ES query
search_func (callable): The function that executes the search
filter_on_email_optin (bool): If true, filter out profiles where email_optin != True
Returns:
elasticsearch_dsl.result.Response: ES response
"""
search_obj = create_search_obj(
user,
search_param_dict=search_param_dict,
filter_on_email_optin=filter_on_email_optin,
)
return search_func(search_obj)
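# Usage sketch (hypothetical caller): pass a raw Elasticsearch query body;
# create_search_obj (defined elsewhere in this module) attaches the index,
# doc_type, and permission filters before execute_search() runs the query.
def run_default_search(user):
    return prepare_and_execute_search(
        user,
        search_param_dict={'query': {'match_all': {}}},
    )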
def test_work_types_dont_override_provider(self):
"""[#122] Selecting work types should be a subset of providers, not override them"""
img1 = models.Image.objects.create(url='example.com/1', title='hello', provider='flickr')
img2 = models.Image.objects.create(url='example.com/2', title='hello', provider='nypl')
self._index_img(img1)
self._index_img(img2)
# Search by provider=flickr but work type=cultural should limit by Flickr first
resp = self.client.get(self.url, {'search_fields': 'title',
'search': 'hello',
'providers': 'flickr',
'work_types': 'cultural'})
# One result, the correct one
self.assertEqual(1, len(select_nodes(resp, '.t-image-result')))
    # Each result also carries a div[data-identifier] attribute, which photoswipe uses
self.assertEqual(1, len(select_nodes(resp, 'div[data-identifier="' + img1.identifier +'"]')))
self.assertEqual(0, len(select_nodes(resp, 'div[data-identifier="' + img2.identifier +'"]')))
def about(request):
"""Information about the current site, its goals, and what content is loaded"""
# Provider counts
providers = cache.get_or_set(CACHE_STATS_NAME, [], CACHE_STATS_DURATION)
if not providers:
for provider in sorted(settings.PROVIDERS.keys()):
s = Search()
q = Q('term', provider=provider)
s = s.query(q)
response = s.execute()
if response.hits.total > 0:
data = settings.PROVIDERS[provider]
total = intcomma(response.hits.total)
data.update({'hits': total})
providers.append(data)
# All results
s = Search()
response = s.execute()
total = intcomma(response.hits.total)
providers.append({'display_name': 'Total', 'hits': total})
cache.set(CACHE_STATS_NAME, providers)
return render(request, "about.html", {'providers': providers})
def correct_orphan_records(self, provider='europeana', end=None):
"""[#185] Delete records from the search engine which aren't found in the database"""
s = Search()
q = Q('term', provider=provider)
s = s.query(q)
response = s.execute()
total = response.hits.total
# A file extracted from the production database listing all of the europeana identifiers
identifier_file = '/tmp/europeana-identifiers.json'
db_identifiers = set(json.load(open(identifier_file)))
total_in_db = len(db_identifiers)
log.info("Using search engine instance %s", settings.ELASTICSEARCH_URL)
log.info("Total records: %d (search engine), %d (database) [diff=%d]", total, total_in_db, total - total_in_db)
deleted_count = 0
for r in s.scan():
if r.identifier not in db_identifiers:
img = search.Image.get(id=r.identifier)
log.debug("Going to delete image %s", img)
deleted_count += 1
log.info("Deleted %d from search engine", deleted_count)
def test_filter_robots(app, es, event_queues, indexed_events, with_robots):
"""Test the filter_robots query modifier."""
query_modifiers = []
if not with_robots:
query_modifiers = [filter_robots]
StatAggregator(client=current_search_client,
event='file-download',
aggregation_field='file_id',
aggregation_interval='day',
query_modifiers=query_modifiers).run()
current_search_client.indices.refresh(index='*')
query = Search(
using=current_search_client,
index='stats-file-download',
doc_type='file-download-day-aggregation'
)[0:30].sort('file_id')
results = query.execute()
assert len(results) == 3
for result in results:
if 'file_id' in result:
assert result.count == (5 if with_robots else 2)
def _get_oldest_event_timestamp(self):
"""Search for the oldest event timestamp."""
# Retrieve the oldest event in order to start aggregation
# from there
query_events = Search(
using=self.client,
index=self.event_index
)[0:1].sort(
{'timestamp': {'order': 'asc'}}
)
result = query_events.execute()
    # There might not be any events yet if the first event has been
    # indexed but the indices have not been refreshed yet.
if len(result) == 0:
return None
return parser.parse(result[0]['timestamp'])
def to_elasticsearch_object(self, client) -> Search:
assert self.indexes
search = (
Search()
.using(client)
.index(*self.indexes)
.query(self.query)
)
if self.sort_fields:
search = search.sort(*self.sort_fields)
size = 15 # default size
if self.size:
size = self.size
search = search[0:size]
return search
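# A standalone sketch of the same composition pattern: every Search call used above
# (.using(), .index(), .query(), .sort(), slicing) returns a new copy rather than
# mutating in place, so the query can be built up step by step. The index, query,
# and sort field below are placeholders.
def build_search(client) -> Search:
    search = Search().using(client).index('my-index').query('match_all')
    search = search.sort('-timestamp')
    return search[0:15]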
def pokedex(self, pokemon):
s = Search(using=self.client, index="pokemon").query("match", name={'query': pokemon, 'fuzziness': 2})
response = s.execute()
if response.hits.total == 0:
await self.bot.say("I couldn't find that pokemon")
return
hit = response[0]
embed=discord.Embed(title=hit.name, url="http://bulbapedia.bulbagarden.net/wiki/{}".format(hit.name), timestamp=(datetime.datetime(2017,7,6)))
embed.set_thumbnail(url="http://serebii.net/pokemongo/pokemon/{:03d}.png".format(int(hit.meta.id)))
embed.add_field(name='Base Attack Stat', value=hit.attack_ratio)
embed.add_field(name='Base Defence Stat', value=hit.defense_ratio)
embed.add_field(name='Base HP Stat', value=hit.hp_ratio)
embed.add_field(name='Min CP', value=hit.min_cp_cap)
embed.add_field(name='Max CP', value=hit.max_cp_cap)
embed.add_field(name='Best Offensive Moveset', value=hit.basic_attack+' / '+hit.charge_attack)
#embed.add_field(name='Basic Atk', value=hit.basic_attack)
#embed.add_field(name='Quick DPS', value=hit.quick_dps)
#embed.add_field(name='Charge Atk', value=hit.charge_attack)
#embed.add_field(name='Charge DPS', value=hit.charge_dps)
#embed.add_field(name='Offensive %', value=hit.offensive_percent)
#embed.add_field(name='Duel %', value=hit.duel_percent)
#embed.add_field(name='Defensive %', value=hit.defensive_percent)
#embed.add_field(name='Full Cycle DPS', value=hit.full_cycle_dps)
embed.set_footer(text='Min and Max CP are for level 40. Best Offensive Moveset may be incorrect.')
await self.bot.say(embed=embed)
def find(replica: str):
owner = request.token_info['email']
es_client = ElasticsearchClient.get(logger)
search_obj = Search(using=es_client,
index=Config.get_es_index_name(ESIndexType.subscriptions, Replica[replica]),
doc_type=ESDocType.subscription.name)
search = search_obj.query({'match': {'owner': owner}})
responses = [{
'uuid': hit.meta.id,
'replica': replica,
'owner': owner,
'callback_url': hit.callback_url,
'es_query': hit.es_query.to_dict()}
for hit in search.scan()]
full_response = {'subscriptions': responses}
return jsonify(full_response), requests.codes.okay
def test_missing_field_update(self):
"""Al actualizar una distribución, si falta un field
previamente indexado, no se borran los datos anteriores
"""
missing_field = '212.1_PSCIOS_ERS_0_0_22'
self._index_catalog('full_ts_data.json')
    # Second run: an 'update' of the catalog
self._index_catalog('missing_field.json')
results = Search(using=self.elastic,
index=self.test_index) \
.filter('match', series_id=missing_field).execute()
self.assertTrue(len(results))
self.assertTrue(Field.objects.filter(series_id=missing_field))
def _get_notifications_by_email(self, email):
search = elasticsearch_dsl.Search(using=self.client, index=self.config['ELASTICSEARCH_INDEX'],
doc_type=self.config['ELASTICSEARCH_DOCTYPE'])
query = self.create_query_for_email(search, email)
# logger.debug("query:\n{!s}", pprint.pformat(query.to_dict()))
try:
response = query.execute()
except elasticsearch.exceptions.ElasticsearchException:
logger.exception("Exception caught in Elasticsearch query:\n index: {!r}\n doc_type: {!r}\n"
" query: {!s}".format(self.config['ELASTICSEARCH_INDEX'],
self.config['ELASTICSEARCH_DOCTYPE'], pprint.pformat(query.to_dict())))
# logger.debug("response:\n{!s}", pprint.pformat(response.to_dict()))
return response.hits.hits
def tag_by_email(self, emails, breached):
docs = []
s = Search(using=self.es).\
filter(Q({'terms': {'contact_email.keyword': emails}})).\
source(['id_submission'])
print('%s emails breached=%s' % (len(emails), breached))
for hit in s.scan():
docs.append(lib.bulk_update_doc(hit['id_submission'], {'breached': breached}))
if not len(docs) % 500:
print('\tfetched %s' % len(docs))
print('\t%s matches' % len(docs))
return docs
def run(self):
emails = {
'breached': set(),
'unbreached': set(),
}
# contact_email exists
must = [Q('exists', field='contact_email')]
# matches source if specified
if self.source:
must.append(Q({'term': {'analysis.source': self.source}}))
# not already tagged with breached
s = Search(using=self.es).\
query(FunctionScore(
query=Q('bool',
must=must,
must_not=[Q('exists', field='analysis.breached')]),
functions=[SF('random_score', seed=int(time.time()))]
)).\
source(['contact_email'])
print('%s breached: source=%s limit=%s' % (datetime.now().isoformat(), self.source,
self.limit))
print('query=\n%s' % json.dumps(s.to_dict()))
for filing in s[:self.limit]:
email = filing['contact_email']
if not email or email in emails['breached'] or email in emails['unbreached']:
continue
breached = self.is_breached(email)
emails['breached' if breached else 'unbreached'].add(email)
docs = []
print('done source=%s' % self.source)
if emails['breached']:
docs += self.tag_by_email(list(emails['breached']), True)
if emails['unbreached']:
docs += self.tag_by_email(list(emails['unbreached']), False)
try:
lib.bulk_update(self.es, docs)
except Exception as e:
print('error indexing: %s' % e)
def monitor(index, delta, query_string):
click.clear()
def cnt():
q = Q('query_string', query=query_string)
s = Search(
using=es.client,
index=index).query(q)
return s.count()
N = cnt()
tot = Search(using=es.client, index=index).count()
if not delta:
N = tot
log.info('Processing %d records (total: %d)', N, tot)
click.echo('You can exit by CTRL-C: results will still process')
bar = SlowOverallFancyBar('', max=N, grand_total=tot)
while True:
time.sleep(5.0)
try:
n = cnt()
if isinstance(n, int):
if delta:
done = N - n
else:
done = n
bar.goto(done)
except Exception as e:
log.warn('Cannot count: %s', e)
bar.finish()
def clone_index(use_helper, from_index, to_index):
"""Clone an index"""
from elasticsearch_dsl import Search
from elasticsearch.helpers import reindex
click.clear()
if not es.client.indices.exists(index=to_index):
        click.secho('{} does not exist!'.format(to_index), fg='red')
return 1
cnt = Search(using=es.client, index=to_index).count()
message = 'Index %s already exists (%d records). Overwrite?' % (
to_index, cnt)
click.confirm(message, abort=True)
if use_helper:
reindex(
client=es.client,
source_index=from_index,
target_index=to_index)
else:
es.client.reindex(
body=dict(
source=dict(index=from_index),
dest=dict(index=to_index)),
wait_for_completion=False)
def monitor_clone_index(from_index, to_index):
"""Monitor the size of an index"""
from elasticsearch_dsl import Search
click.clear()
cnt = Search(using=es.client, index=from_index).count()
bar = SlowFancyBar('', max=cnt)
while True:
time.sleep(2.0)
_cnt = Search(using=es.client, index=to_index).count()
bar.goto(_cnt)
bar.finish()
def search(self, **kwargs):
q = kwargs.get('q', '*')
sort = kwargs.get('sort', 'timestamp')
search_after = kwargs.get('search_after')
size = kwargs.get('size', 50)
source = kwargs.get('source')
extra = dict(
size=size)
if search_after:
extra.update(dict(search_after=search_after))
s = Search(using=self.client, index=self.index_name)
if source:
s = s.source(source)
s = s.sort(sort)
s = s.query(Q('query_string', query=q))
s = s.extra(**extra)
log.info('Query: %s', s.to_dict())
r = s.execute()
count = r.hits.total
took = r.took
result = r, count, took
return result
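# Usage sketch (hypothetical caller): page through every document with search_after
# by feeding the sort values of the last hit of one page into the next call.
def iter_all_hits(store, q='*', size=50):
    search_after = None
    while True:
        response, count, took = store.search(q=q, size=size, search_after=search_after)
        hits = response.hits.hits
        if not hits:
            break
        for hit in hits:
            yield hit
        search_after = hits[-1]['sort']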
def count(self, index, query):
try:
s = Search(
using=self.client,
index=index,
doc_type=self.doc_type). \
update_from_dict(query)
log.info('Querying: %s', s.to_dict())
return s.count()
except Exception as e:
log.warn('Cannot count: %s', e)
def __init__(self, *args, **kwargs):
assert self.document is not None
self.client = connections.get_connection()
self.index = self.document._doc_type.index
self.mapping = self.document._doc_type.mapping.properties.name
self.search = Search(using=self.client, index=self.index)
super(BaseDocumentViewSet, self).__init__(*args, **kwargs)
def get_base_query(self, req, resp):
return Search(using=self.connection,
index=self.objects_class._doc_type.index,
doc_type=self.objects_class)
def test_filter_by(connection, query_filtered):
"""
    Test the `filter_by` query builder
"""
conditions, expected = query_filtered
if isinstance(conditions, str):
conditions = json.loads(conditions, object_pairs_hook=OrderedDict)
if isinstance(expected, str):
expected = json.loads(expected)
c = CollectionResource(objects_class=Model, connection=connection)
query_obj = c.filter_by(Search(using=connection).doc_type(Model), conditions)
assert query_obj.to_dict()['query'] == expected
def test_order_by(connection, query_ordered):
"""
    Test ordering via `sort`
"""
conditions, expected = query_ordered
if isinstance(conditions, str):
conditions = json.loads(conditions, object_pairs_hook=OrderedDict)
if isinstance(expected, str):
expected = json.loads(expected)
query_obj = Search(using=connection, doc_type=Model).sort(*conditions)
assert query_obj.to_dict() == expected
def test_totals(connection, query_totals):
"""
    Test totals aggregation via `_build_total_expressions`
"""
totals, expected = query_totals
if isinstance(totals, str):
totals = json.loads(totals, object_pairs_hook=OrderedDict)
if isinstance(expected, str):
expected = json.loads(expected)
c = CollectionResource(objects_class=Model, connection=connection)
query_obj = c._build_total_expressions(Search(using=connection).doc_type(Model), totals)
assert query_obj.to_dict() == expected
def search_for_field(search_obj, field_name, page_size=DEFAULT_ES_LOOP_PAGE_SIZE):
"""
Retrieves all unique instances of a field for documents that match an ES query
Args:
search_obj (Search): Search object
field_name (str): The name of the field for the value to get
page_size (int): Number of docs per page of results
Returns:
set: Set of unique values
"""
results = set()
# Maintaining a consistent sort on '_doc' will help prevent bugs where the
# index is altered during the loop.
# This also limits the query to only return the field value.
search_obj = search_obj.sort('_doc').fields(field_name)
loop = 0
all_results_returned = False
while not all_results_returned:
from_index = loop * page_size
to_index = from_index + page_size
search_results = execute_search(search_obj[from_index: to_index])
# add the field value for every search result hit to the set
for hit in search_results.hits:
results.add(getattr(hit, field_name)[0])
all_results_returned = to_index >= search_results.hits.total
loop += 1
return results
def get_all_query_matching_emails(search_obj, page_size=DEFAULT_ES_LOOP_PAGE_SIZE):
"""
Retrieves all unique emails for documents that match an ES query
Args:
search_obj (Search): Search object
page_size (int): Number of docs per page of results
Returns:
set: Set of unique emails
"""
return search_for_field(search_obj, "email", page_size=page_size)
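# Usage sketch (hypothetical): combine the helpers above; create_search_obj is the
# module-level factory referenced earlier that attaches the index and permission
# filters to the Search object.
def emails_matching_query(user, search_param_dict=None):
    search_obj = create_search_obj(user, search_param_dict=search_param_dict)
    return get_all_query_matching_emails(search_obj)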