Python examples of bulk()

es.py (project: mygene.info, author: biothings)
def update(self, id, extra_doc, index_type=None, bulk=False):
        '''update an existing doc with extra_doc.'''
        conn = self.conn
        index_name = self.ES_INDEX_NAME
        index_type = index_type or self.ES_INDEX_TYPE
        # old way, update locally and then push it back.
        # return self.conn.update(extra_doc, self.ES_INDEX_NAME,
        #                         index_type, id)

        if not bulk:
            body = {'doc': extra_doc}
            return conn.update(index_name, index_type, id, body)
        else:
            raise NotImplementedError
            '''
            # ES supports bulk update since v0.90.1.
            op_type = 'update'
            cmd = {op_type: {"_index": index_name,
                             "_type": index_type,
                             "_id": id}
                   }

            doc = json.dumps({"doc": extra_doc}, cls=conn.encoder)
            command = "%s\n%s" % (json.dumps(cmd, cls=conn.encoder), doc)
            conn.bulker.add(command)
            return conn.flush_bulk()
            '''
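The bulk branch above is left unimplemented; the commented-out block targets the old pyes-style bulker API. Purely as a hedged sketch (not the project's code; the client, index, type and id values are made up), the same partial update could be queued through elasticsearch.helpers.bulk like this:

# Sketch only: queue one partial update through helpers.bulk.
from elasticsearch import Elasticsearch, helpers

es = Elasticsearch()                      # assumes a local default cluster
action = {
    '_op_type': 'update',                 # bulk update op (supported since ES 0.90.1)
    '_index': 'genedoc',                  # hypothetical index name
    '_type': 'gene',                      # hypothetical doc type (pre-7.x clusters)
    '_id': '1017',                        # hypothetical document id
    'doc': {'summary': 'extra field'},    # plays the role of extra_doc
}
success, errors = helpers.bulk(es, [action])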
es.py (project: mygene.info, author: biothings)
def update_docs(self, partial_docs, **kwargs):
        index_name = self.ES_INDEX_NAME
        doc_type = self.ES_INDEX_TYPE

        def _get_bulk(doc):
            # build a bulk update action; the partial document goes under "doc"
            return {
                '_op_type': 'update',
                "_index": index_name,
                "_type": doc_type,
                "_id": doc['_id'],
                "doc": doc
            }
        actions = (_get_bulk(doc) for doc in partial_docs)
        return helpers.bulk(self.conn, actions, chunk_size=self.step, **kwargs)
populate.py (project: chatbot_ner, author: hellohaptik)
def create_all_dictionary_data(connection, index_name, doc_type, logger, entity_data_directory_path=None,
                               csv_file_paths=None, **kwargs):
    """
    Indexes all entity data from csv files stored at entity_data_directory_path, one file at a time
    Args:
        connection: Elasticsearch client object
        index_name: The name of the index
        doc_type: The type of the documents being indexed
        logger: logging object to log at debug and exception level
        entity_data_directory_path: Optional, Path of the directory containing the entity data csv files.
                                    Default is None
        csv_file_paths: Optional, list of file paths to csv files. Default is None
        kwargs:
            Refer http://elasticsearch-py.readthedocs.io/en/master/helpers.html#elasticsearch.helpers.bulk

    """
    logger.debug('%s: +++ Started: create_all_dictionary_data() +++' % log_prefix)
    if entity_data_directory_path:
        logger.debug('%s: \t== Fetching from variants/ ==' % log_prefix)
        csv_files = get_files_from_directory(entity_data_directory_path)
        for csv_file in csv_files:
            csv_file_path = os.path.join(entity_data_directory_path, csv_file)
            create_dictionary_data_from_file(connection=connection, index_name=index_name, doc_type=doc_type,
                                             csv_file_path=csv_file_path, update=False, logger=logger, **kwargs)
    if csv_file_paths:
        for csv_file_path in csv_file_paths:
            if csv_file_path and csv_file_path.endswith('.csv'):
                create_dictionary_data_from_file(connection=connection, index_name=index_name, doc_type=doc_type,
                                                 csv_file_path=csv_file_path, update=False, logger=logger, **kwargs)
    logger.debug('%s: +++ Finished: create_all_dictionary_data() +++' % log_prefix)
populate.py (project: chatbot_ner, author: hellohaptik)
def recreate_all_dictionary_data(connection, index_name, doc_type, logger, entity_data_directory_path=None,
                                 csv_file_paths=None, **kwargs):
    """
    Re-indexes all entity data from csv files stored at entity_data_directory_path, one file at a time
    Args:
        connection: Elasticsearch client object
        index_name: The name of the index
        doc_type: The type of the documents being indexed
        logger: logging object to log at debug and exception level
        entity_data_directory_path: Optional, Path of the directory containing the entity data csv files.
                                    Default is None
        csv_file_paths: Optional, list of file paths to csv files. Default is None
        kwargs:
            Refer http://elasticsearch-py.readthedocs.io/en/master/helpers.html#elasticsearch.helpers.bulk

    """
    logger.debug('%s: +++ Started: recreate_all_dictionary_data() +++' % log_prefix)
    if entity_data_directory_path:
        logger.debug('%s: \t== Fetching from variants/ ==' % log_prefix)
        csv_files = get_files_from_directory(entity_data_directory_path)
        for csv_file in csv_files:
            csv_file_path = os.path.join(entity_data_directory_path, csv_file)
            create_dictionary_data_from_file(connection=connection, index_name=index_name, doc_type=doc_type,
                                             csv_file_path=csv_file_path, update=True, logger=logger, **kwargs)
    if csv_file_paths:
        for csv_file_path in csv_file_paths:
            if csv_file_path and csv_file_path.endswith('.csv'):
                create_dictionary_data_from_file(connection=connection, index_name=index_name, doc_type=doc_type,
                                                 csv_file_path=csv_file_path, update=True, logger=logger, **kwargs)
    logger.debug('%s: +++ Finished: recreate_all_dictionary_data() +++' % log_prefix)
populate.py (project: chatbot_ner, author: hellohaptik)
def get_variants_dictionary_value_from_key(csv_file_path, dictionary_key, logger, **kwargs):
    """
    Reads the csv file at csv_file_path and creates a dictionary mapping each entity value to a list of its variants.
    The entity values are in the first column of the csv file and their corresponding variants are stored in the
    second column, delimited by '|'

    Args:
        csv_file_path: absolute file path of the csv file to populate entity data from
        dictionary_key: name of the entity to put the values under
        logger: logging object to log at debug and exception level
        kwargs:
            Refer http://elasticsearch-py.readthedocs.io/en/master/helpers.html#elasticsearch.helpers.bulk

    Returns:
        Dictionary mapping each entity value to a list of its variants.
    """
    dictionary_value = defaultdict(list)
    try:
        csv_reader = read_csv(csv_file_path)
        next(csv_reader)  # skip the header row
        for data_row in csv_reader:
            try:
                data = map(str.strip, data_row[1].split('|'))
                # remove empty strings
                data = [variant for variant in data if variant]
                dictionary_value[data_row[0].strip().replace('.', ' ')].extend(data)

            except Exception as e:
                logger.exception('%s: \t\t== Exception in dict creation for keyword: %s -- %s -- %s =='
                                 % (log_prefix, dictionary_key, data_row, e))

    except Exception as e:
        logger.exception(
            '%s: \t\t\t=== Exception in get_variants_dictionary_value_from_key() Dictionary Key: %s \n %s  ===' % (
                log_prefix,
                dictionary_key, e))

    return dictionary_value
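To illustrate the structure this function builds, here is a tiny self-contained sketch with made-up data (the row format matches the docstring: entity value in the first column, '|'-delimited variants in the second):

from collections import defaultdict

# Hypothetical CSV row, mirroring the per-row logic above
data_row = ["New Delhi", " new delhi | delhi |  | ndls "]

dictionary_value = defaultdict(list)
variants = [v for v in map(str.strip, data_row[1].split('|')) if v]
dictionary_value[data_row[0].strip().replace('.', ' ')].extend(variants)
# dictionary_value == {"New Delhi": ["new delhi", "delhi", "ndls"]}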
populate.py (project: chatbot_ner, author: hellohaptik)
def create_dictionary_data_from_file(connection, index_name, doc_type, csv_file_path, update, logger, **kwargs):
    """
    Indexes all entity data from the csv file at path csv_file_path
    Args:
        connection: Elasticsearch client object
        index_name: The name of the index
        doc_type:  The type of the documents being indexed
        csv_file_path: absolute file path of the csv file to populate entity data from
        update: boolean, True if this is an update type operation, False if it is a create/index type operation
        logger: logging object to log at debug and exception level
        kwargs:
            Refer http://elasticsearch-py.readthedocs.io/en/master/helpers.html#elasticsearch.helpers.bulk
    """

    base_file_name = os.path.basename(csv_file_path)
    dictionary_key = os.path.splitext(base_file_name)[0]

    if update:
        delete_entity_by_name(connection=connection, index_name=index_name, doc_type=doc_type,
                              entity_name=dictionary_key, logger=logger, **kwargs)
    dictionary_value = get_variants_dictionary_value_from_key(csv_file_path=csv_file_path,
                                                              dictionary_key=dictionary_key, logger=logger,
                                                              **kwargs)
    if dictionary_value:
        add_data_elastic_search(connection=connection, index_name=index_name, doc_type=doc_type,
                                dictionary_key=dictionary_key,
                                dictionary_value=remove_duplicate_data(dictionary_value), logger=logger, **kwargs)
    if os.path.exists(csv_file_path) and os.path.splitext(csv_file_path)[1] == '.csv':
        # NOTE: the return value is discarded, so this block is effectively a no-op
        os.path.basename(csv_file_path)
indexing_api.py (project: micromasters, author: mitodl)
def _index_chunk(chunk, doc_type, index):
    """
    Add/update a list of records in Elasticsearch

    Args:
        chunk (list):
            List of serialized items to index
        doc_type (str):
            The doc type for each item
        index (str): An Elasticsearch index

    Returns:
        int: Number of items inserted into Elasticsearch
    """

    conn = get_conn(verify_index=index)
    insert_count, errors = bulk(
        conn,
        chunk,
        index=index,
        doc_type=doc_type,
    )
    if len(errors) > 0:
        raise ReindexException("Error during bulk insert: {errors}".format(
            errors=errors
        ))

    refresh_index(index)
    return insert_count
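Note that the actions in chunk carry no per-item _index or _type here; helpers.bulk forwards extra keyword arguments such as index and doc_type to the underlying Elasticsearch.bulk() call, which applies them as defaults for the whole request. A minimal sketch of that call shape (client, index name and documents are assumptions, not from the project):

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

conn = Elasticsearch()                      # hypothetical client
chunk = [                                   # hypothetical serialized items
    {'id': 1, 'title': 'Course A'},
    {'id': 2, 'title': 'Course B'},
]
insert_count, errors = bulk(conn, chunk, index='some_index', doc_type='program')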
elasticsearch_cp.py (project: elasticsearch-copy, author: xcxsxvx)
def output_es(es, records, start_record_num, end_record_num, total_records, per_batch):
  print("Inserting records %d through %d of %s" % (start_record_num, end_record_num,
    (str(total_records) if total_records > 0 else '???')))
  num_success, error_list = helpers.bulk(es, records, chunk_size=1000)
  if num_success != per_batch:
    print("[ERROR] %d of %d inserts succeeded!" % (num_success,per_batch))
    print("[ERROR] Errors:")
    print(error_list)
handler_nypl.py (project: open-ledger, author: creativecommons)
def insert_image(chunk_size, max_results=5000, from_file=None):
    count = 0
    success_count = 0
    es = search.init()
    search.Image.init()
    mapping = search.Image._doc_type.mapping
    mapping.save(settings.ELASTICSEARCH_INDEX)

    for chunk in grouper_it(chunk_size, import_from_file(from_file)):
        if not from_file and count >= max_results:  # Load everything if loading from file
            break
        else:
            images = []
            for result in chunk:
                images.append(result)
            if len(images) > 0:
                try:
                    # Bulk update the search engine too
                    search_objs = [search.db_image_to_index(img).to_dict(include_meta=True) for img in images]
                    models.Image.objects.bulk_create(images)
                    helpers.bulk(es, search_objs)
                    log.debug("*** Committed set of %d images", len(images))
                    success_count += len(images)
                except IntegrityError as e:
                    log.warn("Got one or more integrity errors on batch: %s", e)
                finally:
                    count += len(images)
    return success_count
knowledgeGraphCreator.py (project: knowledge-graph, author: MixedEmotions)
def write2ES(AllData, indexName, typeName , elasticPort=9220, elasticHost="localhost"):
        es = Elasticsearch([{'host': elasticHost, 'port': elasticPort}], http_auth=(elasticUsername, elasticPassword))
        messages = []
        logging.debug('Running preset')
        for record in AllData:
            #   print(record)
                tmpMap = {"_op_type": "index", "_index": indexName, "_type": typeName }
                if len(record)>2:
                    tmpMap.update({"label":record[0].replace("http://dbpedia.org/resource/",""), "entity_linking":{"URI":record[0].replace("/resource/","/page/"),"connection":record[1] ,"target":record[2].replace("/resource/","/page/")}})
                else:
                    tmpMap.update({"label":record[0].replace("http://dbpedia.org/resource/",""), "entity_linking":{"URI":record[0].replace("/resource/","/page/"),"target":record[1]}})
                messages.append(tmpMap)
        #print messages
        result = bulk(es, messages)
        return result
knowledgeGraphCreator.py (project: knowledge-graph, author: MixedEmotions)
def writeDashboard2ES(name, indexName=".kibi", typeName="dashboard" , elasticPort=9220, elasticHost="localhost"):
        es = Elasticsearch([{'host': elasticHost, 'port': elasticPort}], http_auth=(elasticUsername, elasticPassword))
        messages = []
        PercentageEmotions = "PercentageEmotions"
        logging.debug('Writing dashboard')
        print('Writing dashboard')
        EmotionDistribution = "EmotionDistribution"
        cloudVisualization= name[:name.find("_")]+"Cloud"
        nameFull= name[:name.find("_")]

        tmpMap = {"_op_type": "index", "_index": indexName, "_type": typeName, "_id":name }
        tmpMap.update({"defaultIndex": "reviews", "kibi:relationalPanel": "true"})
        tmpMap.update({"savedSearchId": name})
        tmpMap.update({"sort": ["_score", "desc" ], "version": 1,"description": "", "hits": 0, "optionsJSON": "{\"darkTheme\":false}", "uiStateJSON": "{}", "timeRestore": "false"})
        tmpMap.update({"kibanaSavedObjectMeta": { "searchSourceJSON": "{\"filter\":[{\"query\":{\"query_string\":{\"query\":\"*\",\"analyze_wildcard\":true}}}]}"}})
        if name.find("tweets")>= 0:
            tmpMap.update({ "title":name, "panelsJSON": "[{\"id\":\""+name+"\",\"type\":\"search\",\"panelIndex\":3,\"size_x\":7,\"size_y\":6,\"col\":1,\"row\":1,\"columns\":[\"emotions.emotion\",\"detected_entities\",\"text\"],\"sort\":[\"entity_linking.URI\",\"desc\"]},{\"id\":\""+EmotionDistribution+"\",\"type\":\"visualization\",\"panelIndex\":4,\"size_x\":5,\"size_y\":4,\"col\":8,\"row\":3},{\"id\":\""+PercentageEmotions+"\",\"type\":\"visualization\",\"panelIndex\":5,\"size_x\":5,\"size_y\":2,\"col\":8,\"row\":1}]"})
        else:
        #   tmpMap.update({ "title":name,"panelsJSON": "[{\"id\":\""+name+"\",\"type\":\"search\",\"panelIndex\":1,\"size_x\":6,\"size_y\":6,\"col\":1,\"row\":1,\"columns\":[\"label\",\"entity_linking.target\"],\"sort\":[\"_score\",\"desc\"]},{\"id\":\"entityEmotion\",\"type\":\"visualization\",\"panelIndex\":3,\"size_x\":6,\"size_y\":4,\"col\":7,\"row\":6},{\"id\":\"Location\",\"type\":\"search\",\"panelIndex\":2,\"size_x\":6,\"size_y\":2,\"col\":1,\"row\":7,\"columns\":[\"label\",\"entity_linking.connection\",\"entity_linking.target\"],\"sort\":[\"connection\",\"asc\"]},{\"id\":\"PercentageEmotions\",\"type\":\"visualization\",\"panelIndex\":4,\"size_x\":6,\"size_y\":5,\"col\":7,\"row\":1}]"})
            tmpMap.update({ "title":name,"panelsJSON": "[{\"col\":1,\"columns\":[\"label\",\"entity_linking.target\"],\"id\":\""+name+"\",\"panelIndex\":1,\"row\":1,\"size_x\":6,\"size_y\":2,\"sort\":[\"_score\",\"desc\"],\"type\":\"search\"},{\"col\":7,\"id\":\""+PercentageEmotions+"\",\"panelIndex\":4,\"row\":1,\"size_x\":6,\"size_y\":3,\"type\":\"visualization\"},{\"col\":1,\"columns\":[\"label\",\"entity_linking.connection\",\"entity_linking.target\"],\"id\":\""+nameFull+"\",\"panelIndex\":2,\"row\":3,\"size_x\":6,\"size_y\":2,\"sort\":[\"connection\",\"asc\"],\"type\":\"search\"},{\"id\":\""+cloudVisualization+"\",\"type\":\"visualization\",\"panelIndex\":5,\"size_x\":3,\"size_y\":2,\"col\":1,\"row\":5},{\"id\":\""+EmotionDistribution+"\",\"type\":\"visualization\",\"panelIndex\":6,\"size_x\":6,\"size_y\":3,\"col\":7,\"row\":4}]"})
             #"[{\"id\":\""+name+"\",\"type\":\"search\",\"panelIndex\":1,\"size_x\":12,\"size_y\":6,\"col\":1,\"row\":1,\"columns\":[\"label\", \"entity_linking.connection\", \"entity_linking.target\"],\"sort\":[\"_score\",\"desc\"]}]"})
        messages.append(tmpMap)
        print (messages)
        result = bulk(es, messages)
        return result
commits.py (project: repoxplorer, author: morucci)
def add_commits(self, source_it):
        def gen(it):
            for source in it:
                d = {}
                d['_index'] = self.index
                d['_type'] = self.dbname
                d['_op_type'] = 'create'
                d['_id'] = source['sha']
                d['_source'] = source
                yield d
        bulk(self.es, gen(source_it))
        self.es.indices.refresh(index=self.index)
commits.py (project: repoxplorer, author: morucci)
def del_commits(self, sha_list):
        def gen(it):
            for sha in it:
                d = {}
                d['_index'] = self.index
                d['_type'] = self.dbname
                d['_op_type'] = 'delete'
                d['_id'] = sha
                yield d
        bulk(self.es, gen(sha_list))
        self.es.indices.refresh(index=self.index)
index.py (project: handelsregister, author: Amsterdam)
def execute(self):
        """
        Index data of specified queryset
        """
        client = elasticsearch.Elasticsearch(
            hosts=settings.ELASTIC_SEARCH_HOSTS,
            # sniff_on_start=True,
            retry_on_timeout=True,
            refresh=True  # note: not an Elasticsearch() client option; effectively ignored here
        )

        start_time = time.time()
        duration = time.time()
        loop_time = elapsed = duration - start_time

        for batch_i, total_batches, start, end, total, qs in self.batch_qs():

            loop_start = time.time()
            total_left = ((total_batches - batch_i) * loop_time)

            progress_msg = \
                '%s of %s : %8s %8s %8s duration: %.2f left: %.2f' % (
                    batch_i, total_batches, start, end, total, elapsed,
                    total_left
                )

            log.debug(progress_msg)

            helpers.bulk(
                client, (self.convert(obj).to_dict(include_meta=True)
                         for obj in qs),
                raise_on_error=True,
                refresh=True
            )

            now = time.time()
            elapsed = now - start_time
            loop_time = now - loop_start
import_data.py (project: find-that-charity, author: TechforgoodCAST)
def save_to_elasticsearch(chars, es, es_index):

    print('\r', "[elasticsearch] %s charities to save" % len(chars))
    print('\r', "[elasticsearch] saving %s charities to %s index" % (len(chars), es_index))
    results = bulk(es, list(chars.values()))
    print('\r', "[elasticsearch] saved %s charities to %s index" % (results[0], es_index))
    print('\r', "[elasticsearch] %s errors reported" % len(results[1]))
__init__.py (project: elasticsearch_loader, author: moshe)
def load(lines, config):
    bulks = grouper(lines, config['bulk_size'] * 3)
    if config['progress']:
        bulks = [x for x in bulks]
    with click.progressbar(bulks) as pbar:
        for i, bulk in enumerate(pbar):
            try:
                single_bulk_to_es(bulk, config, config['with_retry'])
            except Exception as e:
                log('warn', 'Chunk {i} got exception ({e}) while processing'.format(e=e, i=i))
cfme_csv2elastic.py (project: cfme-performance, author: redhat-performance)
def gen_action(self, **kwargs):
        """
        single ES doc that gets consumed inside bulk uploads
        """
        action = {
            "_op_type": _op_type,
            "_index": kwargs['index_name'],
            "_type": kwargs['doc_type'],
            "_id": kwargs['uid'],
            # "@timestamp": kwargs['timestamp'],
            "_source": kwargs['data']
        }
        return action
generator.py (project: ml-anomaly-injector, author: plinde)
def buildEvent(timestamp = None):

    event = {}

    #if we don't have a desired timestamp passed in for the event, use the current time in UTC
    if timestamp is None:
        event['timestamp'] = datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')
    else:
        event['timestamp'] = timestamp.strftime('%Y-%m-%dT%H:%M:%SZ')

    #TODO make some of these random inputs from seed lists

    #add these 2 for bulk API goodness
    event['_index'] = 'smoke_event'
    event['_type'] = 'smoke_event'

    event['request'] = '/index.html'
    event['response'] = '200'
    event['agent'] = 'Firefox'
    event['remote_ip'] = '1.1.1.1'
    event['remote_user'] = ''
    event['bytes'] = '1234'
    event['referrer'] = 'http://example.com'

    json_event = json.dumps(event)

    return json_event
generator.py (project: ml-anomaly-injector, author: plinde)
def main():

    bulkSize = 10000 # elasticsearch bulk size
    daysBack = 7

    anomalyPeriod = 30 # period for anomaly to last, in minutes
    anomalyMagnification = 10 # e.g. 10x more than the normal

    buildEventSeries(daysBack, bulkSize)
    buildAnomalyEventSeries(daysBack, anomalyPeriod, anomalyMagnification, bulkSize)
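buildEventSeries() and buildAnomalyEventSeries() are not included in this listing. Purely as a hedged sketch under that assumption, events produced by buildEvent() above could be flushed to Elasticsearch in batches with helpers.bulk along these lines (the client setup and the flush helper are made up, not the project's code):

import json
from elasticsearch import Elasticsearch, helpers

def flush_events(es, json_events):
    """Hypothetical helper: decode the JSON strings returned by buildEvent()
    back into dicts and send them as one bulk request; _index and _type are
    already embedded in each event."""
    actions = [json.loads(evt) for evt in json_events]
    return helpers.bulk(es, actions)

# usage sketch
es = Elasticsearch()                              # assumes a local default cluster
batch = [buildEvent() for _ in range(100)]        # a small batch for brevity
flush_events(es, batch)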

