storage.py 文件源码

python
阅读 25 收藏 0 点赞 0 评论 0

项目:tableschema-elasticsearch-py 作者: frictionlessdata 项目源码 文件源码
def write(self, bucket, doc_type, rows, primary_key, update=False, as_generator=False):
        """Bulk-write ``rows`` into the ``bucket`` index, then flush that index.

        Each row becomes one bulk action whose ``_id`` comes from
        ``self.generate_doc_id(row, primary_key)``. With ``update`` set, the
        action is an ``update`` with ``doc_as_upsert``; otherwise it is a
        plain ``index`` action.

        Args:
            bucket: Index name used as ``_index`` and passed to the flush call.
            doc_type: Value placed in each action's ``_type``.
            rows: Iterable of row documents to write.
            primary_key: Non-empty collection of key names, forwarded to
                ``self.generate_doc_id``.
            update: Emit upsert-style ``update`` actions instead of ``index``.
            as_generator: When true, return a lazy generator that yields each
                row as its bulk result is received and flushes the index once
                exhausted. When false, perform the whole write (and flush)
                before returning.

        Returns:
            A generator of rows when ``as_generator`` is true; otherwise an
            empty iterator, so callers that wrap the call in ``list(...)``
            keep receiving ``[]``.

        Raises:
            ValueError: If ``primary_key`` is ``None`` or empty. Raised at
                call time, not on first iteration.
        """
        if not primary_key:
            raise ValueError('primary_key cannot be an empty list')

        def actions(rows_):
            # One bulk action per row; only the op type and payload shape
            # differ between update and index modes.
            op_type = 'update' if update else 'index'
            for row_ in rows_:
                yield {
                    '_op_type': op_type,
                    '_index': bucket,
                    '_type': doc_type,
                    '_id': self.generate_doc_id(row_, primary_key),
                    '_source': (
                        {'doc': row_, 'doc_as_upsert': True} if update else row_
                    ),
                }

        def written_rows():
            # Kept as a nested generator so that `write` itself is a regular
            # function: a `yield` directly in `write` would make every call
            # lazy, including as_generator=False, which would then do nothing
            # unless the caller happened to iterate the result.
            rows_for_actions, rows_for_results = itertools.tee(rows)
            # NOTE(review): pairing results with rows assumes streaming_bulk
            # yields exactly one result per action, in order - confirm.
            paired = zip(
                streaming_bulk(self.__es, actions=actions(rows_for_actions)),
                rows_for_results,
            )
            for _result, row in paired:
                yield row
            # Reached only after every action has been consumed.
            self.__es.indices.flush(bucket)

        if as_generator:
            return written_rows()

        # Drain eagerly without retaining any rows.
        collections.deque(written_rows(), maxlen=0)
        return iter(())
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号