def handle_command(self, doc, namespace, timestamp):
    """Apply a database command document to the Elasticsearch indices.

    The pending buffer is committed first, then every recognised key in
    ``doc`` is handled in turn:

    * ``dropDatabase`` -- delete each index returned by
      ``command_helper.map_db`` for the database.
    * ``renameCollection`` -- not supported; always raises.
    * ``create`` -- put a mapping with ``_source`` enabled for the
      index/type returned by ``command_helper.map_collection``.
    * ``drop`` -- bulk-delete every document of the mapped type. The
      mapping itself is left in place and a warning says so.

    Args:
        doc: Mapping describing the command; only read via ``doc.get``
            and item access on the keys listed above.
        namespace: Dotted namespace string; the text before the first
            ``.`` is used as the database name.
        timestamp: Not used by this method (presumably kept for
            interface compatibility -- confirm against callers).

    Raises:
        errors.OperationFailed: If ``doc`` carries a truthy
            ``renameCollection`` entry.
    """
    # Flush buffer before handling the command.
    self.commit()
    db = namespace.split('.', 1)[0]

    if doc.get('dropDatabase'):
        for mapped_db in self.command_helper.map_db(db):
            self.elastic.indices.delete(index=mapped_db.lower())

    if doc.get('renameCollection'):
        raise errors.OperationFailed(
            "elastic_doc_manager does not support renaming a mapping.")

    if doc.get('create'):
        # Dedicated locals: ``db`` must keep the unmapped database name
        # so that later branches do not map an already-mapped name.
        index_name, doc_type = self.command_helper.map_collection(
            db, doc['create'])
        if index_name and doc_type:
            self.elastic.indices.put_mapping(
                index=index_name.lower(), doc_type=doc_type,
                body={
                    "_source": {"enabled": True}
                })

    if doc.get('drop'):
        index_name, doc_type = self.command_helper.map_collection(
            db, doc['drop'])
        if index_name and doc_type:
            # This deletes the items of the type, but does not get rid
            # of the mapping.
            warnings.warn(
                "Deleting all documents of type %s on index %s. "
                "The mapping definition will persist and must be "
                "removed manually." % (doc_type, index_name))
            delete_actions = (
                dict(hit, _op_type='delete')
                for hit in scan(
                    self.elastic, index=index_name.lower(),
                    doc_type=doc_type))
            for ok, resp in streaming_bulk(self.elastic, delete_actions):
                if not ok:
                    LOG.error(
                        "Error occurred while deleting ElasticSearch docum"
                        "ent during handling of 'drop' command: %r", resp)
elastic2_doc_manager.py 文件源码
python
阅读 18
收藏 0
点赞 0
评论 0
评论列表
文章目录