test_redirector.py 文件源码

python
阅读 41 收藏 0 点赞 0 评论 0

项目:vault-redirector-twisted 作者: manheim 项目源码 文件源码
def test_acceptance(self):
        """Acceptance test: run a VaultRedirector and check its HTTP responses.

        ``get_active_node``, ``run_reactor`` and ``update_active_node`` are
        patched on ``pb`` so the test controls what the redirector sees.
        A delayed call to ``self.se_requester`` is scheduled on the
        redirector's reactor, and a SIGALRM watchdog bounds the total run
        time. After ``run()`` returns, the test asserts on the redirector's
        state, on the calls made to the mocks, and on the JSON document in
        ``self.response`` (presumably stored by ``se_requester`` -- confirm
        against that helper).
        """
        logger.debug('starting acceptance test')
        with patch.multiple(
            pb,
            get_active_node=DEFAULT,
            run_reactor=DEFAULT,
            update_active_node=DEFAULT
        ) as cls_mocks:
            # setup some return values
            cls_mocks['run_reactor'].side_effect = self.se_run_reactor
            cls_mocks['get_active_node'].return_value = 'foo:1234'
            cls_mocks['update_active_node'].side_effect = self.se_update_active
            # instantiate class
            self.cls = VaultRedirector('consul:1234', poll_interval=0.5)
            self.cls.log_enabled = False
            # make sure active is None (starting state)
            assert self.cls.active_node_ip_port is None
            self.cls.bind_port = self.get_open_port()
            self.cls.log_enabled = True
            # setup an async task to make the HTTP request
            self.cls.reactor.callLater(2.0, self.se_requester)
            # do this in case the callLater for self.stop_reactor fails...
            signal.signal(signal.SIGALRM, self.stop_reactor)
            signal.alarm(20)  # send SIGALRM in 20 seconds, to stop runaway loop
            try:
                self.cls.run()
            finally:
                # Always disable SIGALRM, even when run() raises; otherwise
                # the pending alarm would fire stop_reactor later, outside
                # of this test.
                signal.alarm(0)
        assert self.cls.active_node_ip_port == 'bar:5678'  # from update_active
        assert self.update_active_called is True
        assert cls_mocks['update_active_node'].mock_calls[0] == call()
        assert cls_mocks['run_reactor'].mock_calls == [call()]
        assert cls_mocks['get_active_node'].mock_calls == [call()]
        # HTTP response checks
        resp = json.loads(self.response)
        # /bar/baz
        assert resp['/bar/baz']['headers'][
                   'Server'] == "vault-redirector/%s/TwistedWeb/%s" % (
            _VERSION, twisted_version.short()
        )
        assert resp['/bar/baz']['headers'][
                   'Location'] == 'http://bar:5678/bar/baz'
        assert resp['/bar/baz']['status_code'] == 307
        # /vault-redirector-health
        assert resp['/vault-redirector-health']['status_code'] == 200
        health_info = json.loads(resp['/vault-redirector-health']['text'])
        assert resp['/vault-redirector-health']['headers'][
            'Content-Type'] == 'application/json'
        assert health_info['healthy'] is True
        assert health_info['application'] == 'vault-redirector'
        assert health_info['version'] == _VERSION
        assert health_info['source'] == _PROJECT_URL
        assert health_info['consul_host_port'] == 'consul:1234'
        assert health_info['active_vault'] == 'bar:5678'
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号