def insert_image(walk_func, serialize_func, chunk_size, max_results=5000, **kwargs):
    """Walk a source of image records, serialize them, and commit them in bulk.

    Results from ``walk_func(**kwargs)`` are consumed in chunks of
    ``chunk_size``; each chunk is serialized with ``serialize_func``, indexed
    into Elasticsearch, and bulk-inserted into the database.

    :param walk_func: generator yielding raw source records
    :param serialize_func: maps a raw record to an ``Image`` instance
        (falsy results are skipped)
    :param chunk_size: number of records to process per batch
    :param max_results: stop after roughly this many records have been
        attempted; ``None`` means no limit. Note the check happens between
        chunks, so the total may overshoot by up to one chunk.
    :param kwargs: passed through to ``walk_func``
    :return: number of images successfully committed
    """
    count = 0
    success_count = 0
    es = search.init()
    search.Image.init()
    mapping = search.Image._doc_type.mapping
    mapping.save(settings.ELASTICSEARCH_INDEX)
    for chunk in grouper_it(chunk_size, walk_func(**kwargs)):
        if max_results is not None and count >= max_results:
            break
        images = [image for image in (serialize_func(result) for result in chunk) if image]
        if images:
            try:
                # Bulk update the search engine too
                if not settings.DEBUG:
                    # Wait for the cluster to be healthy before writing;
                    # avoids piling writes onto a degraded cluster.
                    es.cluster.health(wait_for_status='green', request_timeout=2000)
                search_objs = [search.db_image_to_index(img).to_dict(include_meta=True) for img in images]
                elasticsearch.helpers.bulk(es, search_objs)
                # NOTE(review): ES is indexed before the DB insert, so a DB
                # failure can leave orphaned search documents — confirm this
                # is acceptable or reorder upstream.
                models.Image.objects.bulk_create(images)
                log.debug("*** Committed set of %d images", len(images))
                success_count += len(images)
            except (requests.exceptions.ReadTimeout,
                    elasticsearch.exceptions.TransportError,
                    elasticsearch.helpers.BulkIndexError,
                    IntegrityError) as e:
                # log.warn is deprecated; logging.Logger.warning is the
                # supported spelling. Batch failures are expected (e.g.
                # duplicate keys), so we log and continue.
                log.warning("Got one or more integrity errors on batch: %s", e)
            finally:
                # Count every attempted image, successful or not, so
                # max_results bounds total work rather than successes.
                count += len(images)
    return success_count
# NOTE(review): the following lines were web-page extraction residue
# ("评论列表" = "comment list", "文章目录" = "article table of contents")
# and are not part of the program; kept here commented out so the file parses.