def index_bulk(self, docs, step=None):
    index_name = self.ES_INDEX_NAME
    doc_type = self.ES_INDEX_TYPE
    step = step or self.step

    def _get_bulk(doc):
        doc.update({
            "_index": index_name,
            "_type": doc_type,
        })
        return doc

    actions = (_get_bulk(doc) for doc in docs)
    try:
        return helpers.bulk(self.conn, actions, chunk_size=step)
    except helpers.BulkIndexError as e:
        # try again...
        print("Bulk error, try again...")
        return self.index_bulk(docs, step)
        ##return helpers.bulk(self.conn, actions, chunk_size=step)
    except Exception as e:
        print("Err...")
        import pickle
        pickle.dump(e, open("err", "wb"))
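A note on the retry above: if docs is a generator, it is already exhausted by the time index_bulk calls itself, so the second attempt sends nothing. A minimal sketch of a safer variant, assuming the same helpers import and connection object, materialises the actions first so they can be replayed:

# Sketch only, not the original project's code: build the action list up front
# so a retry after BulkIndexError re-sends the same payload.
actions = [dict(doc, _index=index_name, _type=doc_type) for doc in docs]
try:
    helpers.bulk(self.conn, actions, chunk_size=step)
except helpers.BulkIndexError:
    helpers.bulk(self.conn, actions, chunk_size=step)  # a list can safely be replayed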
def delete_docs(self, ids, step=None):
    index_name = self.ES_INDEX_NAME
    doc_type = self.ES_INDEX_TYPE
    step = step or self.step

    def _get_bulk(_id):
        doc = {
            '_op_type': 'delete',
            "_index": index_name,
            "_type": doc_type,
            "_id": _id
        }
        return doc

    actions = (_get_bulk(_id) for _id in ids)
    return helpers.bulk(self.conn, actions, chunk_size=step,
                        stats_only=True, raise_on_error=False)
def handle(self, **options):
    self._initialize(**options)

    if (options['rebuild'] and
            not options['dry_run'] and
            self.es.indices.exists(self.INDEX_NAME)):
        self.es.indices.delete(index=self.INDEX_NAME)

    if (not options['dry_run'] and
            not self.es.indices.exists(self.INDEX_NAME)):
        self.es.indices.create(index=self.INDEX_NAME)

    if self.is_local_tm:
        self._set_latest_indexed_revision(**options)

    helpers.bulk(self.es, self._parse_translations(**options))
def upload_data(events_df, es_write_index, es_write):
    # Upload the info to the new ES index
    rows = events_df.to_dict("index")
    docs = []
    for row_index in rows.keys():
        row = rows[row_index]
        item_id = row[Events.PERCEVAL_UUID] + "_" + row[Git.FILE_PATH] + \
                  "_" + row[Git.FILE_EVENT]
        header = {
            "_index": es_write_index,
            "_type": "item",
            "_id": item_id,
            "_source": row
        }
        docs.append(header)
    helpers.bulk(es_write, docs)
    logging.info("Written: " + str(len(docs)))
def upload_data(events_df, es_write_index, es_write, uniq_id):
    # Upload the info to the new ES index
    test = events_df.to_dict("index")
    docs = []
    for i in test.keys():
        header = {
            "_index": es_write_index,
            "_type": "item",
            "_id": int(uniq_id),
            "_source": test[i]
        }
        docs.append(header)
        uniq_id = uniq_id + 1
    print(len(docs))
    helpers.bulk(es_write, docs)
    return uniq_id
def update(typeNameES, listId):
    logger.info('bulkOp.update launched')
    hippoCfg = getHippoConf()
    es = getES()
    now = strftime("%Y%m%dT%H%M%S%z")
    indexNameES = hippoCfg.get('elasticsearch', 'indexNameES')
    # k is a generator expression that produces an update
    # action for every doc whose id is in listId
    k = ({'_op_type': 'update', '_index': indexNameES, '_type': typeNameES,
          'doc': {'lastQuery': now}, '_id': id}
         for id in listId)
    res = helpers.bulk(es, k)
    logger.info('bulkOp.update res: %s', res)
    # res looks like (2650, [])
    logger.info('bulkOp.update end')
    return res[0]
def index(cfgPath, listData):
    logger.info('bulkOp.index launched')
    hippoCfg = getHippoConf()
    indexNameES = hippoCfg.get('elasticsearch', 'indexNameES')
    cfg = getConf(cfgPath)
    typeNameES = cfg.get('elasticsearch', 'typeIntel')
    # create the index only if it does not exist yet
    index = IndexIntel(cfgPath)
    index.createIndexIntel()
    es = getES()
    k = ({'_op_type': 'index', '_index': indexNameES, '_type': typeNameES, '_source': data}
         for data in listData)
    res = helpers.bulk(es, k, raise_on_error=False)
    #res = helpers.bulk(es, k, raise_on_exception=False)
    #res = helpers.bulk(es, k)
    logger.info('bulkOp.index res: %s', res)
    logger.info('bulkOp.index end')
    return res
def indexNew(coreIntelligence, listData):
    logger.info('bulkOp.indexNew launched')
    hippoCfg = getHippoConf()
    indexNameES = hippoCfg.get('elasticsearch', 'indexNameES')
    typeNameES = hippoCfg.get('elasticsearch', 'typeNameESNew')
    indexNew = IndexNew()
    indexNew.createIndexNew()
    es = getES()
    k = ({'_op_type': 'index', '_index': indexNameES, '_type': typeNameES,
          '_source': {'type': coreIntelligence, 'toSearch': data[coreIntelligence]}}
         for data in listData)
    # next(k) gives, for example:
    # {'_op_type': 'index', '_index': 'hippocampe', '_type': 'new',
    #  '_source': {'typeIntel': 'ip', 'intelligence': '1.1.1.1'}}
    res = helpers.bulk(es, k)
    logger.info('bulkOp.indexNew res: %s', res)
    logger.info('bulkOp.indexNew end')
    return res[0]
def genAddToES(self, msgs, component):
    def menuAddToES(e):
        progress = ProgressMonitor(component, "Feeding ElasticSearch", "", 0, len(msgs))
        i = 0
        docs = list()
        for msg in msgs:
            if not Burp_onlyResponses or msg.getResponse():
                docs.append(self.genESDoc(msg, timeStampFromResponse=True).to_dict(True))
            i += 1
            progress.setProgress(i)
        success, failed = bulk(self.es, docs, True, raise_on_error=False)
        progress.close()
        JOptionPane.showMessageDialog(
            self.panel,
            "<html><p style='width: 300px'>Successfully imported %d messages, %d messages failed.</p></html>" % (success, failed),
            "Finished", JOptionPane.INFORMATION_MESSAGE)
    return menuAddToES
### Interface to ElasticSearch ###
def update_commits(self, source_it, field='repos'):
    """ Take the sha from each doc and use it to reference
    the doc to update. This method only supports updating a
    single field for now. The default is repos because that's
    the only one that makes sense in this context.
    """
    def gen(it):
        for source in it:
            d = {}
            d['_index'] = self.index
            d['_type'] = self.dbname
            d['_op_type'] = 'update'
            d['_id'] = source['sha']
            d['_source'] = {'doc': {field: source[field]}}
            yield d
    bulk(self.es, gen(source_it))
    self.es.indices.refresh(index=self.index)
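A small usage sketch for update_commits, based only on its docstring; the sha, repo value, and the indexer variable are made up for illustration:

# Hypothetical input: one dict per commit, carrying the sha plus the field to replace.
source_it = iter([
    {'sha': 'abc123', 'repos': ['org/project']},
])
indexer.update_commits(source_it, field='repos')  # `indexer` stands in for the class instance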
def load_data(input_file, index, doc_type, seed):
    doc_no = seed
    successful = 0
    docs = []
    with open(input_file) as ifp:
        for line in ifp:
            doc_id = str(doc_no)
            doc = csv2json(index, doc_type, doc_id, line.rstrip())
            docs.append(doc)
            doc_no += 1
            if len(docs) == batch_size:
                docs_iter = iter(docs)
                (added, tmp) = helpers.bulk(es, docs_iter)
                successful += added
                docs = []
            if doc_no % 100000 == 0:
                print('success: %d failed: %s' % (successful, doc_no - successful - seed))
    if len(docs) > 0:
        docs_iter = iter(docs)
        (added, tmp) = helpers.bulk(es, docs_iter)
        successful += added
    print('Finished! Inserted: %d Failed: %d' % (successful, doc_no - successful - seed))
def single_bulk_to_es(bulk, config, attempt_retry):
    bulk = bulk_builder(bulk, config)
    max_attempt = 1
    if attempt_retry:
        max_attempt += 3
    for attempt in range(1, max_attempt + 1):
        try:
            helpers.bulk(config['es_conn'], bulk, chunk_size=config['bulk_size'])
        except Exception as e:
            if attempt < max_attempt:
                wait_seconds = attempt * 3
                log('warn', 'attempt [%s/%s] got exception, will retry after %s seconds' % (attempt, max_attempt, wait_seconds))
                time.sleep(wait_seconds)
                continue
            log('error', 'attempt [%s/%s] got exception, this is a permanent data loss, no more retries' % (attempt, max_attempt))
            raise e
        if attempt > 1:
            log('info', 'attempt [%s/%s] succeeded; recovered from the previous error' % (attempt, max_attempt))
        # completed successfully
        break
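Depending on the client version, much of this hand-rolled retry loop can be delegated to the helpers themselves. A sketch using helpers.streaming_bulk, which in recent elasticsearch-py releases can retry documents rejected with HTTP 429 on its own; the variable names are taken from the snippet above:

# Sketch only, assuming a recent elasticsearch-py client.
for ok, item in helpers.streaming_bulk(
        config['es_conn'], bulk,
        chunk_size=config['bulk_size'],
        max_retries=3,        # per-document retries on 429 responses
        initial_backoff=2,    # seconds before the first retry, doubled each time
        raise_on_error=False):
    if not ok:
        log('warn', 'document failed: %s' % (item,))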
def add_docs_bulk(self, docs):
    """Adds a set of documents to the index in a bulk.

    :param docs: dictionary {doc_id: doc}
    """
    actions = []
    for doc_id, doc in docs.items():
        action = {
            "_index": self.__index_name,
            "_type": self.DOC_TYPE,
            "_id": doc_id,
            "_source": doc
        }
        actions.append(action)

    if len(actions) > 0:
        helpers.bulk(self.__es, actions)
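For larger batches, an alternative sketch using helpers.parallel_bulk, which spreads the bulk requests over several threads; note that it returns a lazy generator, so the results have to be iterated for anything to be sent:

# Sketch only; `es_client` stands for the client used above (self.__es) and
# `actions` for the same list of action dicts.
from elasticsearch import helpers

for ok, item in helpers.parallel_bulk(es_client, actions, thread_count=4, chunk_size=500):
    if not ok:
        print("failed:", item)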
def store(self, df, table, **kwargs):
    if isinstance(df, pd.DataFrame):
        es = self.open()
        records = df.to_dict(orient='records')
        if df.index.name:
            actions = [{
                "_index": self.datasource.db,
                "_type": table,
                "_id": record[df.index.name],
                "_source": record
            } for record in records]
        else:
            actions = [{
                "_index": self.datasource.db,
                "_type": table,
                "_source": record
            } for record in records]
        if len(actions) > 0:
            helpers.bulk(es, actions)
def BulkIndexRecords(self, records):
    '''
    Bulk Index Records
    IN
        self: EsHandler
        records: a list of records to bulk index
    '''
    ELASTIC_LOGGER.debug('[starting] Indexing Bulk Records')
    success_count, failed_items = es_bulk(
        self.esh,
        records,
        chunk_size=10000,
        raise_on_error=False
    )

    if len(failed_items) > 0:
        ELASTIC_LOGGER.error('[PID {}] {} index errors'.format(
            os.getpid(), len(failed_items)
        ))
        for failed_item in failed_items:
            ELASTIC_LOGGER.error(unicode(failed_item))

    ELASTIC_LOGGER.debug('[finished] Indexing Bulk Records')
def send_buffered_operations(self):
    """Send buffered operations to Elasticsearch.

    This method is periodically called by the AutoCommitThread.
    """
    with self.lock:
        try:
            action_buffer = self.BulkBuffer.get_buffer()
            if action_buffer:
                successes, errors = bulk(self.elastic, action_buffer)
                LOG.debug("Bulk request finished, successfully sent %d "
                          "operations", successes)
                if errors:
                    LOG.error(
                        "Bulk request finished with errors: %r", errors)
        except es_exceptions.ElasticsearchException:
            LOG.exception("Bulk request failed with exception")
def __init__(self, docman):
    # Parent object
    self.docman = docman

    # Action buffer for bulk indexing
    self.action_buffer = []

    # Docs to update: a list of documents for which the source first
    # has to be retrieved from Elasticsearch and then apply_update
    # needs to be performed.
    # Format: [(doc, update_spec, action_buffer_index, get_from_ES)]
    self.doc_to_update = []

    # Ids of documents which need to be retrieved from Elasticsearch.
    # It prevents getting the same document multiple times from ES.
    # Format: {"_index": {"_type": {"_id": True}}}
    self.doc_to_get = {}

    # Dictionary of sources
    # Format: {"_index": {"_type": {"_id": {"_source": actual_source}}}}
    self.sources = {}
def load_data(self):
    """
    https://www.elastic.co/guide/en/elasticsearch/reference/5.5/search-suggesters-completion.html
    """
    es = self.connect()
    items = self._load_item_data()
    helpers.bulk(es, items)

    # id_field = "id"
    # es_index = self.es_config["es_index"]
    # es_type = self.es_config["es_type"]
    # for item in items:
    #     item = item['_source']
    #     logging.info(json.dumps(item, ensure_ascii=False, indent=4))
    #
    #     ret = es.index(index=es_index, doc_type=es_type, id=item[id_field], body=item)
    #     logging.info(ret)
def update(self, thing, refresh=None, action='index', **kwargs):
    """
    Update each document in ES for a model, iterable of models or queryset
    """
    if refresh is True or (
        refresh is None and self._doc_type.auto_refresh
    ):
        kwargs['refresh'] = True

    if isinstance(thing, models.Model):
        object_list = [thing]
    else:
        object_list = thing

    return self.bulk(
        self._get_actions(object_list, action), **kwargs
    )
def bulk(chunk_size=100, filepath=None, **kwargs):
    if sys.stdin.isatty() is False:
        infile = sys.stdin
    elif filepath is not None:
        infile = open(filepath, "r")
    else:
        abort(bulk.__doc__)

    es = get_client(env.elasticsearch_alias)
    actions = []
    for action in infile.readlines():
        actions.append(json.loads(action))

    success, errors = helpers.bulk(es, actions, ignore=IGNORE, **kwargs)
    res = {
        "success": success, "errors": errors,
        "bulk": {
            "host": es.transport.get_connection().host
        }
    }
    infile.close()
    jsonprint(res)
    return res
def bulk_update(es, actions, batch_size=250):
    indexed = 0
    for i in range(0, len(actions), batch_size):
        resp = bulk(es, actions[i:(i + batch_size)])
        indexed += resp[0]
        print('\tindexed %s / %s' % (indexed, len(actions)))
    return indexed
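A usage sketch for bulk_update; the action dict follows the same shape as the update actions used in the snippets below, and the id and payload here are made up:

# Hypothetical example input.
actions = [
    {'_op_type': 'update', '_index': 'fcc-comments', '_type': 'document',
     '_id': '12345', 'doc': {'analysis': {'sentiment': 0.4}}},
]
bulk_update(es, actions, batch_size=500)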
def index_worker(self, queue, size=200):
    actions = []
    indexed = 0
    while True:
        item = queue.get()
        if item is None:
            break
        id_submission, analysis = item

        doc = {
            '_index': 'fcc-comments',
            '_type': 'document',
            '_op_type': 'update',
            '_id': id_submission,
            'doc': {'analysis': analysis},
        }
        actions.append(doc)

        if len(actions) == size:
            with warnings.catch_warnings():
                warnings.simplefilter('ignore')
                try:
                    response = bulk(self.es, actions)
                    indexed += response[0]
                    print('\tanalyzed %s/%s\t%s%%' % (indexed, self.limit,
                                                      int(indexed / self.limit * 100)))
                    actions = []
                except ConnectionTimeout:
                    print('error indexing: connection timeout')

    with warnings.catch_warnings():
        warnings.simplefilter('ignore')
        response = bulk(self.es, actions)
        indexed += response[0]
        print('indexed %s' % (indexed))
def bulk_index(self, queue, size=20):
    actions = []
    indexed = 0
    ids = set()
    while True:
        item = queue.get()
        if item is None:
            break
        doc_id = item

        doc = {
            '_index': 'fcc-comments',
            '_type': 'document',
            '_op_type': 'update',
            '_id': doc_id,
            'doc': {'analysis.sentiment_sig_terms_ordered': True},
        }
        actions.append(doc)
        ids.add(doc_id)

        if len(actions) == size:
            with warnings.catch_warnings():
                warnings.simplefilter('ignore')
                try:
                    response = bulk(self.es, actions)
                    indexed += response[0]
                    if not indexed % 200:
                        print('\tindexed %s/%s\t%s%%' % (indexed, self.limit,
                                                         int(indexed / self.limit * 100)))
                    actions = []
                except ConnectionTimeout:
                    print('error indexing: connection timeout')

    with warnings.catch_warnings():
        warnings.simplefilter('ignore')
        response = bulk(self.es, actions)
        indexed += response[0]
        print('indexed %s' % (indexed))

    ids = list(ids)
    # print('%s\n%s' % (len(ids), ' '.join(ids)))
def bulk_index_from_it(
        self, index, it, transform=lambda x: x, last_updated=True):
    gc.collect()
    err_ids = []

    def _it():
        for doc_body in it:
            try:
                log.debug('Working on record: %s', doc_body)
                _id = doc_body.get(self.id_field)
                try:
                    doc_body = transform(doc_body)
                except Exception as e:
                    log.warn(
                        'Error while transforming doc ID = %s: %s',
                        _id, e)
                    raise e
                if doc_body:
                    if last_updated:
                        doc_body['last_updated'] = datetime.now()
                    op = self.partial_index_op(
                        doc_id=_id,
                        index=index,
                        doc_body=doc_body,
                        doc_type=self.doc_type)
                    yield op
            except Exception as e:
                log.warn('Cannot process doc ID = %s: %s', _id, e)
                err_ids.append(_id)

    try:
        self.bulk(_it())
        log.info('Invoked self.bulk(_it())')
    except Exception as e:
        log.warn('Error in bulk index because: %s', e)
    return err_ids
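partial_index_op is not shown in this snippet; a hypothetical sketch of what such a helper could return, assuming a partial update is meant, is:

def partial_index_op(self, doc_id, index, doc_body, doc_type):
    # Hypothetical helper: returns a plain bulk action dict for a partial update.
    return {
        '_op_type': 'update',
        '_index': index,
        '_type': doc_type,
        '_id': doc_id,
        'doc': doc_body,
    }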
def bulk(self, it):
    try:
        log.info('Sending bulk request on iterable/generator')
        args = dict(client=self.client,
                    actions=it,
                    chunk_size=self.bulk_size,
                    raise_on_exception=False,
                    raise_on_error=False,
                    stats_only=False,
                    request_timeout=self.timeout)
        res_succ, res_err = helpers.bulk(**args)
        log.info(
            'Sent bulk request on queue iterator: '
            'successful ops = %d, failed ops = %d',
            res_succ, len(res_err))
        for res in res_err:
            log.warn('Error response: %s', res)
    except Exception as e:
        log.error('Error in storing: %s', e, exc_info=True)
def bulk_index(self, index_name, dict_list):
    res = helpers.bulk(self.es, dict_list)
    print("response: '%s'" % str(res))
    return
def main():
    parser = ArgumentParser()
    parser.add_argument('-d', '--dump-file')
    parser.add_argument('-e', '--elasticsearch-host', default='localhost:9200')
    parser.add_argument('-i', '--index', default='wikipedia')
    parser.add_argument('-l', '--limit', default=0, type=int)
    parser.add_argument('-p', '--id-prefix')
    opts = parser.parse_args()

    dump_fn = opts.dump_file
    es_host = opts.elasticsearch_host
    es_index = opts.index
    limit = opts.limit if opts.limit > 0 else None
    prefix = opts.id_prefix

    if not dump_fn:
        logging.error('missing filenames ...')
        sys.exit(1)

    gen = articles(dump_fn, limit=limit)
    es = Elasticsearch(hosts=[es_host])
    ic = IndicesClient(es)
    if not ic.exists(es_index):
        ic.create(es_index)

    while True:
        chunk = islice(gen, 0, 1000)
        actions = [{'_index': es_index,
                    '_type': 'article',
                    '_id': article['id'] if not prefix else '%s-%s' % (prefix, article['id']),
                    '_source': article}
                   for article in chunk]
        if not actions:
            break
        helpers.bulk(es, actions)
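Since helpers.bulk already splits its input into chunks via chunk_size, the islice loop above could also be written as a single streaming call; a sketch using the same names:

# Sketch only: stream one generator of actions instead of batching by hand.
actions = ({'_index': es_index,
            '_type': 'article',
            '_id': article['id'] if not prefix else '%s-%s' % (prefix, article['id']),
            '_source': article}
           for article in articles(dump_fn, limit=limit))
helpers.bulk(es, actions, chunk_size=1000)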