def test_request_timeout():
timeout = 0.1
http_scheme = 'http'
host = 'coordinator'
port = 8080
url = http_scheme + '://' + host + ':' + str(port) + constants.URL_STATEMENT_PATH
def long_call(request, uri, headers):
time.sleep(timeout * 2)
return (200, headers, "delayed success")
httpretty.enable()
for method in [httpretty.POST, httpretty.GET]:
httpretty.register_uri(method, url, body=long_call)
# timeout without retry
for request_timeout in [timeout, (timeout, timeout)]:
req = PrestoRequest(
host=host,
port=port,
user='test',
http_scheme=http_scheme,
max_attempts=1,
request_timeout=request_timeout,
)
with pytest.raises(requests.exceptions.Timeout):
req.get(url)
with pytest.raises(requests.exceptions.Timeout):
req.post('select 1')
httpretty.disable()
httpretty.reset()
评论列表
文章目录