def bulk_upsert(self, docs, namespace, timestamp):
    """Insert multiple documents into Elasticsearch.

    Each document is sent as two bulk actions: the document itself, and a
    companion metadata record (stored under ``self.meta_index_name`` /
    ``self.meta_type``) that holds the namespace and the timestamp.

    Args:
        docs: Iterable of mutable mappings, each containing an ``"_id"``
            key. The ``"_id"`` entry is popped from every document, so the
            caller's objects are modified.
        namespace: Passed to ``self._index_and_mapping`` to obtain the
            target index and type; also stored as ``ns`` in the metadata
            record.
        timestamp: Stored as ``_ts`` in the metadata record.

    An empty ``docs`` is silently ignored. Responses flagged as not ok are
    logged; this method does not raise for them. When
    ``self.auto_commit_interval`` is 0, ``self.commit()`` is called after
    all responses have been consumed.
    """
    def docs_to_upsert():
        """Yield a document action followed by its metadata action."""
        # Doubles as the "was there at least one document?" sentinel.
        doc = None
        for doc in docs:
            index, doc_type = self._index_and_mapping(namespace)
            # Remove the redundant _id from the source; it is carried in
            # the action's '_id' field instead.
            doc_id = u(doc.pop("_id"))
            document_action = {
                '_index': index,
                '_type': doc_type,
                '_id': doc_id,
                '_source': self._formatter.format_document(doc)
            }
            document_meta = {
                '_index': self.meta_index_name,
                '_type': self.meta_type,
                '_id': doc_id,
                '_source': {
                    'ns': namespace,
                    '_ts': timestamp
                }
            }
            yield document_action
            yield document_meta
        if doc is None:
            # The loop body never ran: signal the caller below so it can
            # skip the commit.
            raise errors.EmptyDocsError(
                "Cannot upsert an empty sequence of "
                "documents into Elastic Search")

    try:
        kw = {}
        # Only override the helper's own default when a positive chunk
        # size is configured.
        if self.chunk_size > 0:
            kw['chunk_size'] = self.chunk_size

        responses = streaming_bulk(client=self.elastic,
                                   actions=docs_to_upsert(),
                                   **kw)

        # The generator above is consumed lazily, so EmptyDocsError can
        # surface while iterating the responses.
        for ok, resp in responses:
            if not ok:
                # Pass resp as a logging argument instead of pre-formatting
                # with '%': formatting is deferred and cannot fail or
                # misformat when resp happens to be a tuple.
                LOG.error(
                    "Could not bulk-upsert document "
                    "into ElasticSearch: %r", resp)
        if self.auto_commit_interval == 0:
            self.commit()
    except errors.EmptyDocsError:
        # This can happen when mongo-connector starts up, there is no
        # config file, but nothing to dump
        pass
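# Source file: elastic_doc_manager.py
# Language: python
# Web page residue (not part of the program):
#   views 22, favorites 0, likes 0, comments 0
#   comment list
#   table of contents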