def assert_called_with(_mock_self, *args, **kwargs):
"""assert that the mock was called with the specified arguments.
Raises an AssertionError if the args and keyword args passed in are
different to the last call to the mock."""
self = _mock_self
if self.call_args is None:
expected = self._format_mock_call_signature(args, kwargs)
raise AssertionError('Expected call: %s\nNot called' % (expected,))
def _error_message(cause):
msg = self._format_mock_failure_message(args, kwargs)
if six.PY2 and cause is not None:
# Tack on some diagnostics for Python without __cause__
msg = '%s\n%s' % (msg, str(cause))
return msg
expected = self._call_matcher((args, kwargs))
actual = self._call_matcher(self.call_args)
if expected != actual:
cause = expected if isinstance(expected, Exception) else None
six.raise_from(AssertionError(_error_message(cause)), cause)
python类raise_from()的实例源码
def test_context_suppression(self):
try:
try:
raise Exception
except:
raise_from(ZeroDivisionError, None)
except ZeroDivisionError as _:
e = _
tb = sys.exc_info()[2]
lines = self.get_report(e, tb)
self.assertThat(lines, DocTestMatches("""\
Traceback (most recent call last):
File "...traceback2/tests/test_traceback.py", line ..., in test_context_suppression
raise_from(ZeroDivisionError, None)
File "<string>", line 2, in raise_from
ZeroDivisionError
""", doctest.ELLIPSIS))
def test_cause_and_context(self):
# When both a cause and a context are set, only the cause should be
# displayed and the context should be muted.
def inner_raise():
try:
self.zero_div()
except ZeroDivisionError as _e:
e = _e
try:
xyzzy
except NameError:
raise_from(KeyError, e)
def outer_raise():
inner_raise() # Marker
blocks = boundaries.split(self.get_report(outer_raise))
self.assertEqual(len(blocks), 3)
self.assertEqual(blocks[1], cause_message)
self.check_zero_div(blocks[0])
self.assertIn('inner_raise() # Marker', blocks[2])
def test_cause_recursive(self):
def inner_raise():
try:
try:
self.zero_div()
except ZeroDivisionError as e:
z = e
raise_from(KeyError, e)
except KeyError as e:
raise_from(z, e)
def outer_raise():
inner_raise() # Marker
blocks = boundaries.split(self.get_report(outer_raise))
self.assertEqual(len(blocks), 3)
self.assertEqual(blocks[1], cause_message)
# The first block is the KeyError raised from the ZeroDivisionError
self.assertIn('raise_from(KeyError, e)', blocks[0])
self.assertNotIn('1/0', blocks[0])
# The second block (apart from the boundary) is the ZeroDivisionError
# re-raised from the KeyError
self.assertIn('inner_raise() # Marker', blocks[2])
self.check_zero_div(blocks[2])
def test_context_suppression(self):
try:
try:
raise Exception
except:
raise_from(ZeroDivisionError, None)
except ZeroDivisionError as _:
e = _
tb = sys.exc_info()[2]
lines = self.get_report(e, tb)
self.assertThat(lines, DocTestMatches("""\
Traceback (most recent call last):
File "...traceback2/tests/test_traceback.py", line ..., in test_context_suppression
raise_from(ZeroDivisionError, None)
File "<string>", line 2, in raise_from
ZeroDivisionError
""", doctest.ELLIPSIS))
def test_cause_and_context(self):
# When both a cause and a context are set, only the cause should be
# displayed and the context should be muted.
def inner_raise():
try:
self.zero_div()
except ZeroDivisionError as _e:
e = _e
try:
xyzzy
except NameError:
raise_from(KeyError, e)
def outer_raise():
inner_raise() # Marker
blocks = boundaries.split(self.get_report(outer_raise))
self.assertEqual(len(blocks), 3)
self.assertEqual(blocks[1], cause_message)
self.check_zero_div(blocks[0])
self.assertIn('inner_raise() # Marker', blocks[2])
def test_cause_recursive(self):
def inner_raise():
try:
try:
self.zero_div()
except ZeroDivisionError as e:
z = e
raise_from(KeyError, e)
except KeyError as e:
raise_from(z, e)
def outer_raise():
inner_raise() # Marker
blocks = boundaries.split(self.get_report(outer_raise))
self.assertEqual(len(blocks), 3)
self.assertEqual(blocks[1], cause_message)
# The first block is the KeyError raised from the ZeroDivisionError
self.assertIn('raise_from(KeyError, e)', blocks[0])
self.assertNotIn('1/0', blocks[0])
# The second block (apart from the boundary) is the ZeroDivisionError
# re-raised from the KeyError
self.assertIn('inner_raise() # Marker', blocks[2])
self.check_zero_div(blocks[2])
def assert_called_with(_mock_self, *args, **kwargs):
"""assert that the mock was called with the specified arguments.
Raises an AssertionError if the args and keyword args passed in are
different to the last call to the mock."""
self = _mock_self
if self.call_args is None:
expected = self._format_mock_call_signature(args, kwargs)
raise AssertionError('Expected call: %s\nNot called' % (expected,))
def _error_message(cause):
msg = self._format_mock_failure_message(args, kwargs)
if six.PY2 and cause is not None:
# Tack on some diagnostics for Python without __cause__
msg = '%s\n%s' % (msg, str(cause))
return msg
expected = self._call_matcher((args, kwargs))
actual = self._call_matcher(self.call_args)
if expected != actual:
cause = expected if isinstance(expected, Exception) else None
six.raise_from(AssertionError(_error_message(cause)), cause)
def import_figura_file(path):
"""
Import a figura config file (with no side affects).
:param path: a python import path
:return: a python module object
:raise ConfigParsingError: if importing fails
"""
try:
return _import_module_no_side_effects(path)
except Exception as e:
if six.PY2:
# no exception chaining in python2
#raise ConfigParsingError('Failed parsing "%s": %r' % (path, e)), None, sys.exc_info()[2] # not a valid py3 syntax
raise ConfigParsingError('Failed parsing config "%s": %r' % (path, e))
else:
#raise ConfigParsingError('Failed parsing config "%s"' % path) from e # not a valid py2 syntax
six.raise_from(ConfigParsingError('Failed parsing config "%s"' % path), e)
def _get_pwd(service=None):
"""
Returns the present working directory for the given service,
or all services if none specified.
"""
def to_text(data):
if six.PY2: # pragma: no cover
data = data.decode(locale.getpreferredencoding(False))
return data
parser = _get_env()
if service:
try:
return to_text(utils.with_trailing_slash(parser.get('env', service)))
except configparser.NoOptionError as e:
six.raise_from(ValueError('%s is an invalid service' % service), e)
return [to_text(utils.with_trailing_slash(value)) for name, value in parser.items('env')]
def listdir(self, path):
self.check()
path = relpath(path)
if path:
try:
member = self._tar.getmember(path)
except KeyError:
six.raise_from(errors.ResourceNotFound(path), None)
else:
if not member.isdir():
six.raise_from(errors.DirectoryExpected(path), None)
return [
basename(member.name)
for member in self._tar
if dirname(member.path) == path
]
def openbin(self, path, mode="r", buffering=-1, **options):
self.check()
path = relpath(normpath(path))
if 'w' in mode or '+' in mode or 'a' in mode:
raise errors.ResourceReadOnly(path)
try:
member = self._tar.getmember(path)
except KeyError:
six.raise_from(errors.ResourceNotFound(path), None)
if not member.isfile():
raise errors.FileExpected(path)
rw = RawWrapper(self._tar.extractfile(member))
if six.PY2: # Patch nonexistent file.flush in Python2
def _flush():
pass
rw.flush = _flush
return rw
def test_raise_from():
try:
try:
raise Exception("blah")
except Exception:
ctx = sys.exc_info()[1]
f = Exception("foo")
six.raise_from(f, None)
except Exception:
tp, val, tb = sys.exc_info()
if sys.version_info[:2] > (3, 0):
# We should have done a raise f from None equivalent.
assert val.__cause__ is None
assert val.__context__ is ctx
if sys.version_info[:2] >= (3, 3):
# And that should suppress the context on the exception.
assert val.__suppress_context__
# For all versions the outer exception should have raised successfully.
assert str(val) == "foo"
def makedir(self, path, permissions=None, recreate=False): # noqa: D102
self.check()
_permissions = permissions or Permissions(mode=0o755)
_path = self.validatepath(path)
try:
info = self.getinfo(_path)
except errors.ResourceNotFound:
with self._lock:
with convert_sshfs_errors('makedir', path):
self._sftp.mkdir(_path, _permissions.mode)
else:
if (info.is_dir and not recreate) or info.is_file:
six.raise_from(errors.DirectoryExists(path), None)
return self.opendir(path)
def _handle_error(self, err):
"""
Handle execution error state and sleep the required amount of time.
"""
# Update latest cached error
self.error = err
# Defaults to false
retry = True
# Evaluate if error is legit or should be retried
if self.error_evaluator:
retry = self.error_evaluator(err)
# If evalutor returns an error exception, just raise it
if retry and isinstance(retry, Exception):
raise_from(retry, self.error)
# If retry evaluator returns False, raise original error and
# stop the retry cycle
if retry is False:
raise err
def _literal_terminal_to_python_val(literal_terminal):
"""
Use the table of "coercer" functions to convert a terminal node from the
parse tree to a Python value.
"""
token_type = literal_terminal.getSymbol().type
token_text = literal_terminal.getText()
if token_type in _TOKEN_TYPE_COERCERS:
coercer = _TOKEN_TYPE_COERCERS[token_type]
try:
python_value = coercer(token_text)
except Exception as e:
six.raise_from(MatcherException(u"Invalid {}: {}".format(
STIXPatternParser.symbolicNames[token_type], token_text
)), e)
else:
raise MatcherInternalError(u"Unsupported literal type: {}".format(
STIXPatternParser.symbolicNames[token_type]))
return python_value
def exitStartStopQualifier(self, ctx):
"""
Consumes nothing
Produces a (datetime, datetime) 2-tuple containing the start and stop
times.
"""
start_str = _literal_terminal_to_python_val(ctx.StringLiteral(0))
stop_str = _literal_terminal_to_python_val(ctx.StringLiteral(1))
# If the language used timestamp literals here, this could go away...
try:
start_dt = _str_to_datetime(start_str)
stop_dt = _str_to_datetime(stop_str)
except ValueError as e:
# re-raise as MatcherException.
raise six.raise_from(MatcherException(*e.args), e)
self.__push((start_dt, stop_dt), u"exitStartStopQualifier")
def test_raise_from():
try:
try:
raise Exception("blah")
except Exception:
ctx = sys.exc_info()[1]
f = Exception("foo")
six.raise_from(f, None)
except Exception:
tp, val, tb = sys.exc_info()
if sys.version_info[:2] > (3, 0):
# We should have done a raise f from None equivalent.
assert val.__cause__ is None
assert val.__context__ is ctx
if sys.version_info[:2] >= (3, 3):
# And that should suppress the context on the exception.
assert val.__suppress_context__
# For all versions the outer exception should have raised successfully.
assert str(val) == "foo"
def handle_line(self, raw_line):
try:
line = raw_line.decode('utf-8')
except UnicodeDecodeError as err:
six.raise_from(
MastodonMalformedEventError("Malformed UTF-8"),
err
)
if line.startswith(':'):
self.handle_heartbeat()
elif line == '':
# end of event
self._dispatch(self.event)
self.event = {}
else:
key, value = line.split(': ', 1)
# According to the MDN spec, repeating the 'data' key
# represents a newline(!)
if key in self.event:
self.event[key] += '\n' + value
else:
self.event[key] = value
def har_input(paths):
for path in paths:
# According to the spec, HAR files are UTF-8 with an optional BOM.
path = decode_path(path)
with io.open(path, 'rt', encoding='utf-8-sig') as f:
try:
data = json.load(f)
except ValueError as exc:
six.raise_from(
InputError(u'%s: bad HAR file: %s' % (path, exc)),
exc)
try:
creator = data['log']['creator']['name']
for entry in data['log']['entries']:
yield _process_entry(entry, creator, path)
except (TypeError, KeyError) as exc:
six.raise_from(
InputError(u'%s: cannot understand HAR file: %r' %
(path, exc)),
exc)
def test_raise_from():
try:
try:
raise Exception("blah")
except Exception:
ctx = sys.exc_info()[1]
f = Exception("foo")
six.raise_from(f, None)
except Exception:
tp, val, tb = sys.exc_info()
if sys.version_info[:2] > (3, 0):
# We should have done a raise f from None equivalent.
assert val.__cause__ is None
assert val.__context__ is ctx
if sys.version_info[:2] >= (3, 3):
# And that should suppress the context on the exception.
assert val.__suppress_context__
# For all versions the outer exception should have raised successfully.
assert str(val) == "foo"
def request(self, buf):
try:
self.sock.send(buf)
except SSL.Error as exc:
raise_from(Error("SSL Error"), exc)
else:
try:
#peek_bytes = self.sock.recv(8, socket.MSG_PEEK) # peeky no worky?!
peek_bytes = self.sock.recv(8)
except SSL.WantReadError as exc:
# SSL timeout does not work properly. If no data is available from server,
# we'll get this error
pass
except SSL.Error as exc:
raise
#raise_from(Error("SSL Error"), exc)
else:
(ver, type_, length) = struct.unpack('>HHL', peek_bytes)
return bytearray(peek_bytes + self.sock.recv(length))
def response(self):
try:
peek_bytes = self.sock.recv(8, socket.MSG_PEEK)
except SSL.Error as exc:
raise_from(Error("SSL Error"), exc)
else:
(ver, type_, length) = struct.unpack('>HHL', peek_bytes)
return bytearray(peek_bytes + self.sock.recv(length))
def next(self, message, backoff_exchange_name):
total_attempts = 0
for deadlettered in message.headers.get('x-death', ()):
if deadlettered['exchange'] == backoff_exchange_name:
total_attempts += int(deadlettered['count'])
if self.limit and total_attempts >= self.limit:
expired = Backoff.Expired(
"Backoff aborted after '{}' retries (~{:.0f} seconds)".format(
self.limit, self.max_delay / 1000
)
)
six.raise_from(expired, self)
expiration = self.get_next_schedule_item(total_attempts)
if self.random_sigma:
randomised = int(random.gauss(expiration, self.random_sigma))
group_size = self.random_sigma / self.random_groups_per_sigma
expiration = round_to_nearest(randomised, interval=group_size)
# Prevent any negative values created by randomness
expiration = abs(expiration)
# store calculation results on self.
self._next_expiration = expiration
self._total_attempts = total_attempts
return expiration
def entrypoint_retry(*args, **kwargs):
"""
Decorator to declare that an entrypoint can be retried on failure.
For use with nameko_amqp_retry enabled entrypoints.
:param retry_for: An exception class or tuple of exception classes.
If the wrapped function raises one of these exceptions, the entrypoint
will be retried until successful, or the `limit` number of retries
is reached.
:param limit: integer
The maximum number of times the entrypoint can be retried before
giving up (and raising a ``Backoff.Expired`` exception).
If not given, the default `Backoff.limit` will be used.
:param schedule: tuple of integers
A tuple defining the number of milliseconds to wait between each
retry. If not given, the default `Backoff.schedule` will be used.
:param random_sigma: integer
Standard deviation as milliseconds. If not given,
the default `Backoff.random_sigma` will be used.
:param random_groups_per_sigma: integer
Random backoffs are rounded to nearest group. If not given,
the default `Backoff.random_groups_per_sigma` will be used.
"""
retry_for = kwargs.get('retry_for') or Exception
backoff_cls = backoff_factory(*args, **kwargs)
@wrapt.decorator
def wrapper(wrapped, instance, args, kwargs):
try:
return wrapped(*args, **kwargs)
except retry_for as exc:
six.raise_from(backoff_cls(), exc)
return wrapper
def assert_has_calls(self, calls, any_order=False):
"""assert the mock has been called with the specified calls.
The `mock_calls` list is checked for the calls.
If `any_order` is False (the default) then the calls must be
sequential. There can be extra calls before or after the
specified calls.
If `any_order` is True then the calls can be in any order, but
they must all appear in `mock_calls`."""
expected = [self._call_matcher(c) for c in calls]
cause = expected if isinstance(expected, Exception) else None
all_calls = _CallList(self._call_matcher(c) for c in self.mock_calls)
if not any_order:
if expected not in all_calls:
six.raise_from(AssertionError(
'Calls not found.\nExpected: %r\n'
'Actual: %r' % (_CallList(calls), self.mock_calls)
), cause)
return
all_calls = list(all_calls)
not_found = []
for kall in expected:
try:
all_calls.remove(kall)
except ValueError:
not_found.append(kall)
if not_found:
six.raise_from(AssertionError(
'%r not all found in call list' % (tuple(not_found),)
), cause)
def assert_any_call(self, *args, **kwargs):
"""assert the mock has been called with the specified arguments.
The assert passes if the mock has *ever* been called, unlike
`assert_called_with` and `assert_called_once_with` that only pass if
the call is the most recent one."""
expected = self._call_matcher((args, kwargs))
actual = [self._call_matcher(c) for c in self.call_args_list]
if expected not in actual:
cause = expected if isinstance(expected, Exception) else None
expected_string = self._format_mock_call_signature(args, kwargs)
six.raise_from(AssertionError(
'%s call not found' % expected_string
), cause)
def raise_from(value, from_value):
raise value
def test_cause(self):
def inner_raise():
try:
self.zero_div()
except ZeroDivisionError as e:
raise_from(KeyError, e)
def outer_raise():
inner_raise() # Marker
blocks = boundaries.split(self.get_report(outer_raise))
self.assertEqual(len(blocks), 3)
self.assertEqual(blocks[1], cause_message)
self.check_zero_div(blocks[0])
self.assertIn('inner_raise() # Marker', blocks[2])
def raise_from(value, from_value):
raise value