def _do_healthcheck(self, containers, config):
    """Probe each container over HTTP and fail on the first unhealthy one.

    The probe path comes from ``config['HEALTHCHECK_URL']`` (default ``/``)
    and the request timeout from ``config['HEALTHCHECK_TIMEOUT']``
    (default ``1``). The host part of each URL is the value stored in etcd
    under ``/deis/services/<self>/<container.job_id>``.

    Raises:
        exceptions.HealthcheckException: when no etcd client is available,
            when the lookup or request fails with ``KeyError``,
            ``requests.Timeout`` or ``requests.ConnectionError``, or when
            the response status is not 200.
    """
    probe_path = config.get('HEALTHCHECK_URL', '/')
    probe_timeout = int(config.get('HEALTHCHECK_TIMEOUT', 1))
    if not _etcd_client:
        raise exceptions.HealthcheckException('no etcd client available')
    for container in containers:
        try:
            key = "/deis/services/{self}/{container.job_id}".format(
                self=self, container=container)
            address = _etcd_client.get(key).value
            url = "http://{}{}".format(address, probe_path)
            status = requests.get(url, timeout=probe_timeout).status_code
            if status != requests.codes.OK:
                raise exceptions.HealthcheckException(
                    "app failed health check (got '{}', expected: '200')".format(
                        status))
        except (requests.Timeout, requests.ConnectionError, KeyError) as err:
            raise exceptions.HealthcheckException(
                'failed to connect to container ({})'.format(err))
# Example source code using the Python class Timeout() (python类Timeout()的实例源码)
def download(self, url, retry_count=3, headers=None, proxies=None, data=None):
    """Fetch ``url`` through ``self.request_session`` and return the body.

    :param url: URL to request.
    :param retry_count: number of additional attempts made after a
        ``ConnectionError`` or ``Timeout``.
    :param headers: optional dict merged into the session headers,
        e.g. ``{'X': 'x'}``.
    :param proxies: optional proxies mapping,
        e.g. ``{"https": "http://12.112.122.12:3212"}``.
    :param data: optional body (e.g. ``urlencode(post_data)``); when truthy
        the request is sent as a POST, otherwise as a GET.
    :return: the response ``content``, or ``None`` when every attempt failed.
    """
    if headers:
        self.request_session.headers.update(headers)
    attempts_left = retry_count
    while True:
        try:
            if data:
                return self.request_session.post(url, data, proxies=proxies).content
            return self.request_session.get(url, proxies=proxies).content
        except (ConnectionError, Timeout) as e:
            print('Downloader download ConnectionError or Timeout:' + str(e))
            if attempts_left <= 0:
                return None
            # Retry in place so that a later successful attempt is actually
            # returned; the recursive version dropped the retry's result.
            attempts_left -= 1
        except Exception as e:
            print('Downloader download Exception:' + str(e))
            return None
def state_destroy_model(self):
    """Delete the Hanlon model identified by the module's ``uuid`` parameter.

    Sends ``DELETE <base_url>/model/<uuid>``. A 200 response is reported via
    ``exit_json(changed=True)``; ``requests`` failures are reported via
    ``fail_json``. In every other case the module exits with
    ``changed=False``.
    """
    module = self.module
    base_url = module.params['base_url']
    model_uuid = module.params['uuid']
    uri = "%s/model/%s" % (base_url, model_uuid)
    try:
        response = requests.delete(uri)
        if response.status_code == 200:
            module.exit_json(changed=True)
    except requests.ConnectionError as err:
        module.fail_json(msg="Connection Error; confirm Hanlon base_url.", apierror=str(err))
    except requests.Timeout as err:
        module.fail_json(msg="Timeout Error; confirm status of Hanlon server", apierror=str(err))
    except requests.RequestException as err:
        module.fail_json(msg="Unknown Request library failure", apierror=str(err))
    module.exit_json(changed=False)
def test_requests_timeout():
    """Calling the API raises ``EaterTimeoutError`` when the GET raises ``requests.Timeout``."""

    class GetPersonAPI(HTTPEater):
        request_cls = Model
        response_cls = Model
        url = 'http://example.com/'

    def raise_timeout(*args, **kwargs):  # pylint: disable=unused-argument
        # Used as the mocked response callback so the request itself times out.
        raise requests.Timeout()

    client = GetPersonAPI()

    with requests_mock.Mocker() as mocker:
        mocker.get('http://example.com/', text=raise_timeout)
        with pytest.raises(EaterTimeoutError):
            client()
