def test_cause(self):
    # Verifies the report produced for an exception raised via raise_from():
    # the report is split on `boundaries` and must contain exactly three
    # pieces, with `cause_message` as the middle one.
    def inner_raise():
        try:
            self.zero_div()
        except ZeroDivisionError as e:
            raise_from(KeyError, e)

    def outer_raise():
        # The trailing comment is matched literally by the assertIn below.
        inner_raise() # Marker

    report = self.get_report(outer_raise)
    sections = boundaries.split(report)
    self.assertEqual(len(sections), 3)
    original_part, separator, chained_part = sections
    self.assertEqual(separator, cause_message)
    self.check_zero_div(original_part)
    self.assertIn('inner_raise() # Marker', chained_part)
python类raise_from()的实例源码
def _find_spec_from_path(name, path=None):
    """Return the spec for the specified module.

    ``sys.modules`` is consulted first.  When the module is already there,
    its ``__spec__`` is returned; a ``None`` entry in ``sys.modules`` yields
    ``None``.  When the module is not in ``sys.modules``, the lookup is
    delegated to ``_find_spec(name, path)`` and its result is returned.

    Dotted names do not have their parent packages implicitly imported. You
    will most likely need to explicitly import all parent packages in the
    proper order for a submodule to get the correct spec.

    Raises:
        ValueError: if the cached module has no ``__spec__`` attribute, or
            if that attribute is ``None``.
    """
    if name not in sys.modules:
        return _find_spec(name, path)

    cached = sys.modules[name]
    if cached is None:
        return None

    try:
        found = cached.__spec__
    except AttributeError:
        # A cause of None suppresses chaining to the AttributeError.
        six.raise_from(ValueError('{}.__spec__ is not set'.format(name)), None)
    if found is None:
        raise ValueError('{}.__spec__ is None'.format(name))
    return found
def find_spec(name, package=None):
    """Return the spec for the specified module.

    ``sys.modules`` is consulted first.  When the module is already there,
    its ``__spec__`` is returned; a ``None`` entry in ``sys.modules`` yields
    ``None``.  Otherwise the lookup is delegated to ``_find_spec``.

    If the name is for a submodule (contains a dot), the parent module is
    automatically imported and its ``__path__`` is handed to ``_find_spec``.

    The name and package arguments work the same as
    importlib.import_module(): a name with leading dots is resolved against
    *package* via ``resolve_name``.

    Raises:
        ValueError: if the cached module has no ``__spec__`` attribute, or
            if that attribute is ``None``.
    """
    if name.startswith('.'):
        fullname = resolve_name(name, package)
    else:
        fullname = name

    if fullname in sys.modules:
        cached = sys.modules[fullname]
        if cached is None:
            return None
        try:
            found = cached.__spec__
        except AttributeError:
            # A cause of None suppresses chaining to the AttributeError.
            six.raise_from(
                ValueError('{}.__spec__ is not set'.format(name)), None)
        if found is None:
            raise ValueError('{}.__spec__ is None'.format(name))
        return found

    parent_name = fullname.rpartition('.')[0]
    if not parent_name:
        return _find_spec(fullname, None)
    # Use builtins.__import__() in case someone replaced it.
    parent = __import__(parent_name, fromlist=['__path__'])
    return _find_spec(fullname, parent.__path__)
def find_loader(name, path=None):
    """Return the loader for the specified module.

    This is a backward-compatible wrapper around find_spec().

    This function is deprecated in favor of importlib.util.find_spec(); a
    DeprecationWarning is issued on every call.

    The ``__loader__`` of an already-imported module is preferred.  When the
    module is not in ``sys.modules``, ``_find_spec(name, path)`` is used and
    the loader of the resulting spec is returned (``None`` when no spec is
    found).

    Raises:
        ValueError: if the cached module's ``__loader__`` is ``None`` or is
            not set.
        _ImportError: if the spec that was found has no loader.
    """
    warnings.warn('Use importlib.util.find_spec() instead.',
                  DeprecationWarning, stacklevel=2)
    try:
        cached_loader = sys.modules[name].__loader__
    except KeyError:
        # Not imported yet: fall through to the spec-based lookup.
        pass
    except AttributeError:
        six.raise_from(
            ValueError('{}.__loader__ is not set'.format(name)), None)
    else:
        if cached_loader is None:
            raise ValueError('{}.__loader__ is None'.format(name))
        return cached_loader

    spec = _find_spec(name, path)
    # We won't worry about malformed specs (missing attributes).
    if spec is None:
        return None
    if spec.loader is not None:
        return spec.loader
    if spec.submodule_search_locations is None:
        raise _ImportError('spec for {} missing loader'.format(name),
                           name=name)
    raise _ImportError('namespace packages do not have loaders',
                       name=name)
