def _handles_connection_error(func):
"""
Decorator for handling ConnectionErrors raised by the requests
library, raises a BrainServerError instead.
:param func: the function being decorated
"""
@functools.wraps(func)
def _handler(self, url, *args, **kwargs):
try:
return func(self, url, *args, **kwargs)
except requests.exceptions.ConnectionError as e:
message = "Unable to connect to domain: {}".format(url)
raise BrainServerError(message)
return _handler
# Example source code for the Python class exceptions()
def _post(self, url, data=None):
    """
    Send a JSON POST request authorized with this object's access key.

    Redirects are not followed; the response is handed to
    ``self._raise_on_redirect`` after the HTTP status check.
    :param url: The URL being posted to.
    :param data: Optional dictionary sent as the JSON body.
        Defaults to None.
    :return: The result of ``_dict(response)`` on success.
    :raises: Whatever ``_handle_and_raise`` raises when the request
        ends in a ``requests.exceptions.HTTPError``.
    """
    log.debug('POST to %s with data %s...', url, str(data))
    auth_headers = {'Authorization': self._access_key}
    reply = requests.post(url=url,
                          headers=auth_headers,
                          json=data,
                          allow_redirects=False)
    try:
        reply.raise_for_status()
        self._raise_on_redirect(reply)
        log.debug('POST %s results:\n%s', url, reply.text)
        return _dict(reply)
    except requests.exceptions.HTTPError as http_err:
        # Delegate translation of the HTTP failure to the shared helper.
        _handle_and_raise(reply, http_err)
def _post_raw_data(self, url, data=None, headers=None):
    """
    Send a POST request whose body is passed through without encoding.

    The ``Authorization`` header is always set from the access key;
    caller-supplied headers are merged on top of it.
    :param url: The URL being posted to.
    :param data: Raw data used as the request body.
    :param headers: Optional extra headers to merge into the request.
    :return: The result of ``_dict(response)`` on success.
    :raises: Whatever ``_handle_and_raise`` raises when the request
        ends in a ``requests.exceptions.HTTPError``.
    """
    log.debug('POST raw data to %s ...', url)
    merged_headers = {'Authorization': self._access_key}
    # A falsy ``headers`` argument contributes nothing to the merge.
    merged_headers.update(headers or {})
    reply = requests.post(url=url,
                          headers=merged_headers,
                          data=data,
                          allow_redirects=False)
    try:
        reply.raise_for_status()
        self._raise_on_redirect(reply)
        log.debug('POST %s results:\n%s', url, reply.text)
        return _dict(reply)
    except requests.exceptions.HTTPError as http_err:
        _handle_and_raise(reply, http_err)
def _put_raw_data(self, url, data=None, headers=None):
    """
    Send a PUT request whose body is passed through without encoding.

    The ``Authorization`` header is always set from the access key;
    caller-supplied headers are merged on top of it.
    :param url: The URL being PUT to.
    :param data: Raw data used as the request body.
    :param headers: Optional extra headers to merge into the request.
    :return: The result of ``_dict(response)`` on success.
    :raises: Whatever ``_handle_and_raise`` raises when the request
        ends in a ``requests.exceptions.HTTPError``.
    """
    log.debug('PUT raw data to %s ...', url)
    merged_headers = {'Authorization': self._access_key}
    # A falsy ``headers`` argument contributes nothing to the merge.
    merged_headers.update(headers or {})
    reply = requests.put(url=url,
                         headers=merged_headers,
                         data=data,
                         allow_redirects=False)
    try:
        reply.raise_for_status()
        self._raise_on_redirect(reply)
        log.debug('PUT %s results:\n%s', url, reply.text)
        return _dict(reply)
    except requests.exceptions.HTTPError as http_err:
        _handle_and_raise(reply, http_err)
