impl_elasticsearch.py 文件源码

python
阅读 20 收藏 0 点赞 0 评论 0

项目:panko 作者: openstack 项目源码 文件源码
def record_events(self, events):
        """Write a batch of events to Elasticsearch via the bulk helper.

        Each event becomes one ``create`` action whose index is
        ``<self.index_name>_<ISO date of ev.generated>``, whose document type
        is the event type and whose id is the event's message id.

        :param events: iterable of event objects exposing ``traits`` (items
                       with ``name``/``value``), ``generated``,
                       ``event_type``, ``message_id`` and ``raw``.
        :raises storage.StorageUnknownWriteError: if at least one item failed
            with a status other than 409. Only the last such failure is
            raised, and only after the whole batch has been processed.
        """

        def _build_bulk_index(event_list):
            # Lazily yield one bulk action per event so the helper can
            # consume them as a stream.
            for ev in event_list:
                traits = {t.name: t.value for t in ev.traits}
                yield {'_op_type': 'create',
                       '_index': '%s_%s' % (self.index_name,
                                            ev.generated.date().isoformat()),
                       '_type': ev.event_type,
                       '_id': ev.message_id,
                       '_source': {'timestamp': ev.generated.isoformat(),
                                   'traits': traits,
                                   'raw': ev.raw}}

        error = None
        # NOTE(review): streaming_bulk is called with its default error
        # handling; depending on the client version it may raise instead of
        # yielding ``ok=False`` items -- confirm this branch is reachable.
        for ok, result in helpers.streaming_bulk(
                self.conn, _build_bulk_index(events)):
            if not ok:
                # The item is a single-entry mapping; keep only its value.
                __, result = result.popitem()
                if result['status'] == 409:
                    # The document id already exists: treat as a duplicate.
                    LOG.info('Duplicate event detected, skipping it: %s',
                             result)
                else:
                    # No exception is being handled here, so use error()
                    # rather than exception(), which would append a
                    # meaningless "NoneType: None" traceback to the record.
                    LOG.error('Failed to record event: %s', result)
                    error = storage.StorageUnknownWriteError(result)

        if self._refresh_on_write:
            self.conn.indices.refresh(index='%s_*' % self.index_name)
            # Poll until the node reports no pending cluster tasks.
            while self.conn.cluster.pending_tasks(local=True)['tasks']:
                pass
        # Deferred so that one bad item does not abort the rest of the batch.
        if error:
            raise error
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号