def bulk_index(self, queue, size=20):
    """Drain document ids from ``queue`` and flag them in Elasticsearch in batches.

    Every id read from the queue becomes a partial-update action against the
    ``fcc-comments`` index that sets ``analysis.sentiment_sig_terms_ordered``
    to ``True``. Pending actions are sent through ``bulk`` once at least
    ``size`` of them have accumulated, and whatever is left is sent after the
    queue signals end-of-input.

    Args:
        queue: Object whose ``get()`` returns the next document id; a ``None``
            item is the end-of-input sentinel.
        size: Number of pending actions that triggers a bulk request.

    Returns:
        None. Progress and the final count are printed to stdout.

    Raises:
        ConnectionTimeout: If the final flush times out. A timeout on an
            intermediate batch is only reported; the batch stays pending and
            is retried together with the next item.
    """
    actions = []
    indexed = 0
    while True:
        doc_id = queue.get()
        if doc_id is None:
            break
        actions.append({
            '_index': 'fcc-comments',
            '_type': 'document',
            '_op_type': 'update',
            '_id': doc_id,
            'doc': {'analysis.sentiment_sig_terms_ordered': True},
        })
        # Compare with ">=" rather than "==": after a timeout the pending
        # list is kept and grows past `size`, and an equality test would
        # never fire again, silently disabling batching for the rest of
        # the run.
        if len(actions) < size:
            continue
        with warnings.catch_warnings():
            warnings.simplefilter('ignore')
            try:
                response = bulk(self.es, actions)
            except ConnectionTimeout:
                # Keep `actions` untouched so the batch is retried later.
                print('error indexing: connection timeout')
                continue
        # response[0] is presumably the success count returned by
        # elasticsearch.helpers.bulk -- confirm against the file's imports.
        indexed += response[0]
        actions = []
        if not indexed % 200:
            print('\tindexed %s/%s\t%s%%' % (indexed, self.limit,
                                             int(indexed / self.limit * 100)))
    # Flush the trailing partial batch; skip the request when nothing is left.
    if actions:
        with warnings.catch_warnings():
            warnings.simplefilter('ignore')
            response = bulk(self.es, actions)
        indexed += response[0]
    print('indexed %s' % (indexed))
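# Stray blog-page navigation text captured along with the snippet (not code):
# "评论列表" (comment list), "文章目录" (table of contents).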