def api_put(server_name, api, payload, session_id):
    """Send an authenticated JSON PUT request to a server API endpoint.

    Args:
        server_name: Host part of the URL; the request goes to
            ``'https://' + server_name + API + api``.
        api: Endpoint path appended after the module-level ``API`` prefix.
        payload: Object serialised with ``json.dumps`` and sent as the body.
        session_id: Value sent in the ``JSESSIONID`` cookie.

    Returns:
        The response object returned by ``requests.put``.

    Raises:
        TintriRequestsException: If the request raises any exception.
    """
    headers = {'content-type': 'application/json',
               'cookie': 'JSESSIONID=' + session_id}
    url = 'https://' + server_name + API + api

    try:
        # Invoke the API.
        # NOTE(review): verify=False disables TLS certificate validation;
        # kept from the original -- confirm this is intended.
        r = requests.put(url, data=json.dumps(payload),
                         headers=headers, verify=False)
    except requests.ConnectionError:
        raise TintriRequestsException("API Connection error occurred.")
    except requests.HTTPError:
        raise TintriRequestsException("HTTP error occurred.")
    except requests.Timeout:
        raise TintriRequestsException("Request timed out.")
    except Exception as error:
        # The previous handler concatenated sys.exc_info()[0] (a class) with
        # str, which raised TypeError and hid the real failure. It was also a
        # bare ``except:``, which intercepted KeyboardInterrupt/SystemExit.
        raise TintriRequestsException(
            "An unexpected error " + repr(error) + " occurred.")

    return r
# POST
python类ConnectionError()的实例源码
def api_post(server_name, api, payload, session_id):
    """Send an authenticated JSON POST request to a server API endpoint.

    Args:
        server_name: Host part of the URL; the request goes to
            ``'https://' + server_name + API + api``.
        api: Endpoint path appended after the module-level ``API`` prefix.
        payload: Object serialised with ``json.dumps`` and sent as the body.
        session_id: Value sent in the ``JSESSIONID`` cookie.

    Returns:
        The response object returned by ``requests.post``.

    Raises:
        TintriRequestsException: If the request raises any exception.
    """
    headers = {'content-type': 'application/json',
               'cookie': 'JSESSIONID=' + session_id}
    url = 'https://' + server_name + API + api

    try:
        # Invoke the API.
        # NOTE(review): verify=False disables TLS certificate validation;
        # kept from the original -- confirm this is intended.
        r = requests.post(url, data=json.dumps(payload),
                          headers=headers, verify=False)
    except requests.ConnectionError:
        raise TintriRequestsException("API Connection error occurred.")
    except requests.HTTPError:
        raise TintriRequestsException("HTTP error occurred.")
    except requests.Timeout:
        raise TintriRequestsException("Request timed out.")
    except Exception as error:
        # The previous handler concatenated sys.exc_info()[0] (a class) with
        # str, which raised TypeError and hid the real failure. It was also a
        # bare ``except:``, which intercepted KeyboardInterrupt/SystemExit.
        raise TintriRequestsException(
            "An unexpected error " + repr(error) + " occurred.")

    return r
# Login.
def download_file(server_name, report_url, session_id, file_name):
    """Download ``report_url`` and write the response body to ``file_name``.

    Args:
        server_name: Not used by this function; kept for the existing
            call signature.
        report_url: URL fetched with a streaming GET request.
        session_id: Not used by this function; kept for the existing
            call signature.
        file_name: Path of the local file that receives the downloaded bytes.

    Raises:
        TintriRequestsException: On any failure, including a non-200 response
            (see the review note on the generic handler below).
    """
    headers = {'content-type': 'application/json'}

    try:
        # NOTE(review): verify=False disables TLS certificate validation;
        # kept from the original -- confirm this is intended.
        r = requests.get(report_url, headers=headers, verify=False, stream=True)
        # if HTTP Response is not 200 then raise an exception
        if r.status_code != 200:
            message = "The HTTP response for get call to the server is not 200."
            raise TintriApiException(message, r.status_code, report_url, "No Payload", r.text)

        # iter_content() yields bytes, so the file must be opened in binary
        # mode; text mode fails on Python 3 and can corrupt data through
        # newline translation on Windows.
        with open(file_name, 'wb') as file_h:
            for block in r.iter_content(4096):
                file_h.write(block)

    except requests.ConnectionError:
        raise TintriRequestsException("API Connection error occurred.")
    except requests.HTTPError:
        raise TintriRequestsException("HTTP error occurred.")
    except requests.Timeout:
        raise TintriRequestsException("Request timed out.")
    except Exception as e:
        # NOTE(review): assuming TintriApiException derives from Exception,
        # the non-200 error raised above is re-wrapped here as a
        # TintriRequestsException. Left unchanged so callers keep seeing the
        # same exception type -- confirm whether that is desired.
        raise TintriRequestsException("An unexpected error: " + e.__str__())
