def _synchronisation_op(self, elasticsearch_doctype, pending_insertions):
    """Lazily yield one synchronisation action per pending document.

    For every item in ``pending_insertions`` the document is serialised with
    ``to_dict(True)`` and looked up through ``elasticsearch_doctype.get``
    using the item's ``id`` attribute:

    * lookup succeeds -> the action is tagged ``'_op_type': 'update'`` and
      its ``'_source'`` entry is moved under the ``'doc'`` key;
    * lookup raises ``NotFoundError`` -> the action is tagged
      ``'_op_type': 'index'`` and keeps its ``'_source'`` entry.

    NOTE(review): the ``_op_type`` / ``doc`` / ``_source`` keys look like the
    action format of the elasticsearch bulk helpers, and ``to_dict(True)``
    is presumably ``include_meta=True`` -- confirm against the caller.

    Args:
        elasticsearch_doctype: Object exposing ``get(doc_id)``, which is
            expected to raise ``NotFoundError`` for an unknown id.
        pending_insertions: Iterable of objects exposing ``id`` and
            ``to_dict(flag)``; the returned dict must contain ``'_source'``
            whenever the document already exists.

    Yields:
        dict: The dict returned by ``to_dict(True)``, modified in place as
        described above.

    Raises:
        KeyError: If an existing document's dict has no ``'_source'`` entry.
        Exception: Any error other than ``NotFoundError`` raised by the
            lookup propagates unchanged.

    Because this is a generator, nothing (including the log line) runs
    until the first item is requested.
    """
    self._logging(logging.INFO,
                  'Computing required operations to synchronize documents.')
    for pending in pending_insertions:
        action = pending.to_dict(True)
        try:
            # Only the existence probe lives inside the ``try`` so that the
            # ``except`` clause can never react to the payload rewriting.
            # The fetched document itself is not used.
            elasticsearch_doctype.get(pending.id)
        except NotFoundError:
            # Unknown document: send the full source as a new entry.
            action['_op_type'] = 'index'
        else:
            # Known document: the payload moves from '_source' to 'doc'.
            action['_op_type'] = 'update'
            action['doc'] = action.pop('_source')
        yield action
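# NOTE: web-page residue left over from extraction ("Comment list" /
# "Article table of contents"); not part of the program.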