elastic.py 文件源码

python
阅读 21 收藏 0 点赞 0 评论 0

项目:defplorex 作者: trendmicro 项目源码 文件源码
def bulk_index_from_it(
            self, index, it, transform=lambda x: x, last_updated=True):

        gc.collect()
        err_ids = []

        def _it():
            for doc_body in it:
                try:
                    log.debug('Working on record: %s', doc_body)
                    _id = doc_body.get(self.id_field)

                    try:
                        doc_body = transform(doc_body)
                    except Exception as e:
                        log.warn(
                                'Error while transforming doc ID = %s: %s',
                                _id, e)
                        raise e

                    if doc_body:
                        if last_updated:
                            doc_body['last_updated'] = datetime.now()

                        op = self.partial_index_op(
                                doc_id=_id,
                                index=index,
                                doc_body=doc_body,
                                doc_type=self.doc_type)
                        yield op
                except Exception as e:
                    log.warn('Cannot process doc ID = %s: %s', _id, e)
                    err_ids.append(_id)

        try:
            self.bulk(_it())
            log.info('Invoked self.bulk(_it())')
        except Exception as e:
            log.warn('Error in bulk index because: %s', e)

        return err_ids
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号