def _put(self, url, data=None):
    """
    Send a JSON PUT request authorized with this object's access key.

    Redirects are not followed; the response is handed to
    ``self._raise_on_redirect`` after the HTTP status check.
    :param url: The URL being PUT to.
    :param data: Optional dictionary sent as the JSON body.
        Defaults to None.
    :return: The result of ``_dict(response)`` on success.
    :raises: Whatever ``_handle_and_raise`` raises when the request
        ends in a ``requests.exceptions.HTTPError``.
    """
    log.debug('PUT to %s with data %s...', url, str(data))
    auth_headers = {'Authorization': self._access_key}
    reply = requests.put(url=url,
                         headers=auth_headers,
                         json=data,
                         allow_redirects=False)
    try:
        reply.raise_for_status()
        self._raise_on_redirect(reply)
        log.debug('PUT %s results:\n%s', url, reply.text)
        return _dict(reply)
    except requests.exceptions.HTTPError as http_err:
        _handle_and_raise(reply, http_err)
def _delete(self, url):
    """
    Send a DELETE request authorized with this object's access key.

    Redirects are not followed; the response is handed to
    ``self._raise_on_redirect`` after the HTTP status check.
    :param url: The URL to DELETE.
    :return: The result of ``_dict(response)`` on success.
    :raises: Whatever ``_handle_and_raise`` raises when the request
        ends in a ``requests.exceptions.HTTPError``.
    """
    log.debug('DELETE %s...', url)
    auth_headers = {'Authorization': self._access_key}
    reply = requests.delete(url=url,
                            headers=auth_headers,
                            allow_redirects=False)
    try:
        reply.raise_for_status()
        self._raise_on_redirect(reply)
        log.debug('DELETE %s results:\n%s', url, reply.text)
        return _dict(reply)
    except requests.exceptions.HTTPError as http_err:
        _handle_and_raise(reply, http_err)
def directions(self):
    """
    Fetch the translation directions offered by the service.

    Queries the "langs" endpoint with the configured API key and returns
    the "dirs" entry of the JSON reply. A connection failure is reported
    as ``YandexTranslateException`` built from ``self.error_codes[503]``;
    a reply whose "code" field is present and not 200 is reported as
    ``YandexTranslateException`` carrying that code.
    >>> translate = YandexTranslate("trnsl.1.1.20130421T140201Z.323e508a33e9d84b.f1e0d9ca9bcd0a00b0ef71d82e6cf4158183d09e")
    >>> directions = translate.directions
    >>> len(directions) > 0
    True
    """
    try:
        reply = requests.get(self.url("langs"), params={"key": self.api_key})
    except requests.exceptions.ConnectionError:
        raise YandexTranslateException(self.error_codes[503])
    payload = reply.json()
    # A missing "code" field is treated as success.
    code = payload.get("code", 200)
    if code != 200:
        raise YandexTranslateException(code)
    return payload.get("dirs")
def wait_for_opsman_ready(inst, timeout):
    """Block until the instance's HTTPS endpoint answers with a status < 400.

    The address is obtained from ``get_addr(inst)``. Polling is driven by
    ``wait_util.wait_while``, which is given a predicate that returns True
    while waiting should continue; ``timeout`` is passed to the resulting
    waiter unchanged.

    Args:
        inst: the instance whose address is probed.
        timeout: forwarded to the waiter returned by
            ``wait_util.wait_while``.
    """
    addr = get_addr(inst)

    def should_wait():
        """Return True while the endpoint is unreachable or answers >= 400."""
        try:
            # NOTE(review): certificate verification is disabled for this
            # probe (verify=False).
            resp = requests.head(
                "https://{}/".format(addr),
                verify=False, timeout=1)
        except requests.exceptions.RequestException:
            # Any request failure (including HTTPError, a subclass of
            # RequestException) means the endpoint is not ready yet. The
            # original separate HTTPError handler was unreachable because
            # it followed this broader clause.
            return True
        return resp.status_code >= 400

    waitFor = wait_util.wait_while(should_wait)
    waitFor(timeout)