# This should be in all python >2.7
def raise_with_cause(exc_cls, message, *args, **kwargs):
    """Helper to raise + chain exceptions (when able) and associate a *cause*.

    NOTE(harlowja): Since in py3.x exceptions can be chained (due to
    :pep:`3134`) we should try to raise the desired exception with the given
    *cause* (or extract a *cause* from the current stack if able) so that the
    exception formats nicely in old and new versions of python. Since py2.x
    does **not** support exception chaining (or formatting) the exception
    class provided should take a ``cause`` keyword argument (which it may
    discard if it wants) to its constructor which can then be
    inspected/retained on py2.x to get *similar* information as would be
    automatically included/obtainable in py3.x.

    :param exc_cls: the exception class to raise (typically one derived
                    from :py:class:`.CausedByException` or equivalent).
    :param message: the text/str message that will be passed to
                    the exceptions constructor as its first positional
                    argument.
    :param args: any additional positional arguments to pass to the
                 exceptions constructor.
    :param kwargs: any additional keyword arguments to pass to the
                   exceptions constructor.

    .. versionadded:: 1.6
    """
    if 'cause' not in kwargs:
        # No explicit cause given: fall back to the exception currently
        # being handled, if there is one.
        active_exc = sys.exc_info()[1]
        if active_exc is not None:
            kwargs['cause'] = active_exc
        # Leave no references around (especially with regards to
        # tracebacks and any variables that it retains internally).
        del active_exc
    new_exc = exc_cls(message, *args, **kwargs)
    six.raise_from(new_exc, kwargs.get('cause'))
def convert_version_to_int(version):
    """Convert a version to an integer.

    *version* must be a string with dots or a tuple of integers.  A string
    is first turned into a tuple with ``convert_version_to_tuple``; the
    tuple's components are then folded left to right as
    ``(accumulated * 1000) + component``.

    A value that is neither a string nor a tuple falls through both checks
    and ``None`` is returned.

    :raises ValueError: if the conversion fails for any reason; the original
                        exception is attached as the cause.

    .. versionadded:: 2.0
    """
    try:
        if isinstance(version, six.string_types):
            version = convert_version_to_tuple(version)
        if isinstance(version, tuple):
            return six.moves.reduce(lambda x, y: (x * 1000) + y, version)
    except Exception as ex:
        # Wrap the value in a 1-tuple: `version` is frequently a tuple at
        # this point, and ``"%s" % some_tuple`` would unpack it as the
        # format arguments and raise TypeError instead of building the
        # message, so the intended ValueError would never be raised.
        msg = _("Version %s is invalid.") % (version,)
        six.raise_from(ValueError(msg), ex)