def index(self, uid='', iss=''):
    """Redirect to the location returned by ``self.rph.begin`` for a link.

    The link is ``iss`` when given; otherwise it is looked up with
    ``self.rph.find_srv_discovery_url`` for ``acct:<uid>``. When neither
    argument is supplied, the contents of ``opbyuid.html`` (from
    ``self.html_home``) are returned instead.

    Args:
        uid: Identifier used to build the ``acct:`` resource for the lookup.
        iss: Link used directly, taking precedence over ``uid``.

    Returns:
        ``as_bytes`` of the ``opbyuid.html`` contents when both arguments are
        empty; ``None`` if the resolved link is empty.

    Raises:
        cherrypy.HTTPError: If the lookup raises ``requests.ConnectionError``.
        cherrypy.HTTPRedirect: To ``resp_headers['Location']`` when a link
            was resolved.
    """
    link = ''
    if iss:
        link = iss
    elif uid:
        try:
            link = self.rph.find_srv_discovery_url(
                resource="acct:{}".format(uid))
        except requests.ConnectionError:
            raise cherrypy.HTTPError(
                message="Webfinger lookup failed, connection error")
    else:
        fname = os.path.join(self.html_home, 'opbyuid.html')
        # Close the file deterministically instead of relying on garbage
        # collection of the anonymous file object.
        with open(fname, 'r') as page:
            return as_bytes(page.read())

    if link:
        resp_headers = self.rph.begin(link)
        raise cherrypy.HTTPRedirect(resp_headers['Location'])
def test_get_podm_status_Offline_by_http_exception(self, mock_get):
    # pod_status() must report PODM_STATUS_OFFLINE whenever the mocked HTTP
    # call raises a connection error, an SSL error or a timeout.

    # Connection error
    mock_get.side_effect = requests.ConnectionError
    self.assertEqual(redfish.pod_status('url', 'username', 'password'),
                     constants.PODM_STATUS_OFFLINE)
    # Was ``asset_called_once_with``: on a Mock that typo just creates a child
    # mock and verifies nothing.
    # NOTE(review): now that the assertion really runs, confirm these are the
    # exact arguments pod_status() passes to the mocked call.
    mock_get.assert_called_once_with('url',
                                     auth=auth.HTTPBasicAuth('username',
                                                             'password'))
    # SSL Error
    mock_get.side_effect = requests.exceptions.SSLError
    self.assertEqual(redfish.pod_status('url', 'username', 'password'),
                     constants.PODM_STATUS_OFFLINE)
    self.assertEqual(mock_get.call_count, 2)
    # Timeout
    mock_get.side_effect = requests.Timeout
    self.assertEqual(redfish.pod_status('url', 'username', 'password'),
                     constants.PODM_STATUS_OFFLINE)
    self.assertEqual(mock_get.call_count, 3)