def test_authentication(self):
    """Check that the MaaS API accepts this client's credentials.

    Performs the ``list_authorisation_tokens`` operation on ``account/`` and
    returns ``True`` when the response status is 200.

    Raises:
        errors.TransientDriverError: on a request timeout or a 500/503
            response.
        errors.PersistentDriverError: on any other request failure, on a
            401/403 response, or on any other non-200 status.
    """
    try:
        resp = self.get('account/', op='list_authorisation_tokens')
    except requests.Timeout as ex:
        raise errors.TransientDriverError("Timeout connection to MaaS")
    except Exception as ex:
        raise errors.PersistentDriverError(
            "Error accessing MaaS: %s" % str(ex))

    status = resp.status_code
    if status == 200:
        return True
    if status in (401, 403):
        raise errors.PersistentDriverError(
            "MaaS API Authentication Failed")
    if status in (500, 503):
        raise errors.TransientDriverError("Received 50x error from MaaS")
    raise errors.PersistentDriverError(
        "Received unexpected error from MaaS")
def _do_healthcheck(self, containers, config):
    """Probe each container over HTTP and fail on the first unhealthy one.

    The probe path comes from ``config['HEALTHCHECK_URL']`` (default ``/``)
    and the request timeout from ``config['HEALTHCHECK_TIMEOUT']``
    (default ``1``). The host part of each URL is the value stored in etcd
    under ``/deis/services/<self>/<container.job_id>``.

    Raises:
        exceptions.HealthcheckException: when no etcd client is available,
            when the lookup or request fails with ``KeyError``,
            ``requests.Timeout`` or ``requests.ConnectionError``, or when
            the response status is not 200.
    """
    probe_path = config.get('HEALTHCHECK_URL', '/')
    probe_timeout = int(config.get('HEALTHCHECK_TIMEOUT', 1))
    if not _etcd_client:
        raise exceptions.HealthcheckException('no etcd client available')
    for container in containers:
        try:
            key = "/deis/services/{self}/{container.job_id}".format(
                self=self, container=container)
            address = _etcd_client.get(key).value
            url = "http://{}{}".format(address, probe_path)
            status = requests.get(url, timeout=probe_timeout).status_code
            if status != requests.codes.OK:
                raise exceptions.HealthcheckException(
                    "app failed health check (got '{}', expected: '200')".format(
                        status))
        except (requests.Timeout, requests.ConnectionError, KeyError) as err:
            raise exceptions.HealthcheckException(
                'failed to connect to container ({})'.format(err))
def api_delete(server_name, api, session_id):
    """Send an HTTPS DELETE to ``server_name`` for the endpoint ``api``.

    The URL is ``'https://' + server_name + API + api`` and the session is
    passed as a ``JSESSIONID`` cookie.

    Returns:
        The ``requests`` response object.

    Raises:
        TintriRequestsException: when the request raises any exception.
    """
    # Header and URL for delete call
    headers = {'content-type': 'application/json',
               'cookie': 'JSESSIONID=' + session_id}
    url = 'https://' + server_name + API + api
    try:
        # Invoke the API.
        # NOTE(review): verify=False disables TLS certificate verification.
        r = requests.delete(url, headers=headers, verify=False)
    except requests.ConnectionError:
        # Same exception type as the sibling api_put/api_post helpers; the
        # previous name here was misspelled and raised NameError.
        raise TintriRequestsException("API Connection error occurred.")
    except requests.HTTPError:
        raise TintriRequestsException("HTTP error occurred.")
    except requests.Timeout:
        raise TintriRequestsException("Request timed out.")
    except Exception:
        # str() is required: exc_info()[0] is the exception class, and
        # concatenating it directly to a str raises TypeError.
        raise TintriRequestsException(
            "An unexpected error " + str(sys.exc_info()[0]) + " occurred.")
    return r
# PUT
def api_put(server_name, api, payload, session_id):
    """Send an HTTPS PUT with a JSON body to ``server_name`` for ``api``.

    The URL is ``'https://' + server_name + API + api``; ``payload`` is
    serialized with ``json.dumps`` and the session is passed as a
    ``JSESSIONID`` cookie.

    Returns:
        The ``requests`` response object.

    Raises:
        TintriRequestsException: when the request raises any exception.
    """
    headers = {'content-type': 'application/json',
               'cookie': 'JSESSIONID=' + session_id}
    url = 'https://' + server_name + API + api
    try:
        # Invoke the API.
        # NOTE(review): verify=False disables TLS certificate verification.
        r = requests.put(url, data=json.dumps(payload),
                         headers=headers, verify=False)
    except requests.ConnectionError:
        raise TintriRequestsException("API Connection error occurred.")
    except requests.HTTPError:
        raise TintriRequestsException("HTTP error occurred.")
    except requests.Timeout:
        raise TintriRequestsException("Request timed out.")
    except Exception:
        # str() is required: exc_info()[0] is the exception class, and
        # concatenating it directly to a str raises TypeError.
        raise TintriRequestsException(
            "An unexpected error " + str(sys.exc_info()[0]) + " occurred.")
    return r
