Python bulk() example source code

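All of the snippets below are built around helpers.bulk() from the official elasticsearch Python client: you hand it a client instance plus an iterable of action dicts, each carrying _index, _type, and optionally _op_type, _id, and _source. As a minimal sketch, assuming a local node on localhost:9200 and a made-up index name:

from elasticsearch import Elasticsearch, helpers

es = Elasticsearch(["localhost:9200"])  # assumed local node

# One action dict per document; '_op_type' defaults to 'index'.
actions = (
    {"_index": "my-index", "_type": "doc", "_id": i, "_source": {"value": i}}
    for i in range(100)
)

# By default returns (number of successes, list of per-doc errors).
success, errors = helpers.bulk(es, actions, chunk_size=50)
print("indexed %d docs, %d errors" % (success, len(errors)))
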
es.py (project: mygene.info, author: biothings)
def index_bulk(self, docs, step=None):
        index_name = self.ES_INDEX_NAME
        doc_type = self.ES_INDEX_TYPE
        step = step or self.step

        def _get_bulk(doc):
            doc.update({
                "_index": index_name,
                "_type": doc_type,
            })
            return doc
        actions = (_get_bulk(doc) for doc in docs)
        try:
            return helpers.bulk(self.conn, actions, chunk_size=step)
        except helpers.BulkIndexError:
            # Retry once. Note this only works if docs is a list;
            # a generator would already be partially consumed here.
            print("Bulk error, trying again...")
            return self.index_bulk(docs, step)
        except Exception as e:
            # Dump the exception for post-mortem inspection.
            print("Bulk error, dumping exception to file 'err'...")
            import pickle
            pickle.dump(e, open("err", "wb"))
es.py (project: mygene.info, author: biothings)
def delete_docs(self, ids, step=None):
        index_name = self.ES_INDEX_NAME
        doc_type = self.ES_INDEX_TYPE
        step = step or self.step

        def _get_bulk(_id):
            doc = {
                '_op_type': 'delete',
                "_index": index_name,
                "_type": doc_type,
                "_id": _id
            }
            return doc
        actions = (_get_bulk(_id) for _id in ids)
        return helpers.bulk(self.conn, actions, chunk_size=step,
                            stats_only=True, raise_on_error=False)
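Note the two return shapes used above: by default helpers.bulk() returns a tuple of (success count, list of errors), while with stats_only=True, as in delete_docs(), the second element is just the number of failures. A small self-contained sketch (client, index, and document values are illustrative):

from elasticsearch import Elasticsearch, helpers

es = Elasticsearch()  # assumed default localhost node
actions = [{"_index": "test", "_type": "doc", "_source": {"n": i}} for i in range(3)]

# Default: the second element is a list of per-document error dicts.
ok, errors = helpers.bulk(es, actions)

# stats_only=True: the second element is just the failure count.
ok, failed = helpers.bulk(es, actions, stats_only=True, raise_on_error=False)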
update_tmserver.py (project: zing, author: evernote)
def handle(self, **options):
        self._initialize(**options)

        if (options['rebuild'] and
            not options['dry_run'] and
            self.es.indices.exists(self.INDEX_NAME)):

            self.es.indices.delete(index=self.INDEX_NAME)

        if (not options['dry_run'] and
            not self.es.indices.exists(self.INDEX_NAME)):

            self.es.indices.create(index=self.INDEX_NAME)

        if self.is_local_tm:
            self._set_latest_indexed_revision(**options)

        helpers.bulk(self.es, self._parse_translations(**options))
areas_code.py (project: ceres, author: dicortazar)
def upload_data(events_df, es_write_index, es_write):
    # Uploading info to the new ES
    rows = events_df.to_dict("index")
    docs = []
    for row in rows.values():
        item_id = row[Events.PERCEVAL_UUID] + "_" + row[Git.FILE_PATH] + \
            "_" + row[Git.FILE_EVENT]
        header = {
            "_index": es_write_index,
            "_type": "item",
            "_id": item_id,
            "_source": row
        }
        docs.append(header)
    helpers.bulk(es_write, docs)
    logging.info("Written: " + str(len(docs)))
areas_code.py (project: ceres, author: dicortazar)
def upload_data(events_df, es_write_index, es_write, uniq_id):
    # Uploading info to the new ES
    test = events_df.to_dict("index")
    docs = []
    for i in test.keys():
        header = {
               "_index": es_write_index,
               "_type": "item",
               "_id": int(uniq_id),
               "_source": test[i]
        }
        docs.append(header)
        uniq_id = uniq_id + 1
    print(len(docs))
    helpers.bulk(es_write, docs)

    return uniq_id
openstack_gender.py (project: ceres, author: dicortazar)
def upload_data(events_df, es_write_index, es_write, uniq_id):
    # Uploading info to the new ES
    test = events_df.to_dict("index")
    docs = []
    for i in test.keys():
        header = {
               "_index": es_write_index,
               "_type": "item",
               "_id": int(uniq_id),
               "_source": test[i]
        }
        docs.append(header)
        uniq_id = uniq_id + 1
    print(len(docs))
    helpers.bulk(es_write, docs)

    return uniq_id
bulkOp.py (project: Hippocampe, author: CERT-BDF)
def update(typeNameES, listId):
    logger.info('bulkOp.update launched')
    hippoCfg = getHippoConf()
    es = getES()
    now = strftime("%Y%m%dT%H%M%S%z")
    indexNameES = hippoCfg.get('elasticsearch', 'indexNameES')
    # k is a generator expression producing one update action
    # for every doc whose id is in listId
    k = ({'_op_type': 'update', '_index': indexNameES, '_type': typeNameES, 'doc': {'lastQuery': now}, '_id': id}
        for id in listId)

    res = helpers.bulk(es, k)
    logger.info('bulkOp.update res: %s', res)
    # res looks like (2650, [])
    logger.info('bulkOp.update end')
    return res[0]
bulkOp.py (project: Hippocampe, author: CERT-BDF)
def index(cfgPath, listData):
    logger.info('bulkOp.index launched')
    hippoCfg = getHippoConf()
    indexNameES = hippoCfg.get('elasticsearch', 'indexNameES')

    cfg = getConf(cfgPath)
    typeNameES = cfg.get('elasticsearch', 'typeIntel')

    # create the index only if it does not exist
    index = IndexIntel(cfgPath)
    index.createIndexIntel()

    es = getES()
    k = ({'_op_type': 'index', '_index': indexNameES, '_type': typeNameES, '_source': data}
        for data in listData)
    res = helpers.bulk(es, k, raise_on_error=False)
    logger.info('bulkOp.index res: %s', res)
    logger.info('bulkOp.index end')
    return res
bulkOp.py (project: Hippocampe, author: CERT-BDF)
def indexNew(coreIntelligence, listData):
    logger.info('bulkOp.indexNew launched')

    hippoCfg = getHippoConf()
    indexNameES = hippoCfg.get('elasticsearch', 'indexNameES')
    typeNameES = hippoCfg.get('elasticsearch', 'typeNameESNew')

    indexNew = IndexNew()
    indexNew.createIndexNew()

    es = getES()
    k = ({'_op_type': 'index', '_index': indexNameES, '_type': typeNameES, '_source': {'type': coreIntelligence, 'toSearch': data[coreIntelligence]}}
        for data in listData)
    # next(k) gives, for example:
    # {'_op_type': 'index', '_index': 'hippocampe', '_type': 'new', '_source': {'type': 'ip', 'toSearch': '1.1.1.1'}}
    res = helpers.bulk(es, k)
    logger.info('bulkOp.indexNew res: %s', res)
    logger.info('bulkOp.indexNew end')
    return res[0]
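Taken together, the three bulkOp.py helpers above (plus delete_docs() earlier) show how the _op_type key selects the bulk operation: 'index' actions carry the whole document in _source, 'update' actions carry a partial document under 'doc', and 'delete' actions need only the _id. A compact comparison, with placeholder index/type/id values:

index_action = {
    "_op_type": "index",  # the default when _op_type is omitted
    "_index": "hippocampe", "_type": "intel",
    "_source": {"type": "ip", "toSearch": "1.1.1.1"},  # whole document
}

update_action = {
    "_op_type": "update",
    "_index": "hippocampe", "_type": "intel", "_id": "some-doc-id",
    "doc": {"lastQuery": "20170101T000000+0000"},  # partial document
}

delete_action = {
    "_op_type": "delete",
    "_index": "hippocampe", "_type": "intel", "_id": "some-doc-id",
}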
ElasticBurp.py (project: WASE, author: thomaspatzke)
def genAddToES(self, msgs, component):
        def menuAddToES(e):
            progress = ProgressMonitor(component, "Feeding ElasticSearch", "", 0, len(msgs))
            i = 0
            docs = list()
            for msg in msgs:
                if not Burp_onlyResponses or msg.getResponse():
                    docs.append(self.genESDoc(msg, timeStampFromResponse=True).to_dict(True))
                i += 1
                progress.setProgress(i)
            success, failed = bulk(self.es, docs, stats_only=True, raise_on_error=False)
            progress.close()
            JOptionPane.showMessageDialog(self.panel, "<html><p style='width: 300px'>Successfully imported %d messages, %d messages failed.</p></html>" % (success, failed), "Finished", JOptionPane.INFORMATION_MESSAGE)
        return menuAddToES

    ### Interface to ElasticSearch ###
commits.py (project: repoxplorer, author: morucci)
def update_commits(self, source_it, field='repos'):
        """ Take the sha from each doc and use
        it to reference the doc to update. This method only
        support updating a single field for now. The default one
        is repos because that's the only one to make sense in
        this context.
        """
        def gen(it):
            for source in it:
                d = {}
                d['_index'] = self.index
                d['_type'] = self.dbname
                d['_op_type'] = 'update'
                d['_id'] = source['sha']
                d['_source'] = {'doc': {field: source[field]}}
                yield d
        bulk(self.es, gen(source_it))
        self.es.indices.refresh(index=self.index)
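A call site for update_commits() would feed it dicts keyed by sha, assuming indexer is an instance of the class above (values are illustrative):

indexer.update_commits(iter([
    {"sha": "abc123", "repos": ["repo-a", "repo-b"]},
    {"sha": "def456", "repos": ["repo-a"]},
]))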
esload.py (project: elasticsearch-bench, author: anuragkh)
def load_data(input_file, index, doc_type, seed):
  doc_no = seed
  successful = 0
  docs = []
  with open(input_file) as ifp:
    for line in ifp:
      doc_id = str(doc_no)
      doc = csv2json(index, doc_type, doc_id, line.rstrip())
      docs.append(doc)
      doc_no += 1
      if len(docs) == batch_size:
        docs_iter = iter(docs)
        (added, tmp) = helpers.bulk(es, docs_iter)
        successful += added
        docs = []
      if doc_no % 100000 == 0:
        print('success: %d failed: %d' % (successful, doc_no - successful - seed))

  if len(docs) > 0:
    docs_iter = iter(docs)
    (added, tmp) = helpers.bulk(es, docs_iter)
    successful += added

  print('Finished! Inserted: %d Failed: %d' % (successful, doc_no - successful - seed))
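The manual batching above (flush every batch_size docs) can also be delegated to the helpers: helpers.bulk() already splits its action iterable internally via chunk_size, and helpers.streaming_bulk() yields one (ok, result) tuple per document as it goes. A hedged sketch of the streaming variant, assuming the same es client and csv2json() helper as above:

from elasticsearch import helpers

def load_data_streaming(input_file, index, doc_type, seed):
  successful = 0
  def actions():
    with open(input_file) as ifp:
      for doc_no, line in enumerate(ifp, start=seed):
        yield csv2json(index, doc_type, str(doc_no), line.rstrip())
  # streaming_bulk yields an (ok, result) tuple per document.
  for ok, _ in helpers.streaming_bulk(es, actions(), chunk_size=1000):
    successful += 1 if ok else 0
  return successful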
__init__.py (project: elasticsearch_loader, author: moshe)
def single_bulk_to_es(bulk, config, attempt_retry):
    bulk = bulk_builder(bulk, config)

    max_attempt = 1
    if attempt_retry:
        max_attempt += 3

    for attempt in range(1, max_attempt+1):
        try:
            helpers.bulk(config['es_conn'], bulk, chunk_size=config['bulk_size'])
        except Exception as e:
            if attempt < max_attempt:
                wait_seconds = attempt*3
                log('warn', 'attempt [%s/%s] got exception, will retry after %s seconds' % (attempt,max_attempt,wait_seconds) )
                time.sleep(wait_seconds)
                continue

            log('error', 'attempt [%s/%s] got exception, it is a permanent data loss, no retry any more' % (attempt,max_attempt) )
            raise e

        if attempt > 1:
            log('info', 'attempt [%s/%s] succeed. we just get recovered from previous error' % (attempt,max_attempt) )

        # completed successfully
        break
elastic.py (project: sigir2017-table, author: iai-group)
def add_docs_bulk(self, docs):
        """Adds a set of documents to the index in a bulk.

        :param docs: dictionary {doc_id: doc}
        """
        actions = []
        for doc_id, doc in docs.items():
            action = {
                "_index": self.__index_name,
                "_type": self.DOC_TYPE,
                "_id": doc_id,
                "_source": doc
            }
            actions.append(action)

        if len(actions) > 0:
            helpers.bulk(self.__es, actions)
elastic.py (project: parade, author: bailaohe)
def store(self, df, table, **kwargs):
        if isinstance(df, pd.DataFrame):
            es = self.open()

            records = df.to_dict(orient='records')

            if df.index.name:
                actions = [{
                    "_index": self.datasource.db,
                    "_type": table,
                    "_id": record[df.index.name],
                    "_source": record
                } for record in records]
            else:
                actions = [{
                    "_index": self.datasource.db,
                    "_type": table,
                    "_source": record
                } for record in records]

            if len(actions) > 0:
                helpers.bulk(es, actions)
elastichandler.py (project: EventMonkey, author: devgc)
def BulkIndexRecords(self,records):
        '''
        Bulk Index Records
        IN
            self: EsHandler
            records: a list of records to bulk index
        '''
        ELASTIC_LOGGER.debug('[starting] Indexing Bulk Records')
        success_count, failed_items = es_bulk(
            self.esh,
            records,
            chunk_size=10000,
            raise_on_error=False
        )

        if len(failed_items) > 0:
            ELASTIC_LOGGER.error('[PID {}] {} index errors'.format(
                os.getpid(),len(failed_items)
            ))
            for failed_item in failed_items:
                ELASTIC_LOGGER.error(unicode(failed_item))

        ELASTIC_LOGGER.debug('[finished] Indexing Bulk Records')
elastic2_doc_manager.py (project: elastic2-doc-manager, author: mongodb-labs)
def send_buffered_operations(self):
        """Send buffered operations to Elasticsearch.

        This method is periodically called by the AutoCommitThread.
        """
        with self.lock:
            try:
                action_buffer = self.BulkBuffer.get_buffer()
                if action_buffer:
                    successes, errors = bulk(self.elastic, action_buffer)
                    LOG.debug("Bulk request finished, successfully sent %d "
                              "operations", successes)
                    if errors:
                        LOG.error(
                            "Bulk request finished with errors: %r", errors)
            except es_exceptions.ElasticsearchException:
                LOG.exception("Bulk request failed with exception")
elastic2_doc_manager.py (project: elastic2-doc-manager, author: mongodb-labs)
def __init__(self, docman):

        # Parent object
        self.docman = docman

        # Action buffer for bulk indexing
        self.action_buffer = []

        # Docs to update
        # List of documents whose source must first be retrieved
        # from Elasticsearch before apply_update can be performed
        # Format: [ (doc, update_spec, action_buffer_index, get_from_ES) ]
        self.doc_to_update = []

        # The dictionary below holds the ids of documents that still
        # need to be retrieved from Elasticsearch; it prevents fetching
        # the same document from ES multiple times
        # Format: {"_index": {"_type": {"_id": True}}}
        self.doc_to_get = {}

        # Dictionary of sources
        # Format: {"_index": {"_type": {"_id": {"_source": actual_source}}}}
        self.sources = {}
search.py (project: cnschema, author: cnschema)
def load_data(self):
        """
        https://www.elastic.co/guide/en/elasticsearch/reference/5.5/search-suggesters-completion.html
        """
        es = self.connect()

        items = self._load_item_data()
        helpers.bulk(es, items)

        # id_field = "id"
        # es_index = self.es_config["es_index"]
        # es_type = self.es_config["es_type"]
        # for item in items:
        #     item = item['_source']
        #     logging.info(json.dumps(item, ensure_ascii=False, indent=4))
        #
        #     ret = es.index(index=es_index, doc_type=es_type, id=item[id_field], body=item)
        #     logging.info(ret)
documents.py (project: django-elasticsearch-dsl, author: sabricot)
def update(self, thing, refresh=None, action='index', **kwargs):
        """
        Update each document in ES for a model, iterable of models or queryset
        """
        if refresh is True or (
            refresh is None and self._doc_type.auto_refresh
        ):
            kwargs['refresh'] = True

        if isinstance(thing, models.Model):
            object_list = [thing]
        else:
            object_list = thing

        return self.bulk(
            self._get_actions(object_list, action), **kwargs
        )
helpers.py (project: elasticsearch-fabric, author: KunihikoKido)
def bulk(chunk_size=100, filepath=None, **kwargs):
    if not sys.stdin.isatty():
        infile = sys.stdin
    elif filepath is not None:
        infile = open(filepath, "r")
    else:
        abort(bulk.__doc__)

    es = get_client(env.elasticsearch_alias)
    actions = []
    for action in infile.readlines():
        actions.append(json.loads(action))

    success, errors = helpers.bulk(es, actions, ignore=IGNORE, **kwargs)
    res = {
        "success": success, "errors": errors,
        "bulk": {
            "host": es.transport.get_connection().host
        }
    }
    infile.close()

    jsonprint(res)
    return res
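The task above expects newline-delimited JSON on stdin (or in filepath), one action object per line. An input file for it might look like this (index and document values are made up):

{"_index": "logs", "_type": "doc", "_source": {"msg": "hello"}}
{"_index": "logs", "_type": "doc", "_source": {"msg": "world"}}
{"_op_type": "delete", "_index": "logs", "_type": "doc", "_id": "42"}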
elastic_doc_manager.py (project: elastic-doc-manager, author: mongodb-labs)
def send_buffered_operations(self):
        """Send buffered operations to Elasticsearch.

        This method is periodically called by the AutoCommitThread.
        """
        with self.lock:
            try:
                action_buffer = self.BulkBuffer.get_buffer()
                if action_buffer:
                    successes, errors = bulk(self.elastic, action_buffer)
                    LOG.debug("Bulk request finished, successfully sent %d "
                              "operations", successes)
                    if errors:
                        LOG.error(
                            "Bulk request finished with errors: %r", errors)
            except es_exceptions.ElasticsearchException:
                LOG.exception("Bulk request failed with exception")
elastic_doc_manager.py (project: elastic-doc-manager, author: mongodb-labs)
def __init__(self, docman):

        # Parent object
        self.docman = docman

        # Action buffer for bulk indexing
        self.action_buffer = []

        # Docs to update
        # List of documents whose source must first be retrieved
        # from Elasticsearch before apply_update can be performed
        # Format: [ (doc, update_spec, action_buffer_index, get_from_ES) ]
        self.doc_to_update = []

        # The dictionary below holds the ids of documents that still
        # need to be retrieved from Elasticsearch; it prevents fetching
        # the same document from ES multiple times
        # Format: {"_index": {"_type": {"_id": True}}}
        self.doc_to_get = {}

        # Dictionary of sources
        # Format: {"_index": {"_type": {"_id": {"_source": actual_source}}}}
        self.sources = {}
lib.py (project: fccforensics, author: RagtagOpen)
def bulk_update(es, actions, batch_size=250):
    indexed = 0
    for i in range(0, len(actions), batch_size):
        resp = bulk(es, actions[i:(i+batch_size)])
        indexed += resp[0]
        print('\tindexed %s / %s' % (indexed, len(actions)))
    return indexed
analyze.py (project: fccforensics, author: RagtagOpen)
def index_worker(self, queue, size=200):
        actions = []
        indexed = 0
        while True:
            item = queue.get()
            if item is None:
                break
            id_submission, analysis = item

            doc = {
                '_index': 'fcc-comments',
                '_type': 'document',
                '_op_type': 'update',
                '_id': id_submission,
                'doc': {'analysis': analysis},
            }
            actions.append(doc)

            if len(actions) == size:
                with warnings.catch_warnings():
                    warnings.simplefilter('ignore')
                    try:
                        response = bulk(self.es, actions)
                        indexed += response[0]
                        print('\tanalyzed %s/%s\t%s%%' % (indexed, self.limit,
                            int(indexed / self.limit * 100)))
                        actions = []
                    except ConnectionTimeout:
                        print('error indexing: connection timeout')

        with warnings.catch_warnings():
            warnings.simplefilter('ignore')
            response = bulk(self.es, actions)
            indexed += response[0]
            print('indexed %s' % (indexed))
sentiment.py (project: fccforensics, author: RagtagOpen)
def bulk_index(self, queue, size=20):

        actions = []
        indexed = 0
        ids = set()
        while True:
            item = queue.get()
            if item is None:
                break
            doc_id = item

            doc = {
                '_index': 'fcc-comments',
                '_type': 'document',
                '_op_type': 'update',
                '_id': doc_id,
                'doc': {'analysis.sentiment_sig_terms_ordered': True},
            }
            actions.append(doc)
            ids.add(doc_id)

            if len(actions) == size:
                with warnings.catch_warnings():
                    warnings.simplefilter('ignore')
                    try:
                        response = bulk(self.es, actions)
                        indexed += response[0]
                        if not indexed % 200:
                            print('\tindexed %s/%s\t%s%%' % (indexed, self.limit,
                                int(indexed / self.limit * 100)))
                        actions = []
                    except ConnectionTimeout:
                        print('error indexing: connection timeout')

        with warnings.catch_warnings():
            warnings.simplefilter('ignore')
            response = bulk(self.es, actions)
            indexed += response[0]
            print('indexed %s' % (indexed))
        ids = list(ids)
        # print('%s\n%s' % (len(ids), ' '.join(ids)))
elastic.py (project: defplorex, author: trendmicro)
def bulk_index_from_it(
            self, index, it, transform=lambda x: x, last_updated=True):

        gc.collect()
        err_ids = []

        def _it():
            for doc_body in it:
                try:
                    log.debug('Working on record: %s', doc_body)
                    _id = doc_body.get(self.id_field)

                    try:
                        doc_body = transform(doc_body)
                    except Exception as e:
                        log.warn(
                                'Error while transforming doc ID = %s: %s',
                                _id, e)
                        raise e

                    if doc_body:
                        if last_updated:
                            doc_body['last_updated'] = datetime.now()

                        op = self.partial_index_op(
                                doc_id=_id,
                                index=index,
                                doc_body=doc_body,
                                doc_type=self.doc_type)
                        yield op
                except Exception as e:
                    log.warn('Cannot process doc ID = %s: %s', _id, e)
                    err_ids.append(_id)

        try:
            self.bulk(_it())
            log.info('Invoked self.bulk(_it())')
        except Exception as e:
            log.warn('Error in bulk index because: %s', e)

        return err_ids
elastic.py (project: defplorex, author: trendmicro)
def bulk(self, it):
        try:
            log.info('Sending bulk request on iterable/generator')
            args = dict(client=self.client,
                        actions=it,
                        chunk_size=self.bulk_size,
                        raise_on_exception=False,
                        raise_on_error=False,
                        stats_only=False,
                        request_timeout=self.timeout)

            res_succ, res_err = helpers.bulk(**args)

            log.info(
                    'Sent bulk request on queue iterator: '
                    'successful ops = %d, failed ops = %d',
                    res_succ, len(res_err))

            for res in res_err:
                log.warn('Error response: %s', res)
        except Exception as e:
            log.error('Error in storing: %s', e, exc_info=True)
logIoc.py (project: log-ioc, author: willylong275)
def bulk_index(self, index_name, dict_list):
        res = helpers.bulk(self.es, dict_list)
        print("response: '%s'" % str(res))
        return
index_wiki_dump.py (project: samnorsk, author: gisleyt)
def main():
    parser = ArgumentParser()
    parser.add_argument('-d', '--dump-file')
    parser.add_argument('-e', '--elasticsearch-host', default='localhost:9200')
    parser.add_argument('-i', '--index', default='wikipedia')
    parser.add_argument('-l', '--limit', default=0, type=int)
    parser.add_argument('-p', '--id-prefix')
    opts = parser.parse_args()

    dump_fn = opts.dump_file
    es_host = opts.elasticsearch_host
    es_index = opts.index
    limit = opts.limit if opts.limit > 0 else None
    prefix = opts.id_prefix

    if not dump_fn:
        logging.error('missing filenames ...')
        sys.exit(1)

    gen = articles(dump_fn, limit=limit)

    es = Elasticsearch(hosts=[es_host])
    ic = IndicesClient(es)

    if not ic.exists(es_index):
        ic.create(es_index)

    while True:
        chunk = islice(gen, 0, 1000)

        actions = [{'_index': es_index,
                    '_type': 'article',
                    '_id': article['id'] if not prefix else '%s-%s' % (prefix, article['id']),
                    '_source': article}
                   for article in chunk]

        if not actions:
            break

        helpers.bulk(es, actions)
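
The islice(gen, 0, 1000) call above is a handy chunking pattern: each pass through the loop takes the next thousand articles off the shared generator, and the loop stops once a pass yields nothing. The same idea in isolation, assuming any generator of action dicts:

from itertools import islice

from elasticsearch import helpers

def chunked_bulk(es, action_gen, chunk=1000):
    # Repeatedly slice the next `chunk` actions off the generator
    # until it is exhausted.
    while True:
        batch = list(islice(action_gen, chunk))
        if not batch:
            break
        helpers.bulk(es, batch)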