#
# Main deploy driver function
#
def validate_creds(opts):
    """Check that AWS and Pivnet credentials from ``opts`` are usable.

    An AWS session is created from ``opts['profile_name']`` (optional) and
    ``opts['region']`` and exercised with ``describe_id_format``; then a
    ``pivnet.Pivnet`` client is constructed from ``opts['PIVNET_TOKEN']``.
    On a credential failure a hint and the error are printed.

    Returns:
        True when both checks pass, False when either
        ``NoCredentialsError`` or ``pivnet.AuthException`` is raised.
    """
    profile = opts.get('profile_name')
    session = Session(profile_name=profile, region_name=opts['region'])
    ec2 = session.resource("ec2")
    try:
        ec2.meta.client.describe_id_format()
    except botocore.exceptions.NoCredentialsError as aws_err:
        for line in (
                "Missing ~/.aws/credentials ? missing profile_name from cfg file",
                "http://boto3.readthedocs.org/en/latest/guide/configuration.html",
                aws_err):
            print(line)
        return False
    try:
        pivnet.Pivnet(token=opts['PIVNET_TOKEN'])
    except pivnet.AuthException as auth_err:
        for line in (
                "Get API TOKEN from ",
                "https://network.pivotal.io/users/dashboard/edit-profile",
                auth_err):
            print(line)
        return False
    return True
def enroll(self, enrollment_id, enrollment_secret, csr):
    """Enroll a registered identity and obtain its signed X509 certificate.

    The CSR is posted to the CA's "enroll" path using HTTP basic
    credentials built from the enrollment ID and secret. The CA name is
    included in the request only when one is configured on this client.

    Args:
        enrollment_id (str): The registered ID to use for enrollment
        enrollment_secret (str): The secret associated with the
            enrollment ID
        csr (bytes or file-like object): PEM-encoded PKCS#10 certificate
            signing request

    Returns: the base64-decoded ``Cert`` field of the CA response
        (PEM-encoded X509 certificate)

    Raises:
        RequestException: errors in requests.exceptions
        ValueError: Failed response, json parse error, args missing
    """
    if not (enrollment_id and enrollment_secret and csr):
        raise ValueError("Missing required parameters. "
                         "'enrollmentID', 'enrollmentSecret' and 'csr'"
                         " are all required.")

    # Only name the CA explicitly when one has been configured.
    req = ({"caName": self._ca_name, "certificate_request": csr}
           if self._ca_name != ""
           else {"certificate_request": csr})

    credentials = (enrollment_id, enrollment_secret)
    response = self._send_ca_post(path="enroll", json=req,
                                  auth=credentials,
                                  verify=self._ca_certs_path)
    _logger.debug("Raw response json {0}".format(response))

    if not response['success']:
        raise ValueError("Enrollment failed with errors {0}"
                         .format(response['errors']))
    return base64.b64decode(response['result']['Cert'])
def send(self, data):
    '''POST ``data`` to the configured Stitch URL and return the response.

    Headers come from ``self.headers()``. A non-success HTTP status is
    raised via ``raise_for_status``. No retry logic is visible in this
    method itself; presumably retrying is applied by a decorator not
    shown here.
    '''
    target = self.stitch_url
    request_headers = self.headers()
    reply = self.session.post(target, headers=request_headers, data=data)
    reply.raise_for_status()
    return reply