# POST
def api_post(server_name, api, payload, session_id):
    """Send an HTTPS POST with a JSON body to ``server_name`` for ``api``.

    The URL is ``'https://' + server_name + API + api``; ``payload`` is
    serialized with ``json.dumps`` and the session is passed as a
    ``JSESSIONID`` cookie.

    Returns:
        The ``requests`` response object.

    Raises:
        TintriRequestsException: when the request raises any exception.
    """
    headers = {'content-type': 'application/json',
               'cookie': 'JSESSIONID=' + session_id}
    url = 'https://' + server_name + API + api
    try:
        # Invoke the API.
        # NOTE(review): verify=False disables TLS certificate verification.
        r = requests.post(url, data=json.dumps(payload),
                          headers=headers, verify=False)
    except requests.ConnectionError:
        raise TintriRequestsException("API Connection error occurred.")
    except requests.HTTPError:
        raise TintriRequestsException("HTTP error occurred.")
    except requests.Timeout:
        raise TintriRequestsException("Request timed out.")
    except Exception:
        # str() is required: exc_info()[0] is the exception class, and
        # concatenating it directly to a str raises TypeError.
        raise TintriRequestsException(
            "An unexpected error " + str(sys.exc_info()[0]) + " occurred.")
    return r
# Download a report file.
def download_file(server_name, report_url, session_id, file_name):
    """Stream the body of ``report_url`` into the local file ``file_name``.

    ``server_name`` and ``session_id`` are accepted but not used by this
    function.

    Raises:
        TintriRequestsException: when the request, the status check or the
            file write raises any exception.
    """
    headers = {'content-type': 'application/json'}
    try:
        # NOTE(review): verify=False disables TLS certificate verification.
        r = requests.get(report_url, headers=headers, verify=False, stream=True)
        # if HTTP Response is not 200 then raise an exception
        if r.status_code != 200:
            message = "The HTTP response for get call to the server is not 200."
            # NOTE(review): raised inside the try, so unless
            # TintriApiException derives from one of the requests exceptions
            # it is re-wrapped by the generic handler below; confirm intent.
            raise TintriApiException(message, r.status_code, report_url, "No Payload", r.text)
        # iter_content() yields bytes, so the file must be opened in binary
        # mode; text mode fails on Python 3 and can alter the data.
        with open(file_name, 'wb') as file_h:
            for block in r.iter_content(4096):
                file_h.write(block)
    except requests.ConnectionError:
        raise TintriRequestsException("API Connection error occurred.")
    except requests.HTTPError:
        raise TintriRequestsException("HTTP error occurred.")
    except requests.Timeout:
        raise TintriRequestsException("Request timed out.")
    except Exception as e:
        raise TintriRequestsException("An unexpected error: " + str(e))
def test_get_podm_status_Offline_by_http_exception(self, mock_get):
    """``pod_status`` reports OFFLINE when the HTTP call raises.

    Covers ``requests.ConnectionError``, ``requests.exceptions.SSLError``
    and ``requests.Timeout`` raised by the mocked GET.
    """
    mock_get.side_effect = requests.ConnectionError
    self.assertEqual(redfish.pod_status('url', 'username', 'password'),
                     constants.PODM_STATUS_OFFLINE)
    # Was spelled ``asset_called_once_with``, which a Mock accepts as an
    # ordinary attribute, so the call arguments were never checked.
    mock_get.assert_called_once_with('url',
                                     auth=auth.HTTPBasicAuth('username',
                                                             'password'))
    # SSL Error
    mock_get.side_effect = requests.exceptions.SSLError
    self.assertEqual(redfish.pod_status('url', 'username', 'password'),
                     constants.PODM_STATUS_OFFLINE)
    self.assertEqual(mock_get.call_count, 2)
    # Timeout
    mock_get.side_effect = requests.Timeout
    self.assertEqual(redfish.pod_status('url', 'username', 'password'),
                     constants.PODM_STATUS_OFFLINE)
    self.assertEqual(mock_get.call_count, 3)