api_testcase_with_test_reset.py 文件源码
项目:intel-manager-for-lustre
作者: intel-hpdd
项目源码
文件源码
阅读 19
收藏 0
点赞 0
评论 0
def supervisor_controlled_processes_running(self):
    """Report whether every supervisor-controlled process is RUNNING.

    Queries ``/api/system_status/`` through ``self.chroma_manager`` and
    inspects the ``statename`` of each entry under ``supervisor``.

    Returns:
        True when no entry has a ``statename`` other than ``'RUNNING'``;
        False when at least one does, or when the request raises
        ``requests.ConnectionError``.
    """
    try:
        status_response = self.chroma_manager.get('/api/system_status/')
    except requests.ConnectionError:
        logger.warning("Connection error trying to connect to the manager")
        return False

    self.assertEqual(status_response.successful, True, status_response.text)

    # Collect every process entry that is in any state other than RUNNING.
    stopped = [entry for entry in status_response.json['supervisor']
               if not entry['statename'] == 'RUNNING']
    if not stopped:
        return True

    logger.warning("Supervisor processes found not to be running: '%s'" % stopped)
    return False
def get(self,url):
    """Fetch ``url`` with ``requests.get`` and return the response.

    On success a notification containing the URL is emitted through
    ``self.notifylineEdit_4``.

    Args:
        url: Address to request; headers come from ``headers.get_header()``.

    Returns:
        The ``requests`` response, or ``None`` when the request raised.
    """
    try:
        resp = requests.get(url=url,headers=headers.get_header())
        self.notifylineEdit_4.emit(u"[+] ???? "+url)
        return resp
    except requests.ConnectionError:
        self.log.error(u'??{0}??'.format(url))
        # Pause for three seconds after a connection failure.
        time.sleep(3)
    except Exception as e:
        # traceback.print_exc() writes the traceback itself and returns None.
        # The old ``print traceback.print_exc()`` was Python 2-only syntax and
        # additionally printed a stray "None".
        traceback.print_exc()
        self.log.error(u'??{0}??????{1}'.format(url,e))
    return None
# ??????
# http://movie.douban.com/subject/10533913/photos?type=R
# ?? http://img3.douban.com/view/photo/thumb/public/p1812524514.jpg
# ???? http://img3.douban.com/view/photo/photo/public/p1812524514.jpg
# ???????images,??????
def create_external_export(username, id_string, export_id, query=None,
                           token=None, meta=None):
    """Generate an external export and return the generated export's id.

    The ``Export`` row identified by ``export_id`` is loaded first. If
    generation raises, that row is marked ``Export.FAILED`` and saved, the
    failure is passed to ``report_exception``, and the original exception is
    re-raised.

    Args:
        username: Forwarded to ``generate_external_export``.
        id_string: Forwarded to ``generate_external_export``.
        export_id: Primary key of the ``Export`` to process.
        query: Forwarded to ``generate_external_export``.
        token: Forwarded to ``generate_external_export``.
        meta: Forwarded to ``generate_external_export``.

    Returns:
        The ``id`` attribute of the object returned by
        ``generate_external_export``.
    """
    export = Export.objects.get(id=export_id)
    try:
        # though export is not available when for has 0 submissions, we
        # catch this since it potentially stops celery
        generated = generate_external_export(
            Export.EXTERNAL_EXPORT, username,
            id_string, export_id, token, query, meta
        )
    except (Exception, NoRecordsFoundError, ConnectionError) as error:
        export.internal_status = Export.FAILED
        export.save()
        # mail admins
        context = {
            'export_id': export_id,
            'username': username,
            'id_string': id_string
        }
        subject = ("External Export Exception: Export ID - "
                   "%(export_id)s, /%(username)s/%(id_string)s"
                   % context)
        report_exception(subject, error, sys.exc_info())
        raise

    return generated.id
