def test_acceptance(self):
logger.debug('starting acceptance test')
with patch.multiple(
pb,
get_active_node=DEFAULT,
run_reactor=DEFAULT,
update_active_node=DEFAULT
) as cls_mocks:
# setup some return values
cls_mocks['run_reactor'].side_effect = self.se_run_reactor
cls_mocks['get_active_node'].return_value = 'foo:1234'
cls_mocks['update_active_node'].side_effect = self.se_update_active
# instantiate class
self.cls = VaultRedirector('consul:1234', poll_interval=0.5)
self.cls.log_enabled = False
# make sure active is None (starting state)
assert self.cls.active_node_ip_port is None
self.cls.bind_port = self.get_open_port()
self.cls.log_enabled = True
# setup an async task to make the HTTP request
self.cls.reactor.callLater(2.0, self.se_requester)
# do this in case the callLater for self.stop_reactor fails...
signal.signal(signal.SIGALRM, self.stop_reactor)
signal.alarm(20) # send SIGALRM in 20 seconds, to stop runaway loop
self.cls.run()
signal.alarm(0) # disable SIGALRM
assert self.cls.active_node_ip_port == 'bar:5678' # from update_active
assert self.update_active_called is True
assert cls_mocks['update_active_node'].mock_calls[0] == call()
assert cls_mocks['run_reactor'].mock_calls == [call()]
assert cls_mocks['get_active_node'].mock_calls == [call()]
# HTTP response checks
resp = json.loads(self.response)
# /bar/baz
assert resp['/bar/baz']['headers'][
'Server'] == "vault-redirector/%s/TwistedWeb/%s" % (
_VERSION, twisted_version.short()
)
assert resp['/bar/baz']['headers'][
'Location'] == 'http://bar:5678/bar/baz'
assert resp['/bar/baz']['status_code'] == 307
# /vault-redirector-health
assert resp['/vault-redirector-health']['status_code'] == 200
health_info = json.loads(resp['/vault-redirector-health']['text'])
assert resp['/vault-redirector-health']['headers'][
'Content-Type'] == 'application/json'
assert health_info['healthy'] is True
assert health_info['application'] == 'vault-redirector'
assert health_info['version'] == _VERSION
assert health_info['source'] == _PROJECT_URL
assert health_info['consul_host_port'] == 'consul:1234'
assert health_info['active_vault'] == 'bar:5678'
test_redirector.py 文件源码
python
阅读 41
收藏 0
点赞 0
评论 0
评论列表
文章目录