def test_api_gateway_http_integration():
    """Check that an API Gateway connected to an HTTP backend relays requests.

    A local ``GenericProxy`` is started as the backend. Its listener answers
    every request with HTTP 200 and a JSON body built from the ``data`` it was
    handed (or ``'{}'`` when that data is falsy). The test then issues a GET
    and a POST against the gateway URL and checks status code and body.

    The proxy is stopped in a ``finally`` clause so that a failing request or
    assertion does not leave it running on the test port.
    """
    test_port = 12123
    backend_url = 'http://localhost:%s%s' % (test_port, API_PATH_HTTP_BACKEND)

    # create target HTTP backend
    def listener(**kwargs):
        """Return a 200 response whose body is the JSON dump of ``kwargs['data']``."""
        response = Response()
        response.status_code = 200
        # fall back to an empty JSON object when no payload was passed along
        response._content = json.dumps(kwargs['data']) if kwargs['data'] else '{}'
        return response

    proxy = GenericProxy(test_port, update_listener=listener)
    proxy.start()
    try:
        # create API Gateway and connect it to the HTTP backend
        gateway = connect_api_gateway_to_http(
            'test_gateway2', backend_url, path=API_PATH_HTTP_BACKEND)

        # make test request to gateway
        url = INBOUND_GATEWAY_URL_PATTERN.format(
            api_id=gateway['id'],
            stage_name=TEST_STAGE_NAME,
            path=API_PATH_HTTP_BACKEND)

        # GET carries no payload, so the backend is expected to answer '{}'
        result = requests.get(url)
        assert result.status_code == 200
        assert to_str(result.content) == '{}'

        # POST payload is expected to come back unchanged
        data = {"data": 123}
        result = requests.post(url, data=json.dumps(data))
        assert result.status_code == 200
        assert json.loads(to_str(result.content)) == data
    finally:
        # clean up, even when a request or assertion above failed
        proxy.stop()
# Comment list
# Table of contents