def setUp(self):
    """Replace logging/requests module attributes with test doubles.

    ``logging.getLogger`` is swapped for a factory that stores a fresh mock on
    ``self.mock_logger``; ``requests.get`` is swapped for a stub whose
    response has ``ok`` set to True and whose ``json()`` returns a status
    payload. The ``CACHET_TOKEN`` environment variable is set before
    ``Configuration('config.yml')`` is built.
    """
    def getLogger(name):
        logger_double = mock.Mock()
        self.mock_logger = logger_double
        return logger_double
    sys.modules['logging'].getLogger = getLogger

    def get(url, headers):
        response = mock.Mock()
        response.ok = True
        response.json = mock.Mock(return_value={'data': {'status': 1}})
        return response
    sys.modules['requests'].get = get

    self.env = EnvironmentVarGuard()
    self.env.set('CACHET_TOKEN', 'token2')

    self.configuration = Configuration('config.yml')

    # Installed after the configuration is created, as in the original order.
    for attr_name, exc_type in (('Timeout', Timeout),
                                ('ConnectionError', ConnectionError),
                                ('HTTPError', HTTPError)):
        setattr(sys.modules['requests'], attr_name, exc_type)
def url_to_img_array(url):
    """Download the resource at ``url`` and decode it with ``cv2.imdecode``.

    Args:
        url: String URL to fetch. Any non-string input is rejected.

    Returns:
        The result of ``cv2.imdecode`` on the downloaded bytes, or ``None``
        when ``url`` is not a string or when fetching/decoding raises.
    """
    if not isinstance(url, basestring):
        logging.warning("input is neither an ndarray nor a string, so I don't know what to do")
        return None

    # NOTE(review): the original had a "replace https with http" step guarded
    # by ``'http' in url and 'https' not in url``. With that guard the
    # ``url.replace("https", "http")`` call could never change the URL, so the
    # dead statement was removed. Confirm whether an https -> http downgrade
    # was actually wanted before reintroducing it.
    try:
        headers = {'User-Agent': USER_AGENT}
        response = requests.get(url, headers=headers)
        img_array = cv2.imdecode(np.asarray(bytearray(response.content)), 1)
    except requests.ConnectionError:
        logging.warning("connection error - check url or connection")
        return None
    except Exception:
        # Narrowed from a bare ``except:`` so KeyboardInterrupt and SystemExit
        # are no longer swallowed and turned into a ``None`` result.
        logging.warning(" error other than connection error - check something other than connection")
        return None

    return img_array
def post_event(url, tag_name, event_message):
    """POST a "Ceph Health" event as JSON to ``url``.

    Args:
        url: Endpoint that receives the event.
        tag_name: Value sent under the ``tags`` key (converted to a string).
        event_message: Value sent under the ``data`` key (converted to a
            string).

    Returns:
        The HTTP status code of the response, or 500 when the request raises
        ``requests.ConnectionError``.
    """
    # Function-scope import keeps this block self-contained.
    import json

    headers = {"Content-Type": "application/json"}
    # Serialise with json.dumps so quotes, backslashes and control characters
    # in the inputs are escaped. The previous hand-built string produced
    # invalid JSON for such values. str() keeps the old behaviour of always
    # sending both values as JSON strings.
    payload = json.dumps({"what": "Ceph Health",
                          "tags": str(tag_name),
                          "data": str(event_message)})
    try:
        r = requests.post(url, headers=headers, data=payload)
    except requests.ConnectionError:
        # if we hit this, the endpoint wasn't there (graphite web was not
        # accessible) so identify that issue as a server error (500)
        return 500

    return r.status_code
def post_event(url, tag_name, event_message):
    """POST a "Ceph Health" event as JSON to ``url``.

    Args:
        url: Endpoint that receives the event.
        tag_name: Value sent under the ``tags`` key (converted to a string).
        event_message: Value sent under the ``data`` key (converted to a
            string).

    Returns:
        The HTTP status code of the response, or 500 when the request raises
        ``requests.ConnectionError``.
    """
    # Function-scope import keeps this block self-contained.
    import json

    headers = {"Content-Type": "application/json"}
    # Serialise with json.dumps so quotes, backslashes and control characters
    # in the inputs are escaped. The previous hand-built string produced
    # invalid JSON for such values. str() keeps the old behaviour of always
    # sending both values as JSON strings.
    payload = json.dumps({"what": "Ceph Health",
                          "tags": str(tag_name),
                          "data": str(event_message)})
    try:
        r = requests.post(url, headers=headers, data=payload)
    except requests.ConnectionError:
        # if we hit this, the endpoint wasn't there (graphite web was not
        # accessible) so identify that issue as a server error (500)
        return 500

    return r.status_code
