Python parallel_bulk() usage examples
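
parallel_bulk() lives in the elasticsearch.helpers module of the official elasticsearch-py client. It splits an iterable of actions into chunks, sends the bulk requests from a thread pool, and yields one (ok, item) tuple per action. Because it returns a lazy generator, the caller must consume it for any indexing to happen. A minimal usage sketch (the index name, document generator and tuning values are illustrative, not taken from the projects below):

from elasticsearch import Elasticsearch
from elasticsearch.helpers import parallel_bulk

es = Elasticsearch()  # assumes a local node on the default port

def generate_actions():
    # Each action is a plain dict; '_source' carries the document body.
    for i in range(1000):
        yield {'_op_type': 'index', '_index': 'my-index',
               '_id': i, '_source': {'value': i}}

# The generator must be iterated, otherwise nothing is sent.
for ok, item in parallel_bulk(es, generate_actions(),
                              thread_count=4, chunk_size=500):
    if not ok:
        print('Failed:', item)

The snippets below show how different open-source projects wrap or consume this generator.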

es_upload.py (project: scrapy-cdr, author: TeamHG-Memex)
def parallel_bulk(client, actions, thread_count=4, chunk_size=500,
                  max_chunk_bytes=100 * 1024 * 1024,
                  expand_action_callback=es_helpers.expand_action,
                  **kwargs):
    """ es_helpers.parallel_bulk rewritten with imap_fixed_output_buffer
    instead of Pool.imap, which consumed unbounded memory if the generator
    outruns the upload (which usually happens).
    """
    actions = map(expand_action_callback, actions)
    for result in imap_fixed_output_buffer(
            lambda chunk: list(
                es_helpers._process_bulk_chunk(client, chunk, **kwargs)),
            es_helpers._chunk_actions(actions, chunk_size, max_chunk_bytes,
                                      client.transport.serializer),
            threads=thread_count,
        ):
        for item in result:
            yield item
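
The imap_fixed_output_buffer helper referenced in the docstring is defined elsewhere in scrapy-cdr and is not shown here. A minimal sketch of the idea, bounding the number of in-flight chunks so a fast producer cannot outrun the uploads (the name is kept, but the buffer policy is an assumption, not the project's actual code):

from collections import deque
from concurrent.futures import ThreadPoolExecutor

def imap_fixed_output_buffer(fn, iterable, threads=4):
    # Like Pool.imap, but never holds more than `threads` pending results,
    # so memory stays bounded no matter how fast `iterable` produces items.
    with ThreadPoolExecutor(max_workers=threads) as executor:
        futures = deque()
        for item in iterable:
            if len(futures) >= threads:
                yield futures.popleft().result()
            futures.append(executor.submit(fn, item))
        while futures:
            yield futures.popleft().result()
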
pulsar.py (project: pulsar, author: mcholste)
def bulk(self, client, actions, stats_only=False, **kwargs):
        success, failed = 0, 0

        # list of errors to be collected if not stats_only
        errors = []

        for ok, item in parallel_bulk(client, actions, **kwargs):
            # go through request-response pairs and detect failures
            if not ok:
                if not stats_only:
                    errors.append(item)
                failed += 1
            else:
                success += 1

        return success, failed if stats_only else errors
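
Note that the conditional expression binds tighter than the tuple comma, so the return statement evaluates as (success, failed if stats_only else errors): the second element is the failure count when stats_only is true and the list of error items otherwise, matching the behaviour of the stock helpers.bulk(). A usage sketch (the enclosing object and client variable are assumptions):

# stats_only=True  -> (number_succeeded, number_failed)
# stats_only=False -> (number_succeeded, list_of_error_items)
success, errors = indexer.bulk(es_client, actions, stats_only=False)
for err in errors:
    print('failed action:', err)
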
mc_neighbors.py (project: mediachain-indexer, author: mediachain)
def parallel_bulk(self,
                      the_iter,
                      *args,
                      **kw):

        if self.use_custom_parallel_bulk:
            return self._non_parallel_bulk(the_iter,
                                           *args,
                                           **kw)

        else:
            return es_parallel_bulk(self.es,
                                    the_iter,
                                    *args,
                                    **kw)
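
The custom _non_parallel_bulk fallback is defined elsewhere in mediachain-indexer and is not shown here. One way to build a single-threaded equivalent of the same method is on top of elasticsearch.helpers.streaming_bulk, which yields the same (ok, item) tuples without a thread pool; a sketch under that assumption, not the project's actual code:

from elasticsearch.helpers import streaming_bulk

def _non_parallel_bulk(self, the_iter, *args, **kw):
    # Single-threaded variant yielding the same (ok, item) tuples as
    # parallel_bulk, built on streaming_bulk instead of a thread pool.
    return streaming_bulk(self.es, the_iter, *args, **kw)
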
populate_es.py (project: web, author: pyjobs)
def _perform_index_sync(self, sql_table_cls, es_doc_cls, id_logger):
        es_doc = es_doc_cls()

        elasticsearch_conn = connections.get_connection()

        sync_timestamp = current_server_timestamp()

        pending_insertions = self._compute_dirty_documents(
            sql_table_cls, es_doc.doc_type)

        bulk_op = self._synchronisation_op(es_doc, pending_insertions)

        self._logging(logging.INFO, 'Performing synchronization.')

        for ok, info in parallel_bulk(elasticsearch_conn, bulk_op):
            obj_id = info['index']['_id'] \
                if 'index' in info else info['update']['_id']

            if ok:
                # Mark the task as handled so we don't process it again next time
                self._logging(logging.INFO,
                              'Document %s has been synced successfully.'
                              % obj_id)

                sql_table_cls.update_last_sync(obj_id, sync_timestamp)
            else:
                id_logger(obj_id, logging.ERROR,
                          'Error while syncing document %s index.' % obj_id)

        # Refresh indices to speed up searches
        elasticsearch_dsl.Index(es_doc.index).refresh()
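
parallel_bulk yields one (ok, info) tuple per action, where info is keyed by the operation type ('index', 'update', 'create' or 'delete') and wraps the per-item response of the Elasticsearch bulk API; that is why the code above looks up info['index'] or info['update'] to recover the document id. Roughly (field values are illustrative):

# ok is True  -> {'index':  {'_index': 'jobs', '_id': '42',
#                            'result': 'created', 'status': 201, ...}}
# ok is False -> {'update': {'_index': 'jobs', '_id': '42',
#                            'status': 404, 'error': {...}}}
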
populate_es.py (project: web, author: pyjobs)
def _geocomplete_index_batch(self, elasticsearch_conn, to_index):
        log_msg = 'Indexing documents.'
        self._logging(logging.INFO, log_msg)

        for ok, info in parallel_bulk(elasticsearch_conn, to_index):
            if not ok:
                doc_id = info['create']['_id']
                doc_type = info['create']['_type']
                doc_index = info['create']['_index']

                logging_level = logging.ERROR
                err_msg = "Couldn't index document: '%s', of type: %s, " \
                          "under index: %s." % (doc_id, doc_type, doc_index)

                self._logging(logging_level, err_msg)

generate_data.py (project: series-tiempo-ar-api, author: datosgobar)
def run(self):
        # Check whether the index exists; if not, create it
        if not self.elastic.indices.exists(settings.TEST_INDEX):
            self.elastic.indices.create(settings.TEST_INDEX,
                                        body=INDEX_CREATION_BODY)

        for interval in COLLAPSE_INTERVALS:
            self.init_series(interval)

        for success, info in parallel_bulk(self.elastic, self.bulk_items):
            if not success:
                print("ERROR:", info)
indexer.py (project: series-tiempo-ar-api, author: datosgobar)
def run(self, distributions=None):
        """Indexa en Elasticsearch todos los datos de las
        distribuciones guardadas en la base de datos, o las
        especificadas por el iterable 'distributions'
        """
        self.init_index()

        # Optimization: disable index refresh while indexing
        self.elastic.indices.put_settings(
            index=self.index,
            body=constants.DEACTIVATE_REFRESH_BODY
        )

        logger.info(strings.INDEX_START)

        for distribution in distributions:
            fields = distribution.field_set.all()
            fields = {field.title: field.series_id for field in fields}
            df = self.init_df(distribution, fields)

            self.generate_properties(df, fields)

        logger.info(strings.BULK_REQUEST_START)

        for success, info in parallel_bulk(self.elastic, self.bulk_actions):
            if not success:
                logger.warn(strings.BULK_REQUEST_ERROR, info)

        logger.info(strings.BULK_REQUEST_END)

        # Re-enable the refresh process once indexing is finished
        self.elastic.indices.put_settings(
            index=self.index,
            body=constants.REACTIVATE_REFRESH_BODY
        )
        segments = constants.FORCE_MERGE_SEGMENTS
        self.elastic.indices.forcemerge(index=self.index,
                                        max_num_segments=segments)

        logger.info(strings.INDEX_END)
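
The settings bodies and merge constant used above are not included in this snippet. The standard Elasticsearch mechanism for pausing and resuming refreshes is the index.refresh_interval setting, so the constants presumably look roughly like this (a sketch, not the project's actual values):

DEACTIVATE_REFRESH_BODY = {'index': {'refresh_interval': '-1'}}
REACTIVATE_REFRESH_BODY = {'index': {'refresh_interval': '1s'}}  # '1s' is the ES default
FORCE_MERGE_SEGMENTS = 5
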
pump.py (project: dragnet, author: excieve)
def pump_it(es, bulk_accumulator):
    rows_pumped = 0
    # TODO: make threads and chunks configurable
    for success, info in parallel_bulk(es, bulk_accumulator, thread_count=16, chunk_size=300):
        if success:
            rows_pumped += 1
        else:
            logger.warning('Pumping documents failed: {}'.format(info))

    return rows_pumped

elastic.py (project: browbeat, author: openstack)
def flush_cache(self):
        if len(self.cache) == 0:
            return True
        retry = 2
        for i in range(retry):
            try:
                to_upload = helpers.parallel_bulk(self.es,
                                                  self.cache_insertable_iterable())
                counter = 0
                num_items = len(self.cache)
                for item in to_upload:
                    self.logger.debug("{} of {} Elastic objects uploaded".format(num_items,
                                                                                 counter))
                    counter = counter + 1
                output = "Pushed {} items to Elasticsearch to index {}".format(num_items,
                                                                               self.index)
                output += " and browbeat UUID {}".format(str(browbeat_uuid))
                self.logger.info(output)
                self.cache = deque()
                self.last_upload = datetime.datetime.utcnow()
                return True
            except Exception as Err:
                self.logger.error(
                    "Error pushing data to Elasticsearch, going to retry"
                    " in 10 seconds")
                self.logger.error("Exception: {}".format(Err))
                time.sleep(10)
                if i == (retry - 1):
                    self.logger.error("Pushing Data to Elasticsearch failed in spite of retry,"
                                      " dumping JSON for {} cached items".format(len(self.cache)))
                    for item in self.cache:
                        filename = item['test_name'] + '-' + item['identifier']
                        filename += '-elastic' + '.' + 'json'
                        elastic_file = os.path.join(item['result_dir'],
                                                    filename)

                        with open(elastic_file, 'w') as result_file:
                            json.dump(item['result'],
                                      result_file,
                                      indent=4,
                                      sort_keys=True)

                            self.logger.info("Saved Elasticsearch consumable result JSON to {}".
                                             format(elastic_file))
                    self.cache = deque()
                    self.last_upload = datetime.datetime.utcnow()
                    return False

