def title_suggest_view(request: HttpRequest) -> HttpResponse:
    query = request.GET.get('q', '')
    s = Search(using=es_client, index=es_index_name) \
        .source(['title']) \
        .query("match", title_suggest={'query': query, 'operator': 'and', 'fuzziness': 'AUTO'})
    response = s.execute()
    data = json.dumps(
        [{'id': i.meta.id, 'value': i.title} for i in response]
    )
    mime_type = 'application/json; charset=utf-8'
    return HttpResponse(data, mime_type)
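
A minimal sketch of the module-level setup this view assumes; the client, index name, and imports below are illustrative, not part of the original snippet.

# Hypothetical setup for title_suggest_view (host and index name are assumptions).
import json
from django.http import HttpRequest, HttpResponse
from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search

es_client = Elasticsearch(['localhost:9200'])
es_index_name = 'books'
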
def get_es_search(self):
    if self.es_model is None:
        msg = "Cannot use %s on a view which does not have the 'es_model'"
        raise ImproperlyConfigured(msg % self.__class__.__name__)
    index = self.es_model()._get_index()
    es_client = self.get_es_client()
    s = Search(using=es_client, index=index, doc_type=self.es_model)
    return s
def elasticsearch_retrieve_page_by_id(page_id):
    query = Search().filter(Q("term", nid=int(page_id)))[:1]
    result = query.execute()
    if result.hits.total == 0:
        return None
    return result.hits[0]
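
This helper and the ones below build a bare Search() with no using= argument, which only works once a default connection has been registered; a minimal sketch of that one-time setup, assuming a local cluster.

# Hypothetical one-time setup for the bare Search() calls above and below.
from elasticsearch_dsl.connections import connections

connections.create_connection(hosts=['localhost'], timeout=20)
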
def elasticsearch_delete_old():
    _from = NEVER
    _to = datetime.now() - timedelta(days=30)
    query = Search().filter(Q("range", visited_at={'from': _from, 'to': _to}))
    result = query.delete()
def elasticsearch_pages(context, sort, page):
    result_limit = int(os.environ['RESULT_LIMIT'])
    max_result_limit = int(os.environ['MAX_RESULT_LIMIT'])
    start = (page - 1) * result_limit
    end = start + result_limit

    domain_query = Q("term", is_banned=False)
    if context["is_up"]:
        domain_query = domain_query & Q("term", is_up=True)
    if not context["show_fh_default"]:
        domain_query = domain_query & Q("term", is_crap=False)
    if not context["show_subdomains"]:
        domain_query = domain_query & Q("term", is_subdomain=False)
    if context["rep"] == "genuine":
        domain_query = domain_query & Q("term", is_genuine=True)
    if context["rep"] == "fake":
        domain_query = domain_query & Q("term", is_fake=True)

    limit = max_result_limit if context["more"] else result_limit

    has_parent_query = Q("has_parent", type="domain", query=domain_query)
    if context['phrase']:
        query = Search().filter(has_parent_query).query(Q("match_phrase", body_stripped=context['search']))
    else:
        query = Search().filter(has_parent_query).query(Q("match", body_stripped=context['search']))

    query = query.highlight_options(order='score', encoder='html').highlight('body_stripped')[start:end]
    query = query.source(['title', 'domain_id', 'created_at', 'visited_at']).params(request_cache=True)

    if context["sort"] == "onion":
        query = query.sort("_parent")
    elif context["sort"] == "visited_at":
        query = query.sort("-visited_at")
    elif context["sort"] == "created_at":
        query = query.sort("-created_at")
    elif context["sort"] == "last_seen":
        query = query.sort("-visited_at")
    return query.execute()
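
A hedged usage sketch for elasticsearch_pages(); the context values, environment variables, and printed fields are assumptions for illustration only.

# Hypothetical call; RESULT_LIMIT and MAX_RESULT_LIMIT must be set in the environment.
context = {
    'search': 'example query',
    'phrase': False,
    'is_up': True,
    'show_fh_default': False,
    'show_subdomains': False,
    'rep': 'all',
    'more': False,
    'sort': 'visited_at',
}
response = elasticsearch_pages(context, sort=context['sort'], page=1)
for hit in response:
    # highlighted fragments are exposed under hit.meta.highlight
    print(hit.title, hit.meta.highlight.body_stripped)
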
def setUp(self):
    super().setUp()
    self.es = search.init_es()
    connections.add_connection('default', self.es)
    self.s = Search(index=settings.ELASTICSEARCH_INDEX)
    search.Image.init()
    self.es.cluster.health(wait_for_status='yellow', request_timeout=2000)
    self.img1 = models.Image(title='greyhounds are fast',
                             creator="Rashid",
                             url='http://example.com/1',
                             license='CC0',
                             provider="flickr",
                             source="openimages",
                             tags_list=['greyhound', 'dog', 'object'])
    self.img2 = models.Image(title='pumpkins are orange',
                             creator='???',
                             url='http://example.com/2',
                             license='CC-BY',
                             provider="rijksmuseum",
                             source="rijksmuseum",
                             tags_list=['gourds', 'fruit', 'object'])
    self.img1.save()
    self.img2.save()
    self.url = reverse('index')
    self.removed = models.Image.objects.create(title='removed', url=FOREIGN_URL + TEST_IMAGE_REMOVED, license="cc0")
def test_date_range(app, es, event_queues, indexed_events):
    aggregate_events(['file-download-agg'])
    current_search_client.indices.refresh(index='*')
    query = Search(using=current_search_client,
                   index='stats-file-download')[0:30].sort('file_id')
    results = query.execute()
    total_count = 0
    for result in results:
        if 'file_id' in result:
            total_count += result.count
    assert total_count == 30
def get_bookmark(self):
    """Get last aggregation date."""
    if not Index(self.aggregation_alias,
                 using=self.client).exists():
        if not Index(self.event_index,
                     using=self.client).exists():
            return datetime.date.today()
        return self._get_oldest_event_timestamp()

    # retrieve the most recent bookmark (sorted by date, descending)
    query_bookmark = Search(
        using=self.client,
        index=self.aggregation_alias,
        doc_type='{0}-bookmark'.format(self.event)
    )[0:1].sort(
        {'date': {'order': 'desc'}}
    )
    bookmarks = query_bookmark.execute()
    # if no bookmark is found but the index exists, the bookmark was somehow
    # lost or never written, so restart from the beginning
    if len(bookmarks) == 0:
        return self._get_oldest_event_timestamp()

    # parse the stored date using the doc_id_suffix format
    bookmark = datetime.datetime.strptime(bookmarks[0].date,
                                          self.doc_id_suffix)
    return bookmark
def build_query(self, start_date, end_date, **kwargs):
    """Build the elasticsearch query."""
    agg_query = Search(using=self.client,
                       index=self.index,
                       doc_type=self.doc_type)[0:0]

    if start_date is not None or end_date is not None:
        time_range = {}
        if start_date is not None:
            time_range['gte'] = start_date.isoformat()
        if end_date is not None:
            time_range['lte'] = end_date.isoformat()
        agg_query = agg_query.filter(
            'range',
            **{self.time_field: time_range})

    term_agg = agg_query.aggs
    for term in self.aggregated_fields:
        term_agg = term_agg.bucket(term, 'terms', field=term, size=0)
    term_agg.metric('total', 'sum', field='count')

    if self.copy_fields:
        term_agg.metric(
            'top_hit', 'top_hits', size=1, sort={'timestamp': 'desc'}
        )

    for query_param, filtered_field in self.required_filters.items():
        if query_param in kwargs:
            agg_query = agg_query.filter(
                'term', **{filtered_field: kwargs[query_param]}
            )

    return agg_query
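
A sketch of how the aggregation built by build_query() might be consumed from another method of the same class; a single aggregated field named 'file_id' is an assumption.

# Hypothetical consumption, assuming aggregated_fields == ['file_id'].
agg_query = self.build_query(start_date=None, end_date=None)
response = agg_query.execute()
for bucket in response.aggregations.file_id.buckets:
    print(bucket.key, bucket.total.value)  # sum of 'count' per file_id term
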
def main():
    parser = argparse.ArgumentParser(description='Download items from ES index')
    arg = parser.add_argument
    arg('output', help='output in .jl.gz format')
    arg('index', help='ES index name')
    arg('--domain', help='url.domain to filter')
    arg('--id', help='record id')
    arg('--host', default='localhost', help='ES host in host[:port] format')
    arg('--user', help='HTTP Basic Auth user')
    arg('--password', help='HTTP Basic Auth password')
    arg('--chunk-size', type=int, default=100, help='download chunk size')
    args = parser.parse_args()

    kwargs = {}
    if args.user or args.password:
        kwargs['http_auth'] = (args.user, args.password)
    client = elasticsearch.Elasticsearch(
        [args.host],
        connection_class=elasticsearch.RequestsHttpConnection,
        timeout=600,
        **kwargs)
    print(client.info())

    search = Search(using=client, index=args.index)
    if args.domain:
        search = search.filter('term', **{'url.domain': args.domain})
    if args.id:
        search = search.filter('term', **{'_id': args.id})

    total = 0
    with tqdm.tqdm(total=search.count()) as pbar:
        with gzip.open(args.output, 'wt') as f:
            for x in search.params(size=args.chunk_size).scan():
                total += 1
                pbar.update(1)
                f.write(json.dumps(x.to_dict()))
                f.write('\n')
    print('{:,} items downloaded to {}'.format(total, args.output))
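
The script body above relies on a handful of imports and a standard entry point, roughly as follows; the exact module layout is an assumption.

# Hypothetical header and footer for the download script.
import argparse
import gzip
import json

import elasticsearch
import tqdm
from elasticsearch_dsl import Search

if __name__ == '__main__':
    main()
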
def determine_metadata(self, request, view):
    result = super().determine_metadata(request, view)
    result['parameters'] = {
        'q': {
            'type': 'string',
            'description': 'The query to search for',
            'required': True,
        },
    }
    return result
# =============================================
# Search view sets
# =============================================
def search_query(self, client, analyzer: InputQAnalyzer) -> Search:
    """
    Construct the search query that is executed by this view set.
    """
    raise NotImplementedError
def search_query(self, client, analyzer: InputQAnalyzer) -> Search:
    """
    Execute search on Vestiging (establishment).
    """
    search = vestiging_query(analyzer)\
        .to_elasticsearch_object(client)
    return search.filter('terms', _type=['vestiging'])
def search_query(self, client, analyzer: InputQAnalyzer) -> Search:
    """
    Execute search on MAC.
    """
    search = mac_query(analyzer).to_elasticsearch_object(client)
    return search
def __init__(self, index, doc_type, connection=None):
    self._connection = connection or ElasticsearchBackend.DEFAULT_CONNECTION
    self._index = index
    self._doc_type = doc_type
    self._connection.indices.create(self._index, ignore=400)
    self._connection.indices.put_mapping(body={doc_type: self.ES_MAPPING}, index=index, doc_type=doc_type)
    self.search = elasticsearch_dsl.Search(self._connection, index=index, doc_type=doc_type)
async def find_gym(self, gym):
    # await is used below, so this must be a coroutine (async def)
    s = Search(using=self.client, index="marker").query("match", title={'query': gym, 'fuzziness': 2, 'slop': 1})
    response = s.execute()
    if response.hits.total == 0:
        await self.bot.say("I couldn't find that gym")
        return None, None
    hit = response[0]
    monacle_gym = await self.get_monacle_gym(hit)
    return hit, monacle_gym
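
Because find_gym awaits bot calls it is a coroutine, so callers have to await it in turn; a minimal sketch of a caller, with the surrounding command wiring assumed.

# Hypothetical async caller inside the same cog.
async def on_gym_request(self, gym_name):
    hit, monacle_gym = await self.find_gym(gym_name)
    if hit is not None:
        await self.bot.say('Found gym: {}'.format(hit.title))
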
def exists_repos_in_database(self, github_id):
    if 0 != elasticsearch_dsl \
            .Search(using=self._es, index=self._es_index, doc_type=self._language) \
            .query('term', repo__github_id=github_id) \
            .count():
        return True
    return False
def num_repos(self):
    if self._es.indices.exists(index=self._es_index):
        s = elasticsearch_dsl.Search(using=self._es, index=self._es_index, doc_type=self._language)
        s.aggs.bucket('num_repos', A('cardinality', field='repo.github_id'))
        response = s.execute()
        return response.aggregations.num_repos.value
    return 0
def get_features(self):
    if self._es.indices.exists(index=self._es_index):
        s = elasticsearch_dsl.Search(using=self._es, index=self._es_index, doc_type=self._language)
        response = s.execute()
        if 0 != len(response.hits):
            return response.hits
    return False
def update_dataset_stats(dataset):
    search = Search(index=get_tweets_index_name(dataset.meta.id))
    search = search.query('term', dataset_id=dataset.meta.id)[0:0]
    search.aggs.metric('created_at_min', 'min', field='created_at')
    search.aggs.metric('created_at_max', 'max', field='created_at')
    search_response = search.execute()
    dataset.first_tweet_created_at = datetime.utcfromtimestamp(
        search_response.aggregations.created_at_min.value / 1000.0)
    dataset.last_tweet_created_at = datetime.utcfromtimestamp(
        search_response.aggregations.created_at_max.value / 1000.0)
    dataset.tweet_count = search_response.hits.total
    dataset.save()