def post(self, printer_id):
    """Send a file from one printer to the defined printers, or print the given file.

    Returns:
        ``("", 403)`` when the printer lookup yields nothing,
        ``("", 200)`` after the send loop or a successful print,
        ``("", 409)`` when ``OctoprintService.print`` returns a falsy value.
    """
    args = deleteParser.parse_args()
    printer = g.user.get_printer_id(printer_id)
    if not printer:
        return "", 403

    if not args["send"]:
        # print file
        started = OctoprintService.print(printer, args["origin"], args["name"])
        return ("", 200) if started else ("", 409)

    # send file from one printer to defined printers
    printer_ids = sendParser.parse_args()
    targets = g.user.get_accessible_printers_id(printer_ids["printerId"])
    content = OctoprintService.get_file_contents(printer, args["origin"], args["name"])
    for target in targets:
        try:
            OctoprintService.send_file(target, args["name"], content, False)
        except (RuntimeError, requests.ConnectionError):
            # Best effort: a failed destination is skipped and the remaining
            # printers are still attempted.
            continue
    return "", 200
def test_run_socket_error(self, m_post):
    # With the mocked post raising requests.ConnectionError, the
    # 'DEL' / 'delNetwork' run is expected to yield 1.
    m_post.side_effect = requests.ConnectionError
    exit_code = self._test_run('DEL', 'delNetwork', m_post)
    self.assertEqual(1, exit_code)
def get_wait_time_list():
    """Fetch the wait-time response, trying up to three times.

    Each attempt obtains a token with ``get_token()`` and passes it to
    ``get_wait_time()``. After an ``HTTPError`` or ``ConnectionError`` the
    function sleeps for one second before the next attempt.

    Returns:
        The value returned by ``get_wait_time``, or ``None`` if all three
        attempts raised.
    """
    # ``range`` replaces the Python 2-only ``xrange``; for three iterations
    # the behaviour is identical and the code also runs on Python 3.
    for _ in range(3):
        try:
            token = get_token()
            response = get_wait_time(token)
        except (requests.HTTPError, requests.ConnectionError):
            time.sleep(1)
        else:
            return response
    return None
def http_request(self, method, url, **kwargs):
    """Issue an HTTP request through ``self.session`` and return the response.

    Args:
        method: HTTP verb; upper-cased before use.
        url: Path appended to ``self.server`` to form the request URI.
        **kwargs: Passed on to ``self.session.request``. ``verify``,
            ``proxies`` and ``headers`` are extracted first: ``verify`` and
            ``proxies`` fall back to ``self.ssl_verify`` / ``self.proxies``
            when omitted or ``None``, and ``headers`` are merged over a copy
            of ``self.token_header``.

    Returns:
        The response object when its status code is below 400.

    Raises:
        TimeoutError: If the request raises ``requests.Timeout``.
        ApiError: On ``requests.ConnectionError`` or any other exception
            raised while sending the request.
        ObjectNotFoundError: If the response status is 404.
        UnauthorizedError: If the response status is 401.
        ServerError: For any other response status of 400 or above.
    """
    method = method.upper()

    # Fall back to the instance defaults only when the caller supplied
    # nothing. The previous ``kwargs.pop(...) or default`` form silently
    # discarded explicit falsy values such as ``verify=False`` or
    # ``proxies={}``.
    verify_ssl = kwargs.pop('verify', None)
    if verify_ssl is None:
        verify_ssl = self.ssl_verify
    proxies = kwargs.pop('proxies', None)
    if proxies is None:
        proxies = self.proxies

    new_headers = kwargs.pop('headers', None)
    if new_headers:
        # Copy first so the shared token header dict is never mutated.
        headers = self.token_header.copy()
        headers.update(new_headers)
    else:
        headers = self.token_header

    uri = self.server + url

    try:
        raw_data = kwargs.get("data", None)
        if raw_data:
            log.debug("Sending HTTP {0} {1} with {2}".format(method, url, raw_data))
        r = self.session.request(method, uri, headers=headers, verify=verify_ssl, proxies=proxies,
                                 timeout=self._timeout, **kwargs)
        log.debug('HTTP {0:s} {1:s} took {2:.3f}s (response {3:d})'.format(method, url,
                                                                          calculate_elapsed_time(r.elapsed),
                                                                          r.status_code))
    except requests.Timeout as timeout_error:
        raise TimeoutError(uri=uri, original_exception=timeout_error)
    except requests.ConnectionError as connection_error:
        raise ApiError("Received a network connection error from {0:s}: {1:s}".format(self.server,
                                                                                      str(connection_error)),
                       original_exception=connection_error)
    except Exception as e:
        raise ApiError("Unknown exception when connecting to server: {0:s}".format(str(e)),
                       original_exception=e)
    else:
        # Translate HTTP error statuses into the matching exception types.
        if r.status_code == 404:
            raise ObjectNotFoundError(uri=uri, message=r.text)
        elif r.status_code == 401:
            raise UnauthorizedError(uri=uri, action=method, message=r.text)
        elif r.status_code >= 400:
            raise ServerError(error_code=r.status_code, message=r.text)
        return r
