def test_delete_dashid(self):
    """Check that ``--delete`` with ``--dashid`` removes the dashboard.

    Runs KibTool against the source index returned by
    ``self.create_indices()`` with ``--dashid dashboard-1 --delete`` and
    verifies that:

    * nothing is written to stdout or stderr,
    * fetching ``dashboard-1`` from the source index raises
      ``NotFoundError``,
    * a match-all search on the destination index reports zero hits.
    """
    (l_srcName, l_dstName) = self.create_indices()
    with patch('sys.stdout', new=StringIO()) as fake_out, \
            patch('sys.stderr', new=StringIO()) as fake_err:
        l_kibtool = kibtool.KibTool(["./test_kibtool", "--kibfrom", l_srcName,
                                     "--dashid", "dashboard-1",
                                     "--delete"])
        l_kibtool.execute()
    # The StringIO buffers remain readable once the patches are undone.
    # assertEqual replaces the assertEquals alias removed in Python 3.12.
    self.assertEqual(fake_out.getvalue().strip(), "")
    self.assertEqual(fake_err.getvalue().strip(), "")
    try:
        self.client.get(index=l_srcName, doc_type="dashboard", id="dashboard-1")
    except exceptions.NotFoundError:
        # Expected: the dashboard document no longer exists.
        pass
    else:
        self.fail("dashboard-1 still present")
    l_dst = self.client.search(index=l_dstName, doc_type="*",
                               body={"query": {"match_all": {}}})
    self.assertEqual(l_dst["hits"]["total"], 0)
评论列表
文章目录