def run_search(query, fields, sort=None, highlight=None):
    query_body = {
        'query': query,
        'fields': fields,
    }
    if sort:
        query_body['sort'] = sort
    if highlight:
        query_body['highlight'] = highlight
    return es.search(
        index=settings.ELASTICSEARCH_INDEX,
        doc_type=DOCUMENT_TYPE,
        body=query_body,
    )
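A minimal usage sketch, assuming the module-level `es` client, `settings`, and `DOCUMENT_TYPE` from the snippet above; the query clause and field names here are hypothetical:

# Hypothetical call; any valid Elasticsearch query clause works as `query`.
results = run_search(
    query={'match': {'title': 'greyhound'}},
    fields=['title', 'url'],
    sort=[{'_score': 'desc'}],
)
for hit in results['hits']['hits']:
    print(hit['_id'], hit.get('_score'))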
def validate_status(value):
    """Test if an onion domain is not banned."""
    res = get_elasticsearch_object().count(
        index=settings.ELASTICSEARCH_INDEX,
        # The original passed the index name as doc_type, which looks like a
        # copy-paste bug; a dedicated type setting is assumed here.
        doc_type=settings.ELASTICSEARCH_TYPE,
        body={
            "query": {
                "constant_score": {
                    "filter": {
                        "bool": {
                            "must": [
                                {"term": {"domain": value}},
                                {"term": {"banned": 1}}
                            ]
                        }
                    }
                }
            }
        }
    )
    if res['count'] > 0:
        raise ValidationError(
            _("This onion is banned and cannot be added to this index.")
        )
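Since the validator raises Django's ValidationError, it can be attached to a form or model field; a sketch with a hypothetical form (only validate_status comes from the snippet above):

from django import forms

class AddOnionForm(forms.Form):
    # Hypothetical form field wired to the validator above.
    domain = forms.CharField(max_length=255, validators=[validate_status])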
def index_data(document):
    """Indexes an instance of the Document model in Elasticsearch."""
    sha1 = document.file.sha1
    es.index(
        index=settings.ELASTICSEARCH_INDEX,
        doc_type=DOCUMENT_TYPE,
        id=sha1,
        body=document.parsed,
    )
def handle(self, **options):
    es.indices.delete(settings.ELASTICSEARCH_INDEX, ignore=[400, 404])
    es.indices.create(settings.ELASTICSEARCH_INDEX, {
        "mappings": MAPPINGS,
        "settings": SETTINGS,
    })
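MAPPINGS and SETTINGS are defined elsewhere in that project; a minimal sketch of their likely shape for a pre-7.x cluster (the field names are illustrative, not the project's real mapping):

# Illustrative only -- the real constants live alongside the command.
SETTINGS = {
    "number_of_shards": 1,
    "number_of_replicas": 0,
}
MAPPINGS = {
    DOCUMENT_TYPE: {
        "properties": {
            "sha1": {"type": "keyword"},
            "text": {"type": "text"},
        },
    },
}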
def get_temp_alias():
    """
    Get name for the alias to the temporary index
    """
    return "{}_temp".format(settings.ELASTICSEARCH_INDEX)
def get_default_alias():
    """
    Get name for the alias to the default index
    """
    return "{}_alias".format(settings.ELASTICSEARCH_INDEX)
def setUp(self):
    """
    Start without any index
    """
    super().setUp()
    conn = get_conn(verify=False)
    for index in conn.indices.get_aliases().keys():
        if index.startswith(settings.ELASTICSEARCH_INDEX):
            conn.indices.delete(index)
    # Clear globals
    from search import indexing_api
    indexing_api._CONN = None  # pylint: disable=protected-access
    indexing_api._CONN_VERIFIED = False  # pylint: disable=protected-access
def test_no_mapping(self):
    """
    Test that error is raised if we don't have a mapping
    """
    conn = get_conn(verify=False)
    backing_index = "{}_backing".format(settings.ELASTICSEARCH_INDEX)
    conn.indices.create(backing_index)
    conn.indices.put_alias(name=get_default_alias(), index=backing_index)
    with self.assertRaises(ReindexException) as ex:
        get_conn()
    assert str(ex.exception) == "Mapping {} not found".format(USER_DOC_TYPE)
def setUp(self):
    super().setUp()
    self.es = search.init_es()
    connections.add_connection('default', self.es)
    self.s = Search(index=settings.ELASTICSEARCH_INDEX)
    search.Image.init()
    self.es.cluster.health(wait_for_status='yellow', request_timeout=2000)
    self.img1 = models.Image(title='greyhounds are fast',
                             creator="Rashid",
                             url='http://example.com/1',
                             license='CC0',
                             provider="flickr",
                             source="openimages",
                             tags_list=['greyhound', 'dog', 'object'])
    self.img2 = models.Image(title='pumpkins are orange',
                             creator='???',
                             url='http://example.com/2',
                             license='CC-BY',
                             provider="rijksmuseum",
                             source="rijksmuseum",
                             tags_list=['gourds', 'fruit', 'object'])
    self.img1.save()
    self.img2.save()
    self.url = reverse('index')
    self.removed = models.Image.objects.create(title='removed',
                                               url=FOREIGN_URL + TEST_IMAGE_REMOVED,
                                               license="cc0")
def tearDown(self):
    index = Index(settings.ELASTICSEARCH_INDEX)
    index.delete(ignore=404)
def _index_img(self, img):
    """Index a single img and ensure that it's been propagated to the search engine"""
    image = search.db_image_to_index(img)
    image.save()
    index = Index(name=settings.ELASTICSEARCH_INDEX)
    index.flush(force=True)
    index.refresh()
def index_all_images(self, chunk_size=DEFAULT_CHUNK_SIZE, num_iterations=DEFAULT_NUM_ITERATIONS,
                     num_threads=DEFAULT_NUM_THREADS):
    """Index every record in the database with a server-side cursor"""
    index = Index(settings.ELASTICSEARCH_INDEX)
    if not index.exists():
        log.info("Creating new index %s", settings.ELASTICSEARCH_INDEX)
        search.Image.init()
        mapping = search.Image._doc_type.mapping
        mapping.save(settings.ELASTICSEARCH_INDEX)
        log.info("Done creating new index")
    with Pool(num_threads) as pool:
        starts = [i * chunk_size for i in range(num_iterations)]
        pool.starmap(do_index, zip(starts, itertools.repeat(chunk_size, len(starts))))
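do_index is referenced above but not shown; a plausible sketch of the worker, assuming it bulk-indexes one slice of Image rows (the signature matches the starmap() call, but the body is an assumption):

def do_index(start, chunk_size):
    # Hypothetical worker body; only the names come from the snippets above.
    es = search.init_es()
    images = models.Image.objects.all()[start:start + chunk_size]
    search_objs = [search.db_image_to_index(img).to_dict(include_meta=True)
                   for img in images]
    if search_objs:
        elasticsearch.helpers.bulk(es, search_objs)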
def insert_image(walk_func, serialize_func, chunk_size, max_results=5000, **kwargs):
    count = 0
    success_count = 0
    es = search.init()
    search.Image.init()
    mapping = search.Image._doc_type.mapping
    mapping.save(settings.ELASTICSEARCH_INDEX)
    for chunk in grouper_it(chunk_size, walk_func(**kwargs)):
        if max_results is not None and count >= max_results:
            break
        else:
            images = []
            for result in chunk:
                image = serialize_func(result)
                if image:
                    images.append(image)
            if len(images) > 0:
                try:
                    # Bulk update the search engine too
                    if not settings.DEBUG:
                        es.cluster.health(wait_for_status='green', request_timeout=2000)
                    search_objs = [search.db_image_to_index(img).to_dict(include_meta=True)
                                   for img in images]
                    elasticsearch.helpers.bulk(es, search_objs)
                    models.Image.objects.bulk_create(images)
                    log.debug("*** Committed set of %d images", len(images))
                    success_count += len(images)
                except (requests.exceptions.ReadTimeout,
                        elasticsearch.exceptions.TransportError,
                        elasticsearch.helpers.BulkIndexError,
                        IntegrityError) as e:
                    log.warn("Got one or more integrity errors on batch: %s", e)
                finally:
                    count += len(images)
    return success_count
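grouper_it is also not shown; a common shape for this chunking helper, matching how it is called above (an assumption, not necessarily the project's own version):

def grouper_it(n, iterable):
    # Yield the iterable lazily in chunks of n items.
    it = iter(iterable)
    while True:
        chunk_it = itertools.islice(it, n)
        try:
            first_el = next(chunk_it)
        except StopIteration:
            return
        yield itertools.chain((first_el,), chunk_it)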
def handle(self, *args, **options):
    if options['verbose']:
        log.setLevel(logging.DEBUG)
    es = search.init_es(timeout=2000)
    oldindex = Index(options['oldindex'])
    client = elasticsearch.client.IndicesClient(es)

    # Create the new index
    newindex = Index(options['newindex'])
    newindex.doc_type(search.Image)
    try:
        newindex.create()
    except elasticsearch.exceptions.RequestError as e:
        if options['force']:
            log.warn("Trying to delete previously-created new index %s", options['newindex'])
            newindex.delete()
            newindex.create()
        else:
            raise e
    log.info("Done creating new index %s", options['newindex'])

    log.info("Copying data on %s to %s", options['oldindex'], options['newindex'])
    # Would love to use ES native reindex() but AWS's service doesn't support it :(
    elasticsearch.helpers.reindex(es, options['oldindex'], options['newindex'])

    # Wait for the cluster to be happy
    if not settings.DEBUG:
        es.cluster.health(wait_for_status='green', request_timeout=2000)

    # Is the value of 'oldindex' an alias or a real index?
    if client.exists_alias(name=settings.ELASTICSEARCH_INDEX):
        log.info("Confirmed that %s is an alias and not a real index", options['oldindex'])
        alias_move = """{
            "actions" : [
                { "remove" : { "index" : "%s", "alias" : "%s" } },
                { "add" : { "index" : "%s", "alias" : "%s" } }
            ]
        }""" % (options['oldindex'], settings.ELASTICSEARCH_INDEX,
                options['newindex'], settings.ELASTICSEARCH_INDEX)
        client.update_aliases(alias_move)
    elif client.exists(options['oldindex']):
        log.info("%s is a real index and not an alias, fixing", options['oldindex'])
        # Delete the old index
        log.info("Deleting %s -- this will cause some downtime", options['oldindex'])
        oldindex.delete()
        client.put_alias(options['newindex'], settings.ELASTICSEARCH_INDEX)

    # Confirm the number of documents now searchable
    s = Search()
    response = s.execute()
    log.info("%d results available in %s", response.hits.total, settings.ELASTICSEARCH_INDEX)
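A usage sketch, assuming this handle() belongs to a Django management command; the command name 'syncindex' is hypothetical, but the option names are the ones handle() reads:

# Hypothetical invocation via Django's call_command:
from django.core.management import call_command
call_command('syncindex', oldindex='myapp', newindex='myapp_v2',
             force=True, verbose=False)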