cluster_auto_start_daemon.py 文件源码
项目:sm-engine-ansible
作者: METASPACE2020
项目源码
文件源码
阅读 30
收藏 0
点赞 0
评论 0
def _send_rest_request(self, address):
    """GET ``address`` and report whether the response is OK.

    Returns:
        ``resp.ok`` of the response, or False when the request raises. A
        ``ConnectionError`` is logged at debug level; any other exception is
        logged as a warning.
    """
    try:
        response = requests.get(address)
    except ConnectionError as err:
        self.logger.debug('{} - {}'.format(address, err))
        return False
    except Exception as err:
        self.logger.warning('{} - {}'.format(address, err))
        return False

    self.logger.debug(response)
    return response.ok
def _make_request(self, url, protocol='https'):
    """GET ``<protocol>://<url>`` through ``self.session``.

    The request uses a 5 second timeout and ``verify=False``.

    Returns:
        The response object, or False when the request times out or raises
        a connection error (the latter is logged at debug level).
    """
    target = '{}://{}'.format(protocol, url)
    try:
        return self.session.get(target, timeout=5, verify=False)
    except requests.Timeout:
        return False
    except requests.ConnectionError as error:
        logging.debug('Connection Error: {}'.format(error))
        return False
def login(self):
    """ Log in to the ICE Portal (wifionice) """
    logging.info('Trying to log in...')
    login_url = 'http://{}/de/?login'.format(self.api_host_ip)
    form = {'login': True, 'CSRFToken': self.csrf_token}
    try:
        # The response is not inspected; only a connection failure is handled.
        self.session.post(login_url, data=form)
    except requests.exceptions.ConnectionError:
        logging.debug('Login Failed, probably bad wifi')
def __log(self, msg, level):
    """POST a log record to ``self.elastic_url``; log locally on failure.

    Args:
        msg: Message stored under the ``message`` key of the payload.
        level: Value stored under the ``log_level`` key of the payload.

    The payload is sent as JSON through ``self.session`` using
    ``self.headers`` and ``self.auth``. When a ``ConnectionError`` is raised,
    the error and the original message are written with the standard
    ``logging`` module instead.
    """
    payload = {'app_name': str(self.app_name), 'log_level': level, 'message': msg}
    try:
        self.session.post(url=self.elastic_url, data=json.dumps(payload),
                          headers=self.headers, auth=self.auth)
    except ConnectionError as ce:
        # Exceptions have no ``.message`` attribute on Python 3; reading it
        # raised AttributeError inside this handler. Log the exception itself.
        logging.error('%s', ce)
        logging.exception('Unable to connect to Elastic Search. Check the `elastic_url` and `auth`')
        # Fall back to local logging so the original message is not lost.
        logging.error(msg)