def setUp(self):
    """Install logging/requests stubs and load the test configuration.

    Replaces ``logging.getLogger`` and ``requests.get`` on the modules found
    in ``sys.modules``, sets ``CACHET_TOKEN`` in a guarded environment,
    builds ``Configuration('config.yml')`` and finally rebinds the
    ``Timeout``/``ConnectionError``/``HTTPError`` names on ``requests``.
    The replaced attributes are not restored by this method.
    """
    def fake_get_logger(name):
        # Every logger request hands back a fresh mock, remembered on the test.
        logger = mock.Mock()
        self.mock_logger = logger
        return logger

    def fake_get(url, headers):
        # Successful response whose JSON body reports status 1.
        payload = {'data': {'status': 1}}
        return mock.Mock(ok=True, json=mock.Mock(return_value=payload))

    sys.modules['logging'].getLogger = fake_get_logger
    requests_module = sys.modules['requests']
    requests_module.get = fake_get

    self.env = EnvironmentVarGuard()
    self.env.set('CACHET_TOKEN', 'token2')

    self.configuration = Configuration('config.yml')

    requests_module.Timeout = Timeout
    requests_module.ConnectionError = ConnectionError
    requests_module.HTTPError = HTTPError
def _call(self, verb, url, data={}, params={}, headers={}):
    """Send a request through ``self.session`` and report its outcome.

    ``headers``, when non-empty, are merged into the session headers.
    ``data`` is sent as the JSON body and ``params`` as the query string.
    The mutable defaults are kept for interface compatibility; they are only
    read here, never mutated.

    Returns:
        ``(response, status_code)`` when a response was received, including
        responses for which ``raise_for_status()`` raises ``HTTPError``;
        ``(False, None)`` when the request itself failed with
        ``requests.Timeout`` or another ``requests.RequestException``.
    """
    if headers:
        self.session.headers.update(headers)
    log.info('Call %s with data %s', url, data)
    # The request must be inside the try: Timeout and other transport errors
    # are raised here, not by raise_for_status(), so the handlers below were
    # previously unreachable.
    try:
        resp = self.session.request(verb, url, json=data, params=params)
    except requests.Timeout:
        log.error('Request Timeout to %s', url)
        return False, None
    except requests.RequestException:
        log.error('Requests Error')
        return False, None

    status_code = resp.status_code
    try:
        resp.raise_for_status()
    except requests.HTTPError:
        # NOTE(review): this writes the API key to the debug log.
        log.debug('Error occured, endpoint : %s, apikey : %s',
                  url, self.apikey)
    return resp, status_code
def api_delete(server_name, api, session_id):
    """Send an HTTPS DELETE to ``server_name`` for the endpoint ``api``.

    The URL is ``'https://' + server_name + API + api`` and the session is
    passed as a ``JSESSIONID`` cookie.

    Returns:
        The ``requests`` response object.

    Raises:
        TintriRequestsException: when the request raises any exception.
    """
    # Header and URL for delete call
    headers = {'content-type': 'application/json',
               'cookie': 'JSESSIONID=' + session_id}
    url = 'https://' + server_name + API + api
    try:
        # Invoke the API.
        # NOTE(review): verify=False disables TLS certificate verification.
        r = requests.delete(url, headers=headers, verify=False)
    except requests.ConnectionError:
        # Same exception type as the sibling api_put/api_post helpers; the
        # previous name here was misspelled and raised NameError.
        raise TintriRequestsException("API Connection error occurred.")
    except requests.HTTPError:
        raise TintriRequestsException("HTTP error occurred.")
    except requests.Timeout:
        raise TintriRequestsException("Request timed out.")
    except Exception:
        # str() is required: exc_info()[0] is the exception class, and
        # concatenating it directly to a str raises TypeError.
        raise TintriRequestsException(
            "An unexpected error " + str(sys.exc_info()[0]) + " occurred.")
    return r
# PUT
def api_put(server_name, api, payload, session_id):
    """Send an HTTPS PUT with a JSON body to ``server_name`` for ``api``.

    The URL is ``'https://' + server_name + API + api``; ``payload`` is
    serialized with ``json.dumps`` and the session is passed as a
    ``JSESSIONID`` cookie.

    Returns:
        The ``requests`` response object.

    Raises:
        TintriRequestsException: when the request raises any exception.
    """
    headers = {'content-type': 'application/json',
               'cookie': 'JSESSIONID=' + session_id}
    url = 'https://' + server_name + API + api
    try:
        # Invoke the API.
        # NOTE(review): verify=False disables TLS certificate verification.
        r = requests.put(url, data=json.dumps(payload),
                         headers=headers, verify=False)
    except requests.ConnectionError:
        raise TintriRequestsException("API Connection error occurred.")
    except requests.HTTPError:
        raise TintriRequestsException("HTTP error occurred.")
    except requests.Timeout:
        raise TintriRequestsException("Request timed out.")
    except Exception:
        # str() is required: exc_info()[0] is the exception class, and
        # concatenating it directly to a str raises TypeError.
        raise TintriRequestsException(
            "An unexpected error " + str(sys.exc_info()[0]) + " occurred.")
    return r
