test_healthchecks.py 文件源码

python
阅读 20 收藏 0 点赞 0 评论 0

项目:monasca-events-api 作者: openstack 项目源码 文件源码
def test_should_report_unhealthy_if_kafka_unhealthy(self, kafka_check):
        url = 'localhost:8200'
        err_str = 'Could not connect to kafka at %s' % url
        kafka_check.healthcheck.return_value = healthcheck.CheckResult(False,
                                                                       err_str)
        self.resource._kafka_check = kafka_check

        ret = self.simulate_request(ENDPOINT,
                                    headers={
                                        'Content-Type': 'application/json'
                                    },
                                    decode='utf8',
                                    method='GET')
        self.assertEqual(falcon.HTTP_SERVICE_UNAVAILABLE, self.srmock.status)

        ret = json.loads(ret)
        self.assertIn('kafka', ret)
        self.assertEqual(err_str, ret.get('kafka'))
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号