def assert_has_calls(self, calls, any_order=False):
    """assert the mock has been called with the specified calls.
    The `mock_calls` list is checked for the calls.

    If `any_order` is False (the default) then the calls must be
    sequential. There can be extra calls before or after the
    specified calls.

    If `any_order` is True then the calls can be in any order, but
    they must all appear in `mock_calls`.

    Raises AssertionError when the expected calls are not found.  If any
    of the matched expected calls is itself an exception instance, the
    first such exception is attached as the cause of the AssertionError."""
    expected = [self._call_matcher(c) for c in calls]
    # `expected` is always a list, so an isinstance() test on the list itself
    # can never find an exception; inspect its elements instead.
    # NOTE(review): presumably _call_matcher returns an exception when a call
    # cannot be matched against the spec, as assert_any_call's isinstance
    # check suggests - confirm against _call_matcher.
    cause = next((e for e in expected if isinstance(e, Exception)), None)
    all_calls = _CallList(self._call_matcher(c) for c in self.mock_calls)
    if not any_order:
        if expected not in all_calls:
            six.raise_from(AssertionError(
                'Calls not found.\nExpected: %r\n'
                'Actual: %r' % (_CallList(calls), self.mock_calls)
            ), cause)
        return

    # Order-insensitive check: consume one recorded call per expected call
    # so duplicates in `calls` need matching duplicates in `mock_calls`.
    all_calls = list(all_calls)

    not_found = []
    for kall in expected:
        try:
            all_calls.remove(kall)
        except ValueError:
            not_found.append(kall)
    if not_found:
        six.raise_from(AssertionError(
            '%r not all found in call list' % (tuple(not_found),)
        ), cause)
def assert_any_call(self, *args, **kwargs):
    """assert the mock has been called with the specified arguments.

    The assert passes if the mock has *ever* been called, unlike
    `assert_called_with` and `assert_called_once_with` that only pass if
    the call is the most recent one.

    Raises AssertionError when no recorded call matches; if the matched
    form of the expected call is an exception instance it is attached as
    the cause."""
    wanted = self._call_matcher((args, kwargs))
    recorded = [self._call_matcher(c) for c in self.call_args_list]
    if wanted in recorded:
        return

    if isinstance(wanted, Exception):
        cause = wanted
    else:
        cause = None
    expected_string = self._format_mock_call_signature(args, kwargs)
    failure = AssertionError('%s call not found' % expected_string)
    six.raise_from(failure, cause)
def _apirequest(self, method, url, *args, **kwargs):
    """
    General request-response processing routine for dpr-api server.

    A *url* that does not start with ``http`` is treated as relative and is
    appended to ``self.server``.  When ``self.token`` is truthy it is sent
    as the ``Auth-Token`` header unless the caller already supplied one.

    :return:
        Response -- requests.Response instance

    :raises JSONDecodeError: if the response body cannot be parsed as JSON.
    :raises HTTPStatusError: if the status code is neither 200 nor 201.
    """
    # TODO: doing this for every request is kinda awkward
    self._ensure_config()

    is_absolute = url.startswith('http')
    if not is_absolute:
        # Relative url is given. Build absolute server url
        url = self.server + url

    dispatch = {
        'POST': requests.post,
        'PUT': requests.put,
        'GET': requests.get,
        'DELETE': requests.delete
    }

    headers = kwargs.pop('headers', {})
    if self.token:
        headers.setdefault('Auth-Token', '%s' % self.token)

    send = dispatch.get(method)
    response = send(url, *args, headers=headers, **kwargs)

    try:
        payload = response.json()
    except Exception as e:
        decode_error = JSONDecodeError(
            response, message='Failed to decode JSON response from server')
        six.raise_from(decode_error, e)

    if response.status_code in (200, 201):
        return response

    raise HTTPStatusError(response, message='Error %s\n%s' % (
        response.status_code,
        payload.get('message') or payload.get('description')))
def raise_from(cls, exc, path=None):
    # Build a `cls` instance from the text of `exc` plus `path`, then raise
    # it: chained to `exc` on Python 3, or re-raised with the traceback of
    # the exception currently being handled on Python 2.
    wrapped = cls(str(exc), path)
    if six.PY3:
        six.raise_from(wrapped, exc)
    else:
        current_tb = sys.exc_info()[2]
        six.reraise(cls, wrapped, current_tb)
def validate(self, obj):
    # Validate this field's entry in `obj`.
    #
    # A missing key is accepted only when the field is optional; otherwise
    # FieldError('Is required', key) is raised.  When the key is present the
    # filters are run: a ValueError is converted through
    # FieldError.raise_from, and a FieldError gets this field's key added to
    # its path info before being re-raised.
    if self.key not in obj:
        if self.optional:
            return
        raise FieldError('Is required', self.key)

    try:
        self.run_filters(obj)
    except ValueError as error:
        FieldError.raise_from(error, self.key)
    except FieldError as error:
        error.add_path_info(self.key)
        raise