# POST
def api_post(server_name, api, payload, session_id):
    """Send an HTTPS POST with a JSON body to ``server_name`` for ``api``.

    The URL is ``'https://' + server_name + API + api``; ``payload`` is
    serialized with ``json.dumps`` and the session is passed as a
    ``JSESSIONID`` cookie.

    Returns:
        The ``requests`` response object.

    Raises:
        TintriRequestsException: when the request raises any exception.
    """
    headers = {'content-type': 'application/json',
               'cookie': 'JSESSIONID=' + session_id}
    url = 'https://' + server_name + API + api
    try:
        # Invoke the API.
        # NOTE(review): verify=False disables TLS certificate verification.
        r = requests.post(url, data=json.dumps(payload),
                          headers=headers, verify=False)
    except requests.ConnectionError:
        raise TintriRequestsException("API Connection error occurred.")
    except requests.HTTPError:
        raise TintriRequestsException("HTTP error occurred.")
    except requests.Timeout:
        raise TintriRequestsException("Request timed out.")
    except Exception:
        # str() is required: exc_info()[0] is the exception class, and
        # concatenating it directly to a str raises TypeError.
        raise TintriRequestsException(
            "An unexpected error " + str(sys.exc_info()[0]) + " occurred.")
    return r
# Download a report file.
def download_file(server_name, report_url, session_id, file_name):
    """Stream the body of ``report_url`` into the local file ``file_name``.

    ``server_name`` and ``session_id`` are accepted but not used by this
    function.

    Raises:
        TintriRequestsException: when the request, the status check or the
            file write raises any exception.
    """
    headers = {'content-type': 'application/json'}
    try:
        # NOTE(review): verify=False disables TLS certificate verification.
        r = requests.get(report_url, headers=headers, verify=False, stream=True)
        # if HTTP Response is not 200 then raise an exception
        if r.status_code != 200:
            message = "The HTTP response for get call to the server is not 200."
            # NOTE(review): raised inside the try, so unless
            # TintriApiException derives from one of the requests exceptions
            # it is re-wrapped by the generic handler below; confirm intent.
            raise TintriApiException(message, r.status_code, report_url, "No Payload", r.text)
        # iter_content() yields bytes, so the file must be opened in binary
        # mode; text mode fails on Python 3 and can alter the data.
        with open(file_name, 'wb') as file_h:
            for block in r.iter_content(4096):
                file_h.write(block)
    except requests.ConnectionError:
        raise TintriRequestsException("API Connection error occurred.")
    except requests.HTTPError:
        raise TintriRequestsException("HTTP error occurred.")
    except requests.Timeout:
        raise TintriRequestsException("Request timed out.")
    except Exception as e:
        raise TintriRequestsException("An unexpected error: " + str(e))
def _search_md(url='http://169.254.169.254/latest/meta-data/iam/'):
    """Collect the metadata entries listed at ``url`` into a dict.

    The listing at ``url`` is read line by line. Entries ending in ``/`` are
    resolved through ``get_iam_role(url + entry)`` and stored without the
    trailing slash. Every other entry is fetched; a value starting with
    ``{`` is parsed as JSON, a multi-line value becomes a list of lines, and
    anything else is stored as received.

    Returns:
        The dict built so far; empty or partial when a request times out or
        the connection fails.
    """
    d = {}
    try:
        r = requests.get(url, timeout=.1)
        if r.content:
            # assumes r.content is a text str (Python 2 style) - TODO confirm
            for field in r.content.split('\n'):
                if field.endswith('/'):
                    d[field[0:-1]] = get_iam_role(url + field)
                    continue
                val = requests.get(url + field).content
                # Slice instead of indexing so an empty body does not raise
                # IndexError.
                if val[:1] == '{':
                    val = json.loads(val)
                elif val.find('\n') > 0:
                    # Split this entry's own value; the listing response
                    # (r.content) was split here before.
                    val = val.split('\n')
                d[field] = val
    except (requests.Timeout, requests.ConnectionError):
        # Deliberate best effort: return whatever was gathered so far.
        pass
    return d
