def update(self, id, extra_doc, index_type=None, bulk=False):
    '''update an existing doc with extra_doc.'''
    conn = self.conn
    index_name = self.ES_INDEX_NAME
    index_type = index_type or self.ES_INDEX_TYPE
    # old way, update locally and then push it back.
    # return self.conn.update(extra_doc, self.ES_INDEX_NAME,
    #                         index_type, id)
    if not bulk:
        body = {'doc': extra_doc}
        return conn.update(index_name, index_type, id, body)
    else:
        raise NotImplementedError
        '''
        # ES supports bulk update since v0.90.1.
        op_type = 'update'
        cmd = {op_type: {"_index": index_name,
                         "_type": index_type,
                         "_id": id}}
        doc = json.dumps({"doc": extra_doc}, cls=conn.encoder)
        command = "%s\n%s" % (json.dumps(cmd, cls=conn.encoder), doc)
        conn.bulker.add(command)
        return conn.flush_bulk()
        '''
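# The bulk branch above is left as NotImplementedError; the commented-out code
# targets the old pyes-style bulker API. Below is a minimal sketch (not part of
# the original class) of how that branch could be written with
# elasticsearch.helpers.bulk, assuming self.conn is an elasticsearch.Elasticsearch
# client; the method name update_bulk is hypothetical.
from elasticsearch import helpers

def update_bulk(self, id, extra_doc, index_type=None):
    '''Hedged sketch: bulk variant of update() built on helpers.bulk.'''
    action = {
        '_op_type': 'update',
        '_index': self.ES_INDEX_NAME,
        '_type': index_type or self.ES_INDEX_TYPE,
        '_id': id,
        'doc': extra_doc,  # partial document merged into the existing one
    }
    # helpers.bulk returns (number_of_successes, list_of_errors)
    return helpers.bulk(self.conn, [action])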
def update_docs(self, partial_docs, **kwargs):
    index_name = self.ES_INDEX_NAME
    doc_type = self.ES_INDEX_TYPE

    def _get_bulk(doc):
        # the right-hand side is evaluated before the rebinding, so doc['_id']
        # and "doc": doc both refer to the incoming partial document
        doc = {
            '_op_type': 'update',
            "_index": index_name,
            "_type": doc_type,
            "_id": doc['_id'],
            "doc": doc
        }
        return doc

    actions = (_get_bulk(doc) for doc in partial_docs)
    return helpers.bulk(self.conn, actions, chunk_size=self.step, **kwargs)
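# A hedged usage sketch for update_docs(); the ids and fields below are made up.
# Each partial document must carry the _id of the document it updates, and
# `indexer` is assumed to be an instance of the class defining update_docs().
partial_docs = [
    {'_id': 'doc-1', 'status': 'reviewed'},
    {'_id': 'doc-2', 'status': 'pending'},
]
indexer.update_docs(partial_docs, raise_on_error=False)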
def create_all_dictionary_data(connection, index_name, doc_type, logger, entity_data_directory_path=None,
                               csv_file_paths=None, **kwargs):
    """
    Indexes all entity data from csv files stored at entity_data_directory_path, one file at a time
    Args:
        connection: Elasticsearch client object
        index_name: The name of the index
        doc_type: The type of the documents being indexed
        logger: logging object to log at debug and exception level
        entity_data_directory_path: Optional, path of the directory containing the entity data csv files.
                                    Default is None
        csv_file_paths: Optional, list of file paths to csv files. Default is None
        kwargs:
            Refer http://elasticsearch-py.readthedocs.io/en/master/helpers.html#elasticsearch.helpers.bulk
    """
    logger.debug('%s: +++ Started: create_all_dictionary_data() +++' % log_prefix)
    if entity_data_directory_path:
        logger.debug('%s: \t== Fetching from variants/ ==' % log_prefix)
        csv_files = get_files_from_directory(entity_data_directory_path)
        for csv_file in csv_files:
            csv_file_path = os.path.join(entity_data_directory_path, csv_file)
            create_dictionary_data_from_file(connection=connection, index_name=index_name, doc_type=doc_type,
                                             csv_file_path=csv_file_path, update=False, logger=logger, **kwargs)
    if csv_file_paths:
        for csv_file_path in csv_file_paths:
            if csv_file_path and csv_file_path.endswith('.csv'):
                create_dictionary_data_from_file(connection=connection, index_name=index_name, doc_type=doc_type,
                                                 csv_file_path=csv_file_path, update=False, logger=logger, **kwargs)
    logger.debug('%s: +++ Finished: create_all_dictionary_data() +++' % log_prefix)
def recreate_all_dictionary_data(connection, index_name, doc_type, logger, entity_data_directory_path=None,
                                 csv_file_paths=None, **kwargs):
    """
    Re-indexes all entity data from csv files stored at entity_data_directory_path, one file at a time
    Args:
        connection: Elasticsearch client object
        index_name: The name of the index
        doc_type: The type of the documents being indexed
        logger: logging object to log at debug and exception level
        entity_data_directory_path: Optional, path of the directory containing the entity data csv files.
                                    Default is None
        csv_file_paths: Optional, list of file paths to csv files. Default is None
        kwargs:
            Refer http://elasticsearch-py.readthedocs.io/en/master/helpers.html#elasticsearch.helpers.bulk
    """
    logger.debug('%s: +++ Started: recreate_all_dictionary_data() +++' % log_prefix)
    if entity_data_directory_path:
        logger.debug('%s: \t== Fetching from variants/ ==' % log_prefix)
        csv_files = get_files_from_directory(entity_data_directory_path)
        for csv_file in csv_files:
            csv_file_path = os.path.join(entity_data_directory_path, csv_file)
            create_dictionary_data_from_file(connection=connection, index_name=index_name, doc_type=doc_type,
                                             csv_file_path=csv_file_path, update=True, logger=logger, **kwargs)
    if csv_file_paths:
        for csv_file_path in csv_file_paths:
            if csv_file_path and csv_file_path.endswith('.csv'):
                create_dictionary_data_from_file(connection=connection, index_name=index_name, doc_type=doc_type,
                                                 csv_file_path=csv_file_path, update=True, logger=logger, **kwargs)
    logger.debug('%s: +++ Finished: recreate_all_dictionary_data() +++' % log_prefix)
def get_variants_dictionary_value_from_key(csv_file_path, dictionary_key, logger, **kwargs):
    """
    Reads the csv file at csv_file_path and creates a dictionary mapping each entity value to a list of its variants.
    The entity values are in the first column of the csv file and their corresponding variants are stored in the
    second column, delimited by '|'
    Args:
        csv_file_path: absolute file path of the csv file to populate entity data from
        dictionary_key: name of the entity to put the values under
        logger: logging object to log at debug and exception level
        kwargs:
            Refer http://elasticsearch-py.readthedocs.io/en/master/helpers.html#elasticsearch.helpers.bulk
    Returns:
        Dictionary mapping each entity value to a list of its variants.
    """
    dictionary_value = defaultdict(list)
    try:
        csv_reader = read_csv(csv_file_path)
        next(csv_reader)  # skip the header row
        for data_row in csv_reader:
            try:
                data = map(str.strip, data_row[1].split('|'))
                # remove empty strings
                data = [variant for variant in data if variant]
                dictionary_value[data_row[0].strip().replace('.', ' ')].extend(data)
            except Exception as e:
                logger.exception('%s: \t\t== Exception in dict creation for keyword: %s -- %s -- %s =='
                                 % (log_prefix, dictionary_key, data_row, e))
    except Exception as e:
        logger.exception('%s: \t\t\t=== Exception in get_variants_dictionary_value_from_key() Dictionary Key: %s \n %s ==='
                         % (log_prefix, dictionary_key, e))
    return dictionary_value
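# Illustration only (the file name and rows are invented): a CSV such as
#
#     value,variants
#     New Delhi,new delhi|delhi|dilli
#     Mumbai,mumbai|bombay
#
# passed through get_variants_dictionary_value_from_key('city.csv', 'city', logger)
# would yield a defaultdict equivalent to
#
#     {'New Delhi': ['new delhi', 'delhi', 'dilli'],
#      'Mumbai': ['mumbai', 'bombay']}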
def create_dictionary_data_from_file(connection, index_name, doc_type, csv_file_path, update, logger, **kwargs):
    """
    Indexes all entity data from the csv file at path csv_file_path
    Args:
        connection: Elasticsearch client object
        index_name: The name of the index
        doc_type: The type of the documents being indexed
        csv_file_path: absolute file path of the csv file to populate entity data from
        update: boolean, True if this is an update type operation, False if create/index type operation
        logger: logging object to log at debug and exception level
        kwargs:
            Refer http://elasticsearch-py.readthedocs.io/en/master/helpers.html#elasticsearch.helpers.bulk
    """
    base_file_name = os.path.basename(csv_file_path)
    dictionary_key = os.path.splitext(base_file_name)[0]

    if update:
        delete_entity_by_name(connection=connection, index_name=index_name, doc_type=doc_type,
                              entity_name=dictionary_key, logger=logger, **kwargs)
    dictionary_value = get_variants_dictionary_value_from_key(csv_file_path=csv_file_path,
                                                              dictionary_key=dictionary_key, logger=logger,
                                                              **kwargs)
    if dictionary_value:
        add_data_elastic_search(connection=connection, index_name=index_name, doc_type=doc_type,
                                dictionary_key=dictionary_key,
                                dictionary_value=remove_duplicate_data(dictionary_value), logger=logger, **kwargs)
    if os.path.exists(csv_file_path) and os.path.splitext(csv_file_path)[1] == '.csv':
        # the original snippet only calls os.path.basename() here and discards the
        # result, so this branch has no effect as written
        os.path.basename(csv_file_path)
def _index_chunk(chunk, doc_type, index):
    """
    Add/update a list of records in Elasticsearch
    Args:
        chunk (list): List of serialized items to index
        doc_type (str): The doc type for each item
        index (str): An Elasticsearch index
    Returns:
        int: Number of items inserted into Elasticsearch
    """
    conn = get_conn(verify_index=index)
    insert_count, errors = bulk(
        conn,
        chunk,
        index=index,
        doc_type=doc_type,
    )
    if len(errors) > 0:
        raise ReindexException("Error during bulk insert: {errors}".format(
            errors=errors
        ))
    refresh_index(index)
    return insert_count
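# A hedged usage sketch for _index_chunk(); the documents, index and doc type are
# made up, and get_conn/refresh_index/ReindexException come from the surrounding
# project. index and doc_type are forwarded through bulk() as defaults, so each
# item only needs its fields plus an optional _id.
chunk = [
    {'_id': 42, 'title': 'Intro to Search'},
    {'_id': 43, 'title': 'Advanced Search'},
]
inserted = _index_chunk(chunk, doc_type='course', index='discussions')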
def output_es(es, records, start_record_num, end_record_num, total_records, per_batch):
    print("Inserting records %d through %d of %s" % (start_record_num, end_record_num,
                                                     (str(total_records) if total_records > 0 else '???')))
    num_success, error_list = helpers.bulk(es, records, chunk_size=1000)
    if num_success != per_batch:
        print("[ERROR] %d of %d inserts succeeded!" % (num_success, per_batch))
        print("[ERROR] Errors:")
        print(error_list)
def insert_image(chunk_size, max_results=5000, from_file=None):
    count = 0
    success_count = 0
    es = search.init()
    search.Image.init()
    mapping = search.Image._doc_type.mapping
    mapping.save(settings.ELASTICSEARCH_INDEX)
    for chunk in grouper_it(chunk_size, import_from_file(from_file)):
        if not from_file and count >= max_results:  # Load everything if loading from file
            break
        else:
            images = []
            for result in chunk:
                images.append(result)
            if len(images) > 0:
                try:
                    # Bulk update the search engine too
                    search_objs = [search.db_image_to_index(img).to_dict(include_meta=True) for img in images]
                    models.Image.objects.bulk_create(images)
                    helpers.bulk(es, search_objs)
                    log.debug("*** Committed set of %d images", len(images))
                    success_count += len(images)
                except IntegrityError as e:
                    log.warn("Got one or more integrity errors on batch: %s", e)
                finally:
                    count += len(images)
    return success_count
def write2ES(AllData, indexName, typeName, elasticPort=9220, elasticHost="localhost"):
    es = Elasticsearch([{'host': elasticHost, 'port': elasticPort}], http_auth=(elasticUsername, elasticPassword))
    messages = []
    logging.debug('Running preset')
    for record in AllData:
        # print(record)
        tmpMap = {"_op_type": "index", "_index": indexName, "_type": typeName}
        if len(record) > 2:
            tmpMap.update({"label": record[0].replace("http://dbpedia.org/resource/", ""),
                           "entity_linking": {"URI": record[0].replace("/resource/", "/page/"),
                                              "connection": record[1],
                                              "target": record[2].replace("/resource/", "/page/")}})
        else:
            tmpMap.update({"label": record[0].replace("http://dbpedia.org/resource/", ""),
                           "entity_linking": {"URI": record[0].replace("/resource/", "/page/"),
                                              "target": record[1]}})
        messages.append(tmpMap)
    # print(messages)
    result = bulk(es, messages)
    return result
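# A hedged usage sketch for write2ES(); the triples below are invented, and
# elasticUsername/elasticPassword are assumed to be defined at module level.
AllData = [
    # (subject URI, connection, target URI) -> first branch above
    ("http://dbpedia.org/resource/Berlin", "capital", "http://dbpedia.org/resource/Germany"),
    # (subject URI, target) -> second branch above
    ("http://dbpedia.org/resource/Berlin", "Germany"),
]
write2ES(AllData, indexName="dbpedia", typeName="entity")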
def writeDashboard2ES(name, indexName=".kibi", typeName="dashboard", elasticPort=9220, elasticHost="localhost"):
    es = Elasticsearch([{'host': elasticHost, 'port': elasticPort}], http_auth=(elasticUsername, elasticPassword))
    messages = []
    logging.debug('Writing dashboard')
    print('Writing dashboard')
    PercentageEmotions = "PercentageEmotions"
    EmotionDistribution = "EmotionDistribution"
    cloudVisualization = name[:name.find("_")] + "Cloud"
    nameFull = name[:name.find("_")]
    tmpMap = {"_op_type": "index", "_index": indexName, "_type": typeName, "_id": name}
    tmpMap.update({"defaultIndex": "reviews", "kibi:relationalPanel": "true"})
    tmpMap.update({"savedSearchId": name})
    tmpMap.update({"sort": ["_score", "desc"], "version": 1, "description": "", "hits": 0,
                   "optionsJSON": "{\"darkTheme\":false}", "uiStateJSON": "{}", "timeRestore": "false"})
    tmpMap.update({"kibanaSavedObjectMeta": {"searchSourceJSON": "{\"filter\":[{\"query\":{\"query_string\":{\"query\":\"*\",\"analyze_wildcard\":true}}}]}"}})
    if name.find("tweets") >= 0:
        tmpMap.update({"title": name, "panelsJSON": "[{\"id\":\""+name+"\",\"type\":\"search\",\"panelIndex\":3,\"size_x\":7,\"size_y\":6,\"col\":1,\"row\":1,\"columns\":[\"emotions.emotion\",\"detected_entities\",\"text\"],\"sort\":[\"entity_linking.URI\",\"desc\"]},{\"id\":\""+EmotionDistribution+"\",\"type\":\"visualization\",\"panelIndex\":4,\"size_x\":5,\"size_y\":4,\"col\":8,\"row\":3},{\"id\":\""+PercentageEmotions+"\",\"type\":\"visualization\",\"panelIndex\":5,\"size_x\":5,\"size_y\":2,\"col\":8,\"row\":1}]"})
    else:
        # tmpMap.update({ "title":name,"panelsJSON": "[{\"id\":\""+name+"\",\"type\":\"search\",\"panelIndex\":1,\"size_x\":6,\"size_y\":6,\"col\":1,\"row\":1,\"columns\":[\"label\",\"entity_linking.target\"],\"sort\":[\"_score\",\"desc\"]},{\"id\":\"entityEmotion\",\"type\":\"visualization\",\"panelIndex\":3,\"size_x\":6,\"size_y\":4,\"col\":7,\"row\":6},{\"id\":\"Location\",\"type\":\"search\",\"panelIndex\":2,\"size_x\":6,\"size_y\":2,\"col\":1,\"row\":7,\"columns\":[\"label\",\"entity_linking.connection\",\"entity_linking.target\"],\"sort\":[\"connection\",\"asc\"]},{\"id\":\"PercentageEmotions\",\"type\":\"visualization\",\"panelIndex\":4,\"size_x\":6,\"size_y\":5,\"col\":7,\"row\":1}]"})
        tmpMap.update({"title": name, "panelsJSON": "[{\"col\":1,\"columns\":[\"label\",\"entity_linking.target\"],\"id\":\""+name+"\",\"panelIndex\":1,\"row\":1,\"size_x\":6,\"size_y\":2,\"sort\":[\"_score\",\"desc\"],\"type\":\"search\"},{\"col\":7,\"id\":\""+PercentageEmotions+"\",\"panelIndex\":4,\"row\":1,\"size_x\":6,\"size_y\":3,\"type\":\"visualization\"},{\"col\":1,\"columns\":[\"label\",\"entity_linking.connection\",\"entity_linking.target\"],\"id\":\""+nameFull+"\",\"panelIndex\":2,\"row\":3,\"size_x\":6,\"size_y\":2,\"sort\":[\"connection\",\"asc\"],\"type\":\"search\"},{\"id\":\""+cloudVisualization+"\",\"type\":\"visualization\",\"panelIndex\":5,\"size_x\":3,\"size_y\":2,\"col\":1,\"row\":5},{\"id\":\""+EmotionDistribution+"\",\"type\":\"visualization\",\"panelIndex\":6,\"size_x\":6,\"size_y\":3,\"col\":7,\"row\":4}]"})
        # "[{\"id\":\""+name+"\",\"type\":\"search\",\"panelIndex\":1,\"size_x\":12,\"size_y\":6,\"col\":1,\"row\":1,\"columns\":[\"label\", \"entity_linking.connection\", \"entity_linking.target\"],\"sort\":[\"_score\",\"desc\"]}]"})
    messages.append(tmpMap)
    print(messages)
    result = bulk(es, messages)
    return result
def add_commits(self, source_it):
    def gen(it):
        for source in it:
            d = {}
            d['_index'] = self.index
            d['_type'] = self.dbname
            d['_op_type'] = 'create'
            d['_id'] = source['sha']
            d['_source'] = source
            yield d
    bulk(self.es, gen(source_it))
    self.es.indices.refresh(index=self.index)
def del_commits(self, sha_list):
    def gen(it):
        for sha in it:
            d = {}
            d['_index'] = self.index
            d['_type'] = self.dbname
            d['_op_type'] = 'delete'
            d['_id'] = sha
            yield d
    bulk(self.es, gen(sha_list))
    self.es.indices.refresh(index=self.index)
def execute(self):
    """
    Index data of specified queryset
    """
    client = elasticsearch.Elasticsearch(
        hosts=settings.ELASTIC_SEARCH_HOSTS,
        # sniff_on_start=True,
        retry_on_timeout=True,
        refresh=True
    )

    start_time = time.time()
    duration = time.time()
    loop_time = elapsed = duration - start_time

    for batch_i, total_batches, start, end, total, qs in self.batch_qs():
        loop_start = time.time()
        total_left = ((total_batches - batch_i) * loop_time)

        progress_msg = \
            '%s of %s : %8s %8s %8s duration: %.2f left: %.2f' % (
                batch_i, total_batches, start, end, total, elapsed,
                total_left
            )
        log.debug(progress_msg)

        helpers.bulk(
            client, (self.convert(obj).to_dict(include_meta=True)
                     for obj in qs),
            raise_on_error=True,
            refresh=True
        )

        now = time.time()
        elapsed = now - start_time
        loop_time = now - loop_start
def save_to_elasticsearch(chars, es, es_index):
    print('\r', "[elasticsearch] %s charities to save" % len(chars))
    print('\r', "[elasticsearch] saving %s charities to %s index" % (len(chars), es_index))
    results = bulk(es, list(chars.values()))
    print('\r', "[elasticsearch] saved %s charities to %s index" % (results[0], es_index))
    print('\r', "[elasticsearch] %s errors reported" % len(results[1]))
def load(lines, config):
    bulks = grouper(lines, config['bulk_size'] * 3)
    if config['progress']:
        bulks = [x for x in bulks]
    with click.progressbar(bulks) as pbar:
        for i, bulk in enumerate(pbar):
            try:
                single_bulk_to_es(bulk, config, config['with_retry'])
            except Exception as e:
                log('warn', 'Chunk {i} got exception ({e}) while processing'.format(e=e, i=i))
def gen_action(self, **kwargs):
    """
    single ES doc that gets consumed inside bulk uploads
    """
    action = {
        "_op_type": _op_type,  # assumed to be a module-level constant, e.g. 'index'
        "_index": kwargs['index_name'],
        "_type": kwargs['doc_type'],
        "_id": kwargs['uid'],
        # "@timestamp": kwargs['timestamp'],
        "_source": kwargs['data']
    }
    return action
def buildEvent(timestamp=None):
    event = {}
    # if we don't have a desired timestamp passed in for the event, use the current time in UTC
    if timestamp is None:
        event['timestamp'] = datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')
    else:
        event['timestamp'] = timestamp.strftime('%Y-%m-%dT%H:%M:%SZ')
    # TODO make some of these random inputs from seed lists
    # add these 2 for bulk API goodness
    event['_index'] = 'smoke_event'
    event['_type'] = 'smoke_event'
    event['request'] = '/index.html'
    event['response'] = '200'
    event['agent'] = 'Firefox'
    event['remote_ip'] = '1.1.1.1'
    event['remote_user'] = ''
    event['bytes'] = '1234'
    event['referrer'] = 'http://example.com'
    json_event = json.dumps(event)
    return json_event
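# A hedged sketch of pushing such events in bulk; the host and event count are
# made up. json.loads() turns the JSON string back into a dict so helpers.bulk
# can pick up the embedded _index/_type as action metadata.
import json
from elasticsearch import Elasticsearch, helpers

es = Elasticsearch(['http://localhost:9200'])  # assumed local cluster
actions = (json.loads(buildEvent()) for _ in range(1000))
success, errors = helpers.bulk(es, actions, chunk_size=500)
print("indexed %d smoke events" % success)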
def main():
    bulkSize = 10000            # elasticsearch bulk size
    daysBack = 7
    anomalyPeriod = 30          # period for the anomaly to last, in minutes
    anomalyMagnification = 10   # e.g. 10x more than the normal rate
    buildEventSeries(daysBack, bulkSize)
    buildAnomalyEventSeries(daysBack, anomalyPeriod, anomalyMagnification, bulkSize)