def _perform_index_sync(self, sql_table_cls, es_doc_cls, id_logger):
    """Synchronise dirty rows of *sql_table_cls* into Elasticsearch.

    Computes the documents that changed since the last sync, bulk-indexes
    them through a parallel bulk helper, stamps each successfully synced
    row with the server timestamp, and reports per-document failures via
    *id_logger*. Finally refreshes the index so the synced documents are
    searchable immediately.
    """
    doc = es_doc_cls()
    conn = connections.get_connection()
    timestamp = current_server_timestamp()
    dirty_docs = self._compute_dirty_documents(sql_table_cls, doc.doc_type)
    actions = self._synchronisation_op(doc, dirty_docs)
    self._logging(logging.INFO, 'Performing synchronization.')
    for success, item in parallel_bulk(conn, actions):
        # Bulk responses nest the result under the op type key.
        op_type = 'index' if 'index' in item else 'update'
        doc_id = item[op_type]['_id']
        if not success:
            id_logger(doc_id, logging.ERROR,
                      'Error while syncing document %s index.' % doc_id)
            continue
        # Record the sync time so this row is not re-treated next run.
        self._logging(logging.INFO,
                      'Document %s has been synced successfully.'
                      % doc_id)
        sql_table_cls.update_last_sync(doc_id, timestamp)
    # Refresh the index so freshly synced documents are searchable at once.
    elasticsearch_dsl.Index(doc.index).refresh()