def request_ovh_client(consumer_key=None):
    """Build an OVH API client that can access ``/paas/monitoring``.

    When no consumer key is supplied, a new one is requested with access
    rules covering ``/paas/monitoring``; the key and its validation URL
    are printed and the user is asked to press Enter once it has been
    validated. The client is then probed with a GET on
    ``/paas/monitoring``: an HTTP 403 restarts the whole procedure without
    a consumer key, any other HTTP error propagates.

    Args:
        consumer_key: an existing consumer key, or None to request one.

    Returns:
        The client that passed the probe.

    Raises:
        SystemExit: with code 1 when the ``requests`` library is missing.
        requests.exceptions.HTTPError: for probe failures other than 403.
    """
    try:
        import requests
        import requests.exceptions
    except ImportError:
        import sys
        print('The requests library is needed to perform this command:')
        print(' pip install requests')
        print(' apt-get install python3-requests')
        # sys.exit is the supported way to terminate a program; the bare
        # exit() helper is only injected by the site module.
        sys.exit(1)
    # NOTE(review): the application key and secret are hard-coded here;
    # consider loading them from configuration instead.
    client = Client(
        endpoint='ovh-eu',
        application_key='yVPsINnSdHOLTGHf',
        application_secret='5mVDkHaJw6rp5GYiYBUlgDv1vruRejkl',
        consumer_key=consumer_key
    )
    if not consumer_key:
        response = client.request_consumerkey(
            access_rules=[
                {'method': 'GET', 'path': '/paas/monitoring'},
                {'method': 'GET', 'path': '/paas/monitoring/*'},
                {'method': 'POST', 'path': '/paas/monitoring/*'},
                {'method': 'PUT', 'path': '/paas/monitoring/*'},
                {'method': 'DELETE', 'path': '/paas/monitoring/*'}
            ]
        )
        response = response.json()
        print('New consumer key:', response['consumerKey'])
        print('Validate it at', response['validationUrl'])
        input('When you are ready, press Enter')
    try:
        client.get('/paas/monitoring')
    except requests.exceptions.HTTPError as e:
        if e.response.status_code != 403:
            raise
        # 403: the consumer key is not (or no longer) valid; start over
        # and request a fresh one.
        return request_ovh_client()
    return client
def test_requests_error_passthrough(self, mock_requests):
    """A RequestException raised by requests escapes ``_send_request``."""
    error_type = requests.exceptions.RequestException
    # Keep the real exception classes on the mocked module so they can
    # still be referenced through it.
    mock_requests.exceptions = requests.exceptions
    mock_requests.request.side_effect = error_type
    # pylint: disable=protected-access
    send_request = self.net._send_request
    with self.assertRaises(error_type):
        send_request('GET', 'uri')
def test_head_get_post_error_passthrough(self):
    """``head``, ``get`` and ``post`` let a RequestException through."""
    error_type = requests.exceptions.RequestException
    self.send_request.side_effect = error_type
    for method in (self.net.head, self.net.get):
        with self.assertRaises(error_type):
            method('GET', 'uri')
    post = self.net.post
    with self.assertRaises(error_type):
        post('uri', obj=self.obj)
def _do_request(self, payload):
    """POST a built payload to the endpoint and return the response.

    :param payload: object whose ``build()`` result is sent as the body
    :returns: the response object when ``resp.ok`` is true
    :raises: WSManRequestFailure when the request itself fails with a
             ``requests.exceptions.RequestException`` (including
             connection errors)
    :raises: WSManInvalidResponse when the response is not ``ok``
    """
    payload = payload.build()
    LOG.debug('Sending request to %(endpoint)s: %(payload)s',
              {'endpoint': self.endpoint, 'payload': payload})
    try:
        resp = requests.post(
            self.endpoint,
            auth=requests.auth.HTTPBasicAuth(self.username, self.password),
            data=payload,
            # TODO(ifarkas): enable cert verification
            verify=False)
    except requests.exceptions.ConnectionError:
        # Handled separately (e.g. 'No route to host') so the log entry
        # identifies the failure as a connection problem. The previous
        # code compared the exception class with a string, which never
        # matched, so this translation never happened.
        LOG.exception('Request failed (ConnectionError)')
        raise exceptions.WSManRequestFailure()
    except requests.exceptions.RequestException:
        LOG.exception('Request failed')
        raise exceptions.WSManRequestFailure()
    LOG.debug('Received response from %(endpoint)s: %(payload)s',
              {'endpoint': self.endpoint, 'payload': resp.content})
    if not resp.ok:
        raise exceptions.WSManInvalidResponse(
            status_code=resp.status_code,
            reason=resp.reason)
    return resp