def _search_md(url='http://169.254.169.254/latest/meta-data/iam/'):
    """Collect the metadata entries listed at ``url`` into a dict.

    The listing at ``url`` is read line by line. Entries ending in ``/`` are
    resolved through ``get_iam_role(url + entry)`` and stored without the
    trailing slash. Every other entry is fetched; a value starting with
    ``{`` is parsed as JSON, a multi-line value becomes a list of lines, and
    anything else is stored as received.

    Returns:
        The dict built so far; empty or partial when a request times out or
        the connection fails.
    """
    d = {}
    try:
        r = requests.get(url, timeout=.1)
        if r.content:
            # assumes r.content is a text str (Python 2 style) - TODO confirm
            for field in r.content.split('\n'):
                if field.endswith('/'):
                    d[field[0:-1]] = get_iam_role(url + field)
                    continue
                val = requests.get(url + field).content
                # Slice instead of indexing so an empty body does not raise
                # IndexError.
                if val[:1] == '{':
                    val = json.loads(val)
                elif val.find('\n') > 0:
                    # Split this entry's own value; the listing response
                    # (r.content) was split here before.
                    val = val.split('\n')
                d[field] = val
    except (requests.Timeout, requests.ConnectionError):
        # Deliberate best effort: return whatever was gathered so far.
        pass
    return d
def state_destroy_image(self):
    """Delete the Hanlon image identified by the module's ``uuid`` parameter.

    Outside check mode this sends ``DELETE <base_url>/image/<uuid>``: a 200
    response is reported via ``exit_json(changed=True)``, any other status
    via ``fail_json``. In check mode no request is sent and the module
    exits with ``changed=True``. ``requests`` failures are reported via
    ``fail_json``.
    """
    module = self.module
    base_url = module.params['base_url']
    image_uuid = module.params['uuid']
    uri = "%s/image/%s" % (base_url, image_uuid)
    try:
        if not module.check_mode:
            response = requests.delete(uri)
            if response.status_code == 200:
                module.exit_json(changed=True)
            else:
                module.fail_json(msg="Unknown Hanlon API error", apierror=response.text)
        module.exit_json(changed=True)
    except requests.ConnectionError as err:
        module.fail_json(msg="Connection Error; confirm Hanlon base_url.", apierror=str(err))
    except requests.Timeout as err:
        module.fail_json(msg="Timeout Error; confirm status of Hanlon server", apierror=str(err))
    except requests.RequestException as err:
        module.fail_json(msg="Unknown Request library failure", apierror=str(err))
    module.exit_json(changed=False)
def state_destroy_policy(self):
    """Delete the Hanlon policy identified by the module's ``uuid`` parameter.

    Outside check mode this sends ``DELETE <base_url>/policy/<uuid>``: a 200
    response is reported via ``exit_json(changed=True)``, any other status
    via ``fail_json``. In check mode no request is sent and the module
    exits with ``changed=True``. ``requests`` failures are reported via
    ``fail_json``.
    """
    module = self.module
    base_url = module.params['base_url']
    policy_uuid = module.params['uuid']
    uri = "%s/policy/%s" % (base_url, policy_uuid)
    try:
        if not module.check_mode:
            response = requests.delete(uri)
            if response.status_code == 200:
                module.exit_json(changed=True)
            else:
                module.fail_json(msg="Unknown error", apierror=response.text)
        module.exit_json(changed=True)
    except requests.ConnectionError as err:
        module.fail_json(msg="Connection Error; confirm Hanlon base_url.", apierror=str(err))
    except requests.Timeout as err:
        module.fail_json(msg="Timeout Error; confirm status of Hanlon server", apierror=str(err))
    except requests.RequestException as err:
        module.fail_json(msg="Unknown Request library failure", apierror=str(err))
def state_destroy_active_model(self):
    """Delete the Hanlon active model ``self.uuid``.

    Outside check mode this sends
    ``DELETE <self.base_url>/active_model/<self.uuid>``: a 200 response is
    reported via ``exit_json(changed=True)``, any other status via
    ``fail_json``. In check mode no request is sent and the module exits
    with ``changed=True``. ``requests`` failures are reported via
    ``fail_json``.
    """
    module = self.module
    uri = "%s/active_model/%s" % (self.base_url, self.uuid)
    try:
        if not module.check_mode:
            response = requests.delete(uri)
            if response.status_code == 200:
                module.exit_json(changed=True)
            else:
                module.fail_json(msg="Unknown error", apierror=response.text)
        module.exit_json(changed=True)
    except requests.ConnectionError as err:
        module.fail_json(msg="Connection Error; confirm Hanlon base_url.", apierror=str(err))
    except requests.Timeout as err:
        module.fail_json(msg="Timeout Error; confirm status of Hanlon server", apierror=str(err))
    except requests.RequestException as err:
        module.fail_json(msg="Unknown Request library failure", apierror=str(err))