def instantiate(self, value):
    # Build an instance of this field's type from `value`.
    #
    # JSON-object fields are built with `self.type.fromjson(value)`; every
    # other field calls `self.type(value)` directly, and a ValueError from
    # that call is converted through FieldError.raise_from.
    if not self.is_json_object:
        try:
            return self.type(value)
        except ValueError as error:
            FieldError.raise_from(error)
    else:
        return self.type.fromjson(value)
def call(func, args):
    """Call the function with args normalized and cast to the correct types.

    Each ``(key, normalized key, value)`` triple yielded by ``_normalize``
    is cast according to the type hints of *func* (``Any`` when no hint is
    present) and then routed either to the positional varargs or to the
    keyword arguments of the final call.

    Args:
        func: The function to call.
        args: The arguments parsed by docopt.

    Returns:
        The return value of func.

    Raises:
        exc.InvalidCliValueError: if casting a value raises TypeError.
    """
    assert hasattr(func, '__call__'), 'Cannot call func: {}'.format(
        func.__name__)
    # For callable objects the hints and signature live on __call__.
    if isinstance(func, FunctionType):
        raw_func = func
    else:
        raw_func = func.__class__.__call__
    hints = collections.defaultdict(lambda: Any, get_type_hints(raw_func))
    argspec = _getargspec(raw_func)

    keyword_values = {}
    positional_values = ()
    for raw_key, key, raw_value in _normalize(args):
        is_varargs = key == argspec.varargs
        if is_varargs:
            # *args collects several values, so cast to a homogeneous tuple.
            hints[key] = Tuple[hints[key], ...]
        elif key not in argspec.args and argspec.varkw in hints:
            # Unknown names inherit the hint declared for **kwargs.
            hints[key] = hints[argspec.varkw]

        try:
            converted = cast(hints[key], raw_value)
        except TypeError as e:
            _LOGGER.exception(e)
            six.raise_from(exc.InvalidCliValueError(raw_key, raw_value), e)

        if is_varargs:
            positional_values = converted
        elif (key in argspec.args or argspec.varkw) and (
                key not in keyword_values or keyword_values[key] is None):
            keyword_values[key] = converted
    return func(*positional_values, **keyword_values)
def _raise_wrapped_exception(exc):
    # Re-raise `exc` as a wrapper exception chained to it.  The message
    # combines exc.error_message with the return code; the class is
    # NonNullConstraintError when the message mentions a null constraint
    # violation, otherwise the class registered for the return code in
    # _EXCEPTION_BY_RC, defaulting to OperationalError.
    code = exc.error_code
    msg = '%s (cdb2api rc %d)' % (exc.error_message, code)
    if "null constraint violation" in msg:
        error_cls = NonNullConstraintError  # DRQS 86013831
    else:
        error_cls = _EXCEPTION_BY_RC.get(code, OperationalError)
    six.raise_from(error_cls(msg), exc)
def _propagate_swift_exceptions(func):
    """Bubbles all swift exceptions as `SwiftError` classes

    Decorator: the wrapped function's ``swift_service.SwiftError`` and
    ``swift_exceptions.ClientException`` errors are translated with
    ``_swiftclient_error_to_descriptive_exception`` and re-raised chained
    to the original error.
    """
    @wraps(func)
    def translating_wrapper(*args, **kwargs):
        try:
            result = func(*args, **kwargs)
        except (swift_service.SwiftError,
                swift_exceptions.ClientException) as err:
            descriptive = _swiftclient_error_to_descriptive_exception(err)
            six.raise_from(descriptive, err)
        else:
            return result
    return translating_wrapper
def _s3_client_call(self, method_name, *args, **kwargs):
    """
    Creates a boto3 S3 ``Client`` object and runs ``method_name``.

    A ``botocore`` ``ClientError`` raised by the call is converted with
    ``_parse_s3_error`` (which also receives the call's keyword arguments)
    and re-raised chained to the original error.
    """
    bound_method = getattr(_get_s3_client(), method_name)
    try:
        result = bound_method(*args, **kwargs)
    except botocore_exceptions.ClientError as err:
        six.raise_from(_parse_s3_error(err, **kwargs), err)
    else:
        return result