def invoke(self, resource_uri, method, selectors=None, properties=None,
           expected_return_value=None):
    """Invoke a remote WS-Man method and validate its return value

    The call is delegated to the parent class; the ``ReturnValue``
    element of the response is then inspected.
    :param resource_uri: URI of the resource
    :param method: name of the method to invoke
    :param selectors: dictionary of selectors, defaults to empty
    :param properties: dictionary of properties, defaults to empty
    :param expected_return_value: expected return value reported back by
        the DRAC card. For return value codes check the profile
        documentation of the resource used in the method call. If not set,
        return value checking is skipped.
    :returns: an lxml.etree.Element object of the response received
    :raises: WSManRequestFailure on request failures
    :raises: WSManInvalidResponse when receiving invalid response
    :raises: DRACOperationFailed on error reported back by the DRAC
             interface
    :raises: DRACUnexpectedReturnValue on return value mismatch
    """
    selectors = {} if selectors is None else selectors
    properties = {} if properties is None else properties

    resp = super(WSManClient, self).invoke(resource_uri, method, selectors,
                                           properties)
    return_value = utils.find_xml(resp, 'ReturnValue', resource_uri).text

    if return_value == utils.RET_ERROR:
        # Collect every message element so the caller sees all errors.
        message_elems = utils.find_xml(resp, 'Message', resource_uri, True)
        raise exceptions.DRACOperationFailed(
            drac_messages=[elem.text for elem in message_elems])

    check_requested = expected_return_value is not None
    if check_requested and return_value != expected_return_value:
        raise exceptions.DRACUnexpectedReturnValue(
            expected_return_value=expected_return_value,
            actual_return_value=return_value)
    return resp
def send_token_to_subclient(webservice_endpoint: str, user_id: str,
                            subclient_token: str, subclient_id: str) -> str:
    """Deliver the subclient-specific token to the subclient.

    The token is POSTed to ``blender_id/store_scst`` relative to the
    webservice endpoint. The subclient is expected to verify it with
    BlenderID and make sure a matching user exists on its side.

    :returns: the user ID at the subclient.
    :raises BlenderIdCommError: on an HTTP or connection error, or when
        the reply's status is not 'success'.
    """
    import requests
    import urllib.parse

    url = urllib.parse.urljoin(webservice_endpoint, 'blender_id/store_scst')
    form = {'user_id': user_id,
            'subclient_id': subclient_id,
            'token': subclient_token}
    try:
        r = requests.post(url, data=form, verify=True)
        r.raise_for_status()
    except (requests.exceptions.HTTPError,
            requests.exceptions.ConnectionError) as e:
        raise BlenderIdCommError(str(e))

    resp = r.json()
    if resp['status'] == 'success':
        return resp['subclient_user_id']
    raise BlenderIdCommError('Error sending subclient-specific token to %s, error is: %s'
                             % (webservice_endpoint, resp))
def shorten_url(url, botan_token, user_id):
    """Return a shortened form of ``url``, or ``url`` itself on failure.

    A GET request is sent to ``SHORTENER_URL`` with the token, the URL and
    the stringified user id; the text of the reply is returned.

    :param url: the URL to shorten
    :param botan_token: token passed as the ``token`` query parameter
    :param user_id: passed, converted to ``str``, as ``user_ids``
    :returns: the response text, or the unmodified ``url`` when the
        request fails with a ``requests.exceptions.RequestException``
    """
    try:
        return requests.get(SHORTENER_URL, params={
            'token': botan_token,
            'url': url,
            'user_ids': str(user_id),
        }).text
    # ``requests.exceptions`` is a module, not an exception class; catching
    # it either never matches or raises TypeError. RequestException is the
    # base class of the errors requests raises.
    except requests.exceptions.RequestException:
        return url
def get_stack(self, stack_name, debug=True):
    """Return the first stack description for ``stack_name``, or None.

    Any exception raised while describing the stack (including
    ``botocore.exceptions.ClientError``, which is an ``Exception``
    subclass) is swallowed; it is logged through ``self.logger.error``
    only when ``debug`` is true.
    """
    try:
        described = self.cf_client.describe_stacks(StackName=stack_name)
        return described['Stacks'][0]
    except Exception as lookup_error:
        if debug:
            self.logger.error(lookup_error)
    return None