def check_model_state(self):
    """Look up a Hanlon model by the module's ``label`` parameter.

    Fetches ``<base_url>/model`` and then each listed ``@uri`` through
    ``hanlon_get_request``.

    Returns:
        ``('present', uuid)`` for the first successfully fetched model whose
        ``@label`` equals the requested label, otherwise
        ``('absent', None)``. ``requests`` failures are reported via
        ``fail_json``.
    """
    module = self.module
    base_url = module.params['base_url']
    wanted_label = module.params['label']
    uri = "%s/model" % base_url
    try:
        listing, http_success = hanlon_get_request(uri)
        for entry in listing['response']:
            uri = entry['@uri']
            model, http_success = hanlon_get_request(uri)
            if not http_success:
                continue
            details = model['response']
            if details['@label'] == wanted_label:
                return 'present', details['@uuid']
    except requests.ConnectionError as err:
        module.fail_json(msg="Connection Error; confirm Hanlon base_url.", apierror=str(err))
    except requests.Timeout as err:
        module.fail_json(msg="Timeout Error; confirm status of Hanlon server", apierror=str(err))
    except requests.RequestException as err:
        module.fail_json(msg="Unknown Request library failure", apierror=str(err))
    return 'absent', None
def state_destroy_image(self):
    """Delete the Hanlon image identified by the module's ``uuid`` parameter.

    Outside check mode this sends ``DELETE <base_url>/image/<uuid>``: a 200
    response is reported via ``exit_json(changed=True)``, any other status
    via ``fail_json``. In check mode no request is sent and the module
    exits with ``changed=True``. ``requests`` failures are reported via
    ``fail_json``.
    """
    module = self.module
    base_url = module.params['base_url']
    image_uuid = module.params['uuid']
    uri = "%s/image/%s" % (base_url, image_uuid)
    try:
        if not module.check_mode:
            response = requests.delete(uri)
            if response.status_code == 200:
                module.exit_json(changed=True)
            else:
                module.fail_json(msg="Unknown Hanlon API error", apierror=response.text)
        module.exit_json(changed=True)
    except requests.ConnectionError as err:
        module.fail_json(msg="Connection Error; confirm Hanlon base_url.", apierror=str(err))
    except requests.Timeout as err:
        module.fail_json(msg="Timeout Error; confirm status of Hanlon server", apierror=str(err))
    except requests.RequestException as err:
        module.fail_json(msg="Unknown Request library failure", apierror=str(err))
    module.exit_json(changed=False)
def state_destroy_active_model(self):
    """Delete the Hanlon active model ``self.uuid``.

    Outside check mode this sends
    ``DELETE <self.base_url>/active_model/<self.uuid>``: a 200 response is
    reported via ``exit_json(changed=True)``, any other status via
    ``fail_json``. In check mode no request is sent and the module exits
    with ``changed=True``. ``requests`` failures are reported via
    ``fail_json``.
    """
    module = self.module
    uri = "%s/active_model/%s" % (self.base_url, self.uuid)
    try:
        if not module.check_mode:
            response = requests.delete(uri)
            if response.status_code == 200:
                module.exit_json(changed=True)
            else:
                module.fail_json(msg="Unknown error", apierror=response.text)
        module.exit_json(changed=True)
    except requests.ConnectionError as err:
        module.fail_json(msg="Connection Error; confirm Hanlon base_url.", apierror=str(err))
    except requests.Timeout as err:
        module.fail_json(msg="Timeout Error; confirm status of Hanlon server", apierror=str(err))
    except requests.RequestException as err:
        module.fail_json(msg="Unknown Request library failure", apierror=str(err))
def check_model_state(self):
    """Look up a Hanlon model by the module's ``label`` parameter.

    Fetches ``<base_url>/model`` and then each listed ``@uri`` through
    ``hanlon_get_request``.

    Returns:
        ``('present', uuid)`` for the first successfully fetched model whose
        ``@label`` equals the requested label, otherwise
        ``('absent', None)``. ``requests`` failures are reported via
        ``fail_json``.
    """
    module = self.module
    base_url = module.params['base_url']
    wanted_label = module.params['label']
    uri = "%s/model" % base_url
    try:
        listing, http_success = hanlon_get_request(uri)
        for entry in listing['response']:
            uri = entry['@uri']
            model, http_success = hanlon_get_request(uri)
            if not http_success:
                continue
            details = model['response']
            if details['@label'] == wanted_label:
                return 'present', details['@uuid']
    except requests.ConnectionError as err:
        module.fail_json(msg="Connection Error; confirm Hanlon base_url.", apierror=str(err))
    except requests.Timeout as err:
        module.fail_json(msg="Timeout Error; confirm status of Hanlon server", apierror=str(err))
    except requests.RequestException as err:
        module.fail_json(msg="Unknown Request library failure", apierror=str(err))
    return 'absent', None
