def write(self, bucket, doc_type, rows, primary_key, update=False, as_generator=False):
    """Write ``rows`` to the ``bucket`` index via the bulk streaming helper.

    Each row becomes one bulk action whose ``_id`` is computed by
    ``self.generate_doc_id(row, primary_key)``. With ``update=False`` the
    action is an ``index`` op whose ``_source`` is the row itself; with
    ``update=True`` it is an ``update`` op whose ``_source`` is
    ``{'doc': row, 'doc_as_upsert': True}``. Once every row has been sent,
    ``self.__es.indices.flush(bucket)`` is called.

    Args:
        bucket: Name of the target index.
        doc_type: Value sent as ``_type`` on every action.
        rows: Iterable of rows to write; it is iterated exactly once.
        primary_key: Non-empty sequence handed to ``generate_doc_id``.
        update: Send ``update``/``doc_as_upsert`` actions instead of
            ``index`` actions.
        as_generator: If true, return a lazy generator that performs the
            write as it is iterated and yields each row after its bulk
            result has been received. The flush runs only once that
            generator is exhausted.

    Returns:
        The lazy generator of written rows when ``as_generator`` is true.
        Otherwise the write (including the flush) has already completed,
        and an empty iterator is returned. Callers that iterate the result
        therefore keep working.

    Raises:
        ValueError: If ``primary_key`` is ``None`` or empty. This is raised
            immediately at call time, not on first iteration.
    """
    # Validate eagerly. This function deliberately contains no ``yield`` of
    # its own: a ``yield`` anywhere in the body would turn the whole method
    # into a generator, and nothing would run until the caller iterated it.
    if not primary_key:
        raise ValueError('primary_key cannot be an empty list')

    def build_action(row_):
        """Return the bulk action dict for a single row."""
        doc_id = self.generate_doc_id(row_, primary_key)
        if update:
            return {
                '_op_type': 'update',
                '_index': bucket,
                '_type': doc_type,
                '_id': doc_id,
                '_source': {
                    'doc': row_,
                    'doc_as_upsert': True
                }
            }
        return {
            '_op_type': 'index',
            '_index': bucket,
            '_type': doc_type,
            '_id': doc_id,
            '_source': row_
        }

    def write_rows():
        """Stream actions to the bulk helper, yield each row after its result, then flush."""
        # tee lets one pass over ``rows`` feed both the action builder and
        # the output side. It buffers any rows the bulk helper has read
        # ahead of the results it has produced so far.
        rows_for_actions, rows_for_output = itertools.tee(rows)
        actions = (build_action(row_) for row_ in rows_for_actions)
        # NOTE(review): ``streaming_bulk`` is not imported in the visible
        # code; presumably ``elasticsearch.helpers.streaming_bulk``. The
        # pairing below relies on it yielding exactly one result per
        # action, in submission order. Confirm for the installed version.
        results = streaming_bulk(self.__es, actions=actions)
        for _result, row in zip(results, rows_for_output):
            yield row
        self.__es.indices.flush(bucket)

    if as_generator:
        return write_rows()
    # Drain the generator now so the write and flush actually happen.
    # maxlen=0 discards every item, so no rows are retained.
    collections.deque(write_rows(), maxlen=0)
    return iter(())
# storage.py (source file)
# python
# Views: 28
# Favorites: 0
# Likes: 0
# Comments: 0
# Comment list
# Table of contents