def _make_s3_transfer(self, method_name, config=None, *args, **kwargs):
    """
    Creates a boto3 ``S3Transfer`` object for doing multipart uploads
    and downloads and executes the given method.

    ``S3UploadFailedError`` is re-raised as ``exceptions.FailedUploadError``
    and ``RetriesExceededError`` as ``exceptions.FailedDownloadError``, each
    chained to the original boto3 error.
    """
    bound_method = getattr(_get_s3_transfer(config=config), method_name)
    try:
        result = bound_method(*args, **kwargs)
    except boto3_exceptions.S3UploadFailedError as err:
        six.raise_from(exceptions.FailedUploadError(str(err), err), err)
    except boto3_exceptions.RetriesExceededError as err:
        six.raise_from(exceptions.FailedDownloadError(str(err), err), err)
    else:
        return result
def _write_request(self, request_url, json_message, api_key=None):
    # POST `json_message` (base64-encoded, form-urlencoded) to `request_url`
    # and interpret the JSON reply.  Returns True when the reply's 'status'
    # is 1; raises MixpanelException on a URL error, an unparseable reply,
    # or any other status.
    form_fields = {
        'data': base64.b64encode(json_message.encode('utf8')),
        'verbose': 1,
        'ip': 0,
    }
    if api_key:
        form_fields['api_key'] = api_key
    encoded_data = urllib.parse.urlencode(form_fields).encode('utf8')

    try:
        request = urllib.request.Request(request_url, encoded_data)

        # Note: We don't send timeout=None here, because the timeout in urllib2 defaults to
        # an internal socket timeout, not None.
        if self._request_timeout is None:
            connection = urllib.request.urlopen(request)
        else:
            connection = urllib.request.urlopen(request, timeout=self._request_timeout)
        raw_reply = connection.read()
    except urllib.error.URLError as e:
        raise six.raise_from(MixpanelException(e), e)

    try:
        reply = json.loads(raw_reply.decode('utf8'))
    except ValueError:
        raise MixpanelException('Cannot interpret Mixpanel server response: {0}'.format(raw_reply))

    if reply['status'] != 1:
        raise MixpanelException('Mixpanel error: {0}'.format(reply['error']))

    return True
def _flush_endpoint(self, endpoint, api_key=None):
    # Send everything buffered for `endpoint` through the consumer, in
    # batches of at most self._max_size messages joined into a JSON array.
    # A MixpanelException from the consumer is re-raised as a new
    # MixpanelException carrying the failed batch (.message) and the
    # endpoint (.endpoint).  The stored buffer is only replaced after every
    # batch has been sent.
    pending = self._buffers[endpoint]
    while pending:
        chunk = pending[:self._max_size]
        payload = '[{0}]'.format(','.join(chunk))
        try:
            self._consumer.send(endpoint, payload, api_key)
        except MixpanelException as orig_e:
            mp_e = MixpanelException(orig_e)
            mp_e.message = payload
            mp_e.endpoint = endpoint
            raise six.raise_from(mp_e, orig_e)
        pending = pending[self._max_size:]
    self._buffers[endpoint] = pending
def _wrap_http_exceptions():
    """Reraise osc-lib exceptions with detailed messages.

    Yields once.  A ``ks_exceptions.HttpError`` raised at the yield point is
    replaced by the exception class mapped to its HTTP status in
    ``_http_error_to_exc`` (``exceptions.CommandError`` by default), built
    from the status and the last line of the first error's ``detail`` field
    in the JSON response body.
    """
    try:
        yield
    except ks_exceptions.HttpError as http_error:
        body = json.loads(http_error.response.content)
        detail = body['errors'][0]['detail']
        # Only the final line of the detail text is used as the message.
        msg = detail.rsplit('\n', 1)[-1].strip()
        status = http_error.http_status
        exc_class = _http_error_to_exc.get(status, exceptions.CommandError)
        six.raise_from(exc_class(http_error.http_status, msg), http_error)