def state_destroy_model(self):
    """Delete the Hanlon model identified by the module's ``uuid`` parameter.

    Sends ``DELETE <base_url>/model/<uuid>``. A 200 response is reported via
    ``exit_json(changed=True)``; ``requests`` failures are reported via
    ``fail_json``. In every other case the module exits with
    ``changed=False``.
    """
    module = self.module
    base_url = module.params['base_url']
    model_uuid = module.params['uuid']
    uri = "%s/model/%s" % (base_url, model_uuid)
    try:
        response = requests.delete(uri)
        if response.status_code == 200:
            module.exit_json(changed=True)
    except requests.ConnectionError as err:
        module.fail_json(msg="Connection Error; confirm Hanlon base_url.", apierror=str(err))
    except requests.Timeout as err:
        module.fail_json(msg="Timeout Error; confirm status of Hanlon server", apierror=str(err))
    except requests.RequestException as err:
        module.fail_json(msg="Unknown Request library failure", apierror=str(err))
    module.exit_json(changed=False)
def state_destroy_image(self):
    """Delete the Hanlon image identified by the module's ``uuid`` parameter.

    Outside check mode this sends ``DELETE <base_url>/image/<uuid>``: a 200
    response is reported via ``exit_json(changed=True)``, any other status
    via ``fail_json``. In check mode no request is sent and the module
    exits with ``changed=True``. ``requests`` failures are reported via
    ``fail_json``.
    """
    module = self.module
    base_url = module.params['base_url']
    image_uuid = module.params['uuid']
    uri = "%s/image/%s" % (base_url, image_uuid)
    try:
        if not module.check_mode:
            response = requests.delete(uri)
            if response.status_code == 200:
                module.exit_json(changed=True)
            else:
                module.fail_json(msg="Unknown Hanlon API error", apierror=response.text)
        module.exit_json(changed=True)
    except requests.ConnectionError as err:
        module.fail_json(msg="Connection Error; confirm Hanlon base_url.", apierror=str(err))
    except requests.Timeout as err:
        module.fail_json(msg="Timeout Error; confirm status of Hanlon server", apierror=str(err))
    except requests.RequestException as err:
        module.fail_json(msg="Unknown Request library failure", apierror=str(err))
    module.exit_json(changed=False)
def state_destroy_policy(self):
    """Delete the Hanlon policy identified by the module's ``uuid`` parameter.

    Outside check mode this sends ``DELETE <base_url>/policy/<uuid>``: a 200
    response is reported via ``exit_json(changed=True)``, any other status
    via ``fail_json``. In check mode no request is sent and the module
    exits with ``changed=True``. ``requests`` failures are reported via
    ``fail_json``.
    """
    module = self.module
    base_url = module.params['base_url']
    policy_uuid = module.params['uuid']
    uri = "%s/policy/%s" % (base_url, policy_uuid)
    try:
        if not module.check_mode:
            response = requests.delete(uri)
            if response.status_code == 200:
                module.exit_json(changed=True)
            else:
                module.fail_json(msg="Unknown error", apierror=response.text)
        module.exit_json(changed=True)
    except requests.ConnectionError as err:
        module.fail_json(msg="Connection Error; confirm Hanlon base_url.", apierror=str(err))
    except requests.Timeout as err:
        module.fail_json(msg="Timeout Error; confirm status of Hanlon server", apierror=str(err))
    except requests.RequestException as err:
        module.fail_json(msg="Unknown Request library failure", apierror=str(err))
def state_destroy_active_model(self):
    """Delete the Hanlon active model ``self.uuid``.

    Outside check mode this sends
    ``DELETE <self.base_url>/active_model/<self.uuid>``: a 200 response is
    reported via ``exit_json(changed=True)``, any other status via
    ``fail_json``. In check mode no request is sent and the module exits
    with ``changed=True``. ``requests`` failures are reported via
    ``fail_json``.
    """
    module = self.module
    uri = "%s/active_model/%s" % (self.base_url, self.uuid)
    try:
        if not module.check_mode:
            response = requests.delete(uri)
            if response.status_code == 200:
                module.exit_json(changed=True)
            else:
                module.fail_json(msg="Unknown error", apierror=response.text)
        module.exit_json(changed=True)
    except requests.ConnectionError as err:
        module.fail_json(msg="Connection Error; confirm Hanlon base_url.", apierror=str(err))
    except requests.Timeout as err:
        module.fail_json(msg="Timeout Error; confirm status of Hanlon server", apierror=str(err))
    except requests.RequestException as err:
        module.fail_json(msg="Unknown Request library failure", apierror=str(err))