def bulk_index_from_it(
        self, index, it, transform=lambda x: x, last_updated=True):
    """Bulk-index the documents produced by ``it`` into ``index``.

    Each document is passed through ``transform``. A truthy result is
    optionally stamped with ``last_updated`` and converted into an operation
    via ``self.partial_index_op``. The operations are handed to ``self.bulk``
    as a lazy generator.

    Per-document failures (reading the ID, ``transform``, or building the
    operation) are logged and do not stop the run; the document's ID is
    collected instead. A failure raised by ``self.bulk`` itself is logged
    and swallowed.

    Args:
        index: Target index, forwarded to ``self.partial_index_op``.
        it: Iterable of document bodies. They are read with ``.get``, so
            presumably dicts -- TODO confirm against callers.
        transform: Callable applied to each document before indexing. A
            falsy return value skips the document without recording an error.
        last_updated: When true, set ``doc_body['last_updated']`` to
            ``datetime.now()`` on the transformed document (mutating it).

    Returns:
        list: ``self.id_field`` values of documents that could not be
        processed, in encounter order. ``None`` is recorded when the ID
        itself could not be read. Errors are collected while ``self.bulk``
        consumes the generator, so if ``self.bulk`` fails partway (or,
        presumably, does not consume everything) the list is partial.
    """
    gc.collect()
    err_ids = []

    def _it():
        for doc_body in it:
            # Reset per document: if reading the ID fails, the error must not
            # be attributed to the previous document's ID (and must not raise
            # UnboundLocalError on the very first document).
            _id = None
            try:
                log.debug('Working on record: %s', doc_body)
                _id = doc_body.get(self.id_field)
                try:
                    doc_body = transform(doc_body)
                except Exception as e:
                    log.warning(
                        'Error while transforming doc ID = %s: %s', _id, e)
                    raise
                if not doc_body:
                    # A falsy transform result means "skip", not "error".
                    continue
                if last_updated:
                    doc_body['last_updated'] = datetime.now()
                op = self.partial_index_op(
                    doc_id=_id,
                    index=index,
                    doc_body=doc_body,
                    doc_type=self.doc_type)
            except Exception as e:
                log.warning('Cannot process doc ID = %s: %s', _id, e)
                err_ids.append(_id)
                continue
            # Yield outside the try so that exceptions thrown into the
            # generator by the consumer are not recorded as document errors.
            yield op

    try:
        self.bulk(_it())
        log.info('Invoked self.bulk(_it())')
    except Exception as e:
        # Deliberately best-effort: a bulk failure is logged (with traceback)
        # rather than raised; callers get the IDs collected before it.
        log.warning('Error in bulk index because: %s', e, exc_info=True)
    return err_ids
评论列表
文章目录