python类raise_from()的实例源码

mock.py 文件源码 项目:auger 作者: laffra 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def assert_called_with(_mock_self, *args, **kwargs):
        """Check that the most recent call used exactly these arguments.

        An AssertionError is raised when the mock has never been called, or
        when the matcher built from ``args``/``kwargs`` differs from the
        matcher built from the recorded last call."""
        mock = _mock_self
        if mock.call_args is None:
            signature = mock._format_mock_call_signature(args, kwargs)
            raise AssertionError('Expected call: %s\nNot called' % (signature,))

        wanted = mock._call_matcher((args, kwargs))
        seen = mock._call_matcher(mock.call_args)
        if wanted != seen:
            # The matcher result is only chained as a cause when it is itself
            # an exception instance.
            cause = wanted if isinstance(wanted, Exception) else None
            message = mock._format_mock_failure_message(args, kwargs)
            if six.PY2 and cause is not None:
                # Python 2 has no __cause__, so append the diagnostics as text.
                message = '%s\n%s' % (message, str(cause))
            six.raise_from(AssertionError(message), cause)
test_traceback.py 文件源码 项目:devsecops-example-helloworld 作者: boozallen 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_context_suppression(self):
        # Raise ZeroDivisionError "from None" while a plain Exception is being
        # handled, then check that the formatted report shows only the
        # ZeroDivisionError traceback.
        try:
            try:
                raise Exception
            except:
                raise_from(ZeroDivisionError, None)
        except ZeroDivisionError as _:
            # Keep a reference that is still usable after the handler ends.
            e = _
            tb = sys.exc_info()[2]
        lines = self.get_report(e, tb)
        # The expected text quotes the raise_from(...) source line above, so
        # that line must not be reworded.
        self.assertThat(lines, DocTestMatches("""\
Traceback (most recent call last):
  File "...traceback2/tests/test_traceback.py", line ..., in test_context_suppression
    raise_from(ZeroDivisionError, None)
  File "<string>", line 2, in raise_from
ZeroDivisionError
""", doctest.ELLIPSIS))
test_traceback.py 文件源码 项目:devsecops-example-helloworld 作者: boozallen 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_cause_and_context(self):
        # When both a cause and a context are set, only the cause should be
        # displayed and the context should be muted.
        def inner_raise():
            # First capture a ZeroDivisionError to use as the explicit cause.
            try:
                self.zero_div()
            except ZeroDivisionError as _e:
                e = _e
            # Then trigger a NameError (undefined name) so that a different
            # exception is being handled when KeyError is raised from `e`.
            try:
                xyzzy
            except NameError:
                raise_from(KeyError, e)
        def outer_raise():
            inner_raise() # Marker
        # The split is expected to give [cause traceback, separator message,
        # final traceback], as the assertions below spell out.
        blocks = boundaries.split(self.get_report(outer_raise))
        self.assertEqual(len(blocks), 3)
        self.assertEqual(blocks[1], cause_message)
        self.check_zero_div(blocks[0])
        # The final traceback must quote the marked source line in outer_raise.
        self.assertIn('inner_raise() # Marker', blocks[2])
test_traceback.py 文件源码 项目:devsecops-example-helloworld 作者: boozallen 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_cause_recursive(self):
        # Build a cyclic cause chain: KeyError is raised from the
        # ZeroDivisionError `z`, then `z` is re-raised from that KeyError.
        def inner_raise():
            try:
                try:
                    self.zero_div()
                except ZeroDivisionError as e:
                    # Keep the original exception so it can be re-raised below.
                    z = e
                    raise_from(KeyError, e)
            except KeyError as e:
                raise_from(z, e)
        def outer_raise():
            inner_raise() # Marker
        # The report is expected to split into exactly two tracebacks with the
        # cause message between them.
        blocks = boundaries.split(self.get_report(outer_raise))
        self.assertEqual(len(blocks), 3)
        self.assertEqual(blocks[1], cause_message)
        # The first block is the KeyError raised from the ZeroDivisionError
        self.assertIn('raise_from(KeyError, e)', blocks[0])
        self.assertNotIn('1/0', blocks[0])
        # The second block (apart from the boundary) is the ZeroDivisionError
        # re-raised from the KeyError
        self.assertIn('inner_raise() # Marker', blocks[2])
        self.check_zero_div(blocks[2])
test_traceback.py 文件源码 项目:deb-python-traceback2 作者: openstack 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def test_context_suppression(self):
        # Raise ZeroDivisionError "from None" while a plain Exception is being
        # handled, then check that the formatted report shows only the
        # ZeroDivisionError traceback.
        try:
            try:
                raise Exception
            except:
                raise_from(ZeroDivisionError, None)
        except ZeroDivisionError as _:
            # Keep a reference that is still usable after the handler ends.
            e = _
            tb = sys.exc_info()[2]
        lines = self.get_report(e, tb)
        # The expected text quotes the raise_from(...) source line above, so
        # that line must not be reworded.
        self.assertThat(lines, DocTestMatches("""\
Traceback (most recent call last):
  File "...traceback2/tests/test_traceback.py", line ..., in test_context_suppression
    raise_from(ZeroDivisionError, None)
  File "<string>", line 2, in raise_from
ZeroDivisionError
""", doctest.ELLIPSIS))
test_traceback.py 文件源码 项目:deb-python-traceback2 作者: openstack 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def test_cause_and_context(self):
        # When both a cause and a context are set, only the cause should be
        # displayed and the context should be muted.
        def inner_raise():
            # First capture a ZeroDivisionError to use as the explicit cause.
            try:
                self.zero_div()
            except ZeroDivisionError as _e:
                e = _e
            # Then trigger a NameError (undefined name) so that a different
            # exception is being handled when KeyError is raised from `e`.
            try:
                xyzzy
            except NameError:
                raise_from(KeyError, e)
        def outer_raise():
            inner_raise() # Marker
        # The split is expected to give [cause traceback, separator message,
        # final traceback], as the assertions below spell out.
        blocks = boundaries.split(self.get_report(outer_raise))
        self.assertEqual(len(blocks), 3)
        self.assertEqual(blocks[1], cause_message)
        self.check_zero_div(blocks[0])
        # The final traceback must quote the marked source line in outer_raise.
        self.assertIn('inner_raise() # Marker', blocks[2])
test_traceback.py 文件源码 项目:deb-python-traceback2 作者: openstack 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_cause_recursive(self):
        # Build a cyclic cause chain: KeyError is raised from the
        # ZeroDivisionError `z`, then `z` is re-raised from that KeyError.
        def inner_raise():
            try:
                try:
                    self.zero_div()
                except ZeroDivisionError as e:
                    # Keep the original exception so it can be re-raised below.
                    z = e
                    raise_from(KeyError, e)
            except KeyError as e:
                raise_from(z, e)
        def outer_raise():
            inner_raise() # Marker
        # The report is expected to split into exactly two tracebacks with the
        # cause message between them.
        blocks = boundaries.split(self.get_report(outer_raise))
        self.assertEqual(len(blocks), 3)
        self.assertEqual(blocks[1], cause_message)
        # The first block is the KeyError raised from the ZeroDivisionError
        self.assertIn('raise_from(KeyError, e)', blocks[0])
        self.assertNotIn('1/0', blocks[0])
        # The second block (apart from the boundary) is the ZeroDivisionError
        # re-raised from the KeyError
        self.assertIn('inner_raise() # Marker', blocks[2])
        self.check_zero_div(blocks[2])
mock.py 文件源码 项目:RPoint 作者: george17-meet 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def assert_called_with(_mock_self, *args, **kwargs):
        """Check that the most recent call used exactly these arguments.

        An AssertionError is raised when the mock has never been called, or
        when the matcher built from ``args``/``kwargs`` differs from the
        matcher built from the recorded last call."""
        mock = _mock_self
        if mock.call_args is None:
            signature = mock._format_mock_call_signature(args, kwargs)
            raise AssertionError('Expected call: %s\nNot called' % (signature,))

        wanted = mock._call_matcher((args, kwargs))
        seen = mock._call_matcher(mock.call_args)
        if wanted != seen:
            # The matcher result is only chained as a cause when it is itself
            # an exception instance.
            cause = wanted if isinstance(wanted, Exception) else None
            message = mock._format_mock_failure_message(args, kwargs)
            if six.PY2 and cause is not None:
                # Python 2 has no __cause__, so append the diagnostics as text.
                message = '%s\n%s' % (message, str(cause))
            six.raise_from(AssertionError(message), cause)
importutils.py 文件源码 项目:figura 作者: shx2 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def import_figura_file(path):
    """
    Import a figura config file (with no side effects).

    :param path: a python import path
    :return: a python module object
    :raise ConfigParsingError: if importing fails
    """
    try:
        return _import_module_no_side_effects(path)
    except Exception as e:
        if not six.PY2:
            # Python 3: chain the original failure as the cause. The plain
            # `raise ... from e` statement is not valid Python 2 syntax.
            six.raise_from(ConfigParsingError('Failed parsing config "%s"' % path), e)
        # Python 2 has no exception chaining, so fold the original error
        # into the message instead.
        raise ConfigParsingError('Failed parsing config "%s": %r' % (path, e))
cli.py 文件源码 项目:stor 作者: counsyl 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _get_pwd(service=None):
    """
    Return the present working directory for ``service``, or a list with
    the working directory of every entry in the ``env`` section when no
    service is given.

    A ValueError is raised when ``service`` has no option in ``env``.
    """
    def to_text(data):
        if not six.PY2:
            return data
        return data.decode(locale.getpreferredencoding(False))  # pragma: no cover

    parser = _get_env()
    if not service:
        return [
            to_text(utils.with_trailing_slash(value))
            for _, value in parser.items('env')
        ]
    try:
        return to_text(utils.with_trailing_slash(parser.get('env', service)))
    except configparser.NoOptionError as e:
        six.raise_from(ValueError('%s is an invalid service' % service), e)
tarfs.py 文件源码 项目:pyfilesystem2 作者: PyFilesystem 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def listdir(self, path):
        """Return the base names of the archive members located directly in ``path``.

        A non-empty path must name an existing directory member; otherwise
        ResourceNotFound or DirectoryExpected is raised with the context
        suppressed.
        """
        self.check()
        path = relpath(path)
        if path:
            try:
                entry = self._tar.getmember(path)
            except KeyError:
                six.raise_from(errors.ResourceNotFound(path), None)
            if not entry.isdir():
                six.raise_from(errors.DirectoryExpected(path), None)

        names = []
        for member in self._tar:
            if dirname(member.path) == path:
                names.append(basename(member.name))
        return names
tarfs.py 文件源码 项目:pyfilesystem2 作者: PyFilesystem 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def openbin(self, path, mode="r", buffering=-1, **options):
        """Open the archive member at ``path`` for binary reading.

        Any mode containing ``w``, ``+`` or ``a`` is rejected with
        ResourceReadOnly. A missing member raises ResourceNotFound and a
        non-file member raises FileExpected.
        """
        self.check()
        path = relpath(normpath(path))

        if any(flag in mode for flag in ('w', '+', 'a')):
            raise errors.ResourceReadOnly(path)

        try:
            member = self._tar.getmember(path)
        except KeyError:
            six.raise_from(errors.ResourceNotFound(path), None)

        if not member.isfile():
            raise errors.FileExpected(path)

        raw = RawWrapper(self._tar.extractfile(member))
        if six.PY2:
            # Patch nonexistent file.flush in Python2
            raw.flush = lambda: None
        return raw
test_six.py 文件源码 项目:obsoleted-vpduserv 作者: InfraSIM 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_raise_from():
    """Check six.raise_from(exc, None) raised while another exception is handled."""
    try:
        try:
            raise Exception("blah")
        except Exception as inner:
            ctx = inner
            six.raise_from(Exception("foo"), None)
    except Exception as outer:
        val = outer

    py_version = sys.version_info[:2]
    if py_version > (3, 0):
        # We should have done a raise f from None equivalent.
        assert val.__cause__ is None
        assert val.__context__ is ctx
    if py_version >= (3, 3):
        # And that should suppress the context on the exception.
        assert val.__suppress_context__
    # For all versions the outer exception should have raised successfully.
    assert str(val) == "foo"
sshfs.py 文件源码 项目:fs.sshfs 作者: althonos 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def makedir(self, path, permissions=None, recreate=False):  # noqa: D102
        self.check()
        if permissions:
            _permissions = permissions
        else:
            _permissions = Permissions(mode=0o755)
        _path = self.validatepath(path)

        try:
            info = self.getinfo(_path)
        except errors.ResourceNotFound:
            # Nothing exists at the path yet: create the directory remotely.
            with self._lock, convert_sshfs_errors('makedir', path):
                self._sftp.mkdir(_path, _permissions.mode)
        else:
            # An existing file always conflicts; an existing directory
            # conflicts unless the caller allowed recreation.
            blocked_dir = info.is_dir and not recreate
            if blocked_dir or info.is_file:
                six.raise_from(errors.DirectoryExists(path), None)

        return self.opendir(path)
retrier.py 文件源码 项目:riprova 作者: h2non 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _handle_error(self, err):
        """
        Record ``err`` and decide whether the retry cycle may continue.

        The optional ``error_evaluator`` is called with the error. If it
        returns an exception instance, that exception is raised from the
        recorded error; if it returns ``False``, ``err`` itself is raised.
        Any other result lets the method return normally.
        """
        # Update latest cached error
        self.error = err

        # Retrying is the default when no evaluator is configured
        verdict = self.error_evaluator(err) if self.error_evaluator else True

        # If the evaluator returns an exception, just raise it
        if verdict and isinstance(verdict, Exception):
            raise_from(verdict, self.error)

        # If the evaluator returns False, raise the original error and
        # stop the retry cycle
        if verdict is False:
            raise err
matcher.py 文件源码 项目:cti-pattern-matcher 作者: oasis-open 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _literal_terminal_to_python_val(literal_terminal):
    """
    Convert a terminal node from the parse tree to a Python value by
    looking up its token type in the table of "coercer" functions.

    Raises MatcherInternalError when the token type has no coercer, and
    MatcherException (chained to the original error) when coercion fails.
    """
    token_type = literal_terminal.getSymbol().type
    token_text = literal_terminal.getText()

    if token_type not in _TOKEN_TYPE_COERCERS:
        raise MatcherInternalError(u"Unsupported literal type: {}".format(
            STIXPatternParser.symbolicNames[token_type]))

    coerce = _TOKEN_TYPE_COERCERS[token_type]
    try:
        return coerce(token_text)
    except Exception as e:
        message = u"Invalid {}: {}".format(
            STIXPatternParser.symbolicNames[token_type], token_text
        )
        six.raise_from(MatcherException(message), e)
matcher.py 文件源码 项目:cti-pattern-matcher 作者: oasis-open 项目源码 文件源码 阅读 40 收藏 0 点赞 0 评论 0
def exitStartStopQualifier(self, ctx):
        """
        Consumes nothing
        Produces a (datetime, datetime) 2-tuple containing the start and stop
          times.

        Raises MatcherException (chained to the ValueError) when either
        string cannot be converted to a datetime.
        """

        start_str = _literal_terminal_to_python_val(ctx.StringLiteral(0))
        stop_str = _literal_terminal_to_python_val(ctx.StringLiteral(1))

        # If the language used timestamp literals here, this could go away...
        try:
            start_dt = _str_to_datetime(start_str)
            stop_dt = _str_to_datetime(stop_str)
        except ValueError as e:
            # Re-raise as MatcherException. six.raise_from() raises by itself
            # and never returns, so no `raise` keyword belongs in front of it.
            six.raise_from(MatcherException(*e.args), e)

        self.__push((start_dt, stop_dt), u"exitStartStopQualifier")
test_six.py 文件源码 项目:Deploy_XXNET_Server 作者: jzp820927 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def test_raise_from():
    """Check six.raise_from(exc, None) raised while another exception is handled."""
    try:
        try:
            raise Exception("blah")
        except Exception as inner:
            ctx = inner
            six.raise_from(Exception("foo"), None)
    except Exception as outer:
        val = outer

    py_version = sys.version_info[:2]
    if py_version > (3, 0):
        # We should have done a raise f from None equivalent.
        assert val.__cause__ is None
        assert val.__context__ is ctx
    if py_version >= (3, 3):
        # And that should suppress the context on the exception.
        assert val.__suppress_context__
    # For all versions the outer exception should have raised successfully.
    assert str(val) == "foo"
streaming.py 文件源码 项目:Mastodon.py 作者: halcy 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def handle_line(self, raw_line):
        """Decode one raw stream line and fold it into the pending event.

        A line starting with ``:`` is a heartbeat, an empty line dispatches
        the pending event and resets it, and any other line is split into
        ``key: value`` and stored on the pending event.
        """
        try:
            text = raw_line.decode('utf-8')
        except UnicodeDecodeError as err:
            six.raise_from(MastodonMalformedEventError("Malformed UTF-8"), err)

        if text.startswith(':'):
            self.handle_heartbeat()
            return

        if text == '':
            # end of event
            self._dispatch(self.event)
            self.event = {}
            return

        key, value = text.split(': ', 1)
        if key in self.event:
            # According to the MDN spec, repeating the 'data' key
            # represents a newline(!)
            self.event[key] = '\n'.join((self.event[key], value))
        else:
            self.event[key] = value
har.py 文件源码 项目:httpolice 作者: vfaronov 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def har_input(paths):
    """Yield one processed result per log entry of every HAR file in ``paths``.

    InputError is raised (chained to the underlying error) when a file is
    not valid JSON or does not have the expected structure.
    """
    for raw_path in paths:
        # According to the spec, HAR files are UTF-8 with an optional BOM.
        path = decode_path(raw_path)
        with io.open(path, 'rt', encoding='utf-8-sig') as stream:
            try:
                har = json.load(stream)
            except ValueError as err:
                message = u'%s: bad HAR file: %s' % (path, err)
                six.raise_from(InputError(message), err)
            # The yield stays inside the try so structural errors found
            # while walking the entries are reported the same way.
            try:
                creator = har['log']['creator']['name']
                for entry in har['log']['entries']:
                    yield _process_entry(entry, creator, path)
            except (TypeError, KeyError) as err:
                message = u'%s: cannot understand HAR file: %r' % (path, err)
                six.raise_from(InputError(message), err)
test_six.py 文件源码 项目:six 作者: benjaminp 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_raise_from():
    """Check six.raise_from(exc, None) raised while another exception is handled."""
    try:
        try:
            raise Exception("blah")
        except Exception as inner:
            ctx = inner
            six.raise_from(Exception("foo"), None)
    except Exception as outer:
        val = outer

    py_version = sys.version_info[:2]
    if py_version > (3, 0):
        # We should have done a raise f from None equivalent.
        assert val.__cause__ is None
        assert val.__context__ is ctx
    if py_version >= (3, 3):
        # And that should suppress the context on the exception.
        assert val.__suppress_context__
    # For all versions the outer exception should have raised successfully.
    assert str(val) == "foo"
streamer.py 文件源码 项目:estreamer 作者: spohara79 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def request(self, buf):
        """Send ``buf`` and return the reply (8-byte header plus payload) as a bytearray.

        Returns None when reading the header raises SSL.WantReadError. An
        SSL.Error on send is re-raised as Error; any other SSL.Error while
        reading the header propagates unchanged.
        """
        try:
            self.sock.send(buf)
        except SSL.Error as exc:
            raise_from(Error("SSL Error"), exc)

        try:
            header = self.sock.recv(8)
        except SSL.WantReadError:
            # SSL timeout does not work properly. If no data is available from server,
            # we'll get this error
            return None

        # Header layout: two unsigned shorts and an unsigned long, big-endian;
        # only the last field (payload length) is used here.
        _, _, length = struct.unpack('>HHL', header)
        return bytearray(header + self.sock.recv(length))
streamer.py 文件源码 项目:estreamer 作者: spohara79 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def response(self):
        """Read one message and return its 8-byte header plus payload as a bytearray.

        An SSL.Error while reading the header is re-raised as Error.
        """
        try:
            header = self.sock.recv(8, socket.MSG_PEEK)
        except SSL.Error as exc:
            raise_from(Error("SSL Error"), exc)

        # NOTE(review): the header is read with MSG_PEEK, so the following
        # recv may return those same bytes again - confirm this is intended.
        _, _, length = struct.unpack('>HHL', header)
        return bytearray(header + self.sock.recv(length))
backoff.py 文件源码 项目:nameko-amqp-retry 作者: nameko 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def next(self, message, backoff_exchange_name):
        """Return the next expiration for ``message`` and record it on self.

        Attempts are counted from the ``x-death`` header entries whose
        exchange equals ``backoff_exchange_name``. When a limit is set and
        reached, ``Backoff.Expired`` is raised from this instance.
        """
        total_attempts = sum(
            int(death['count'])
            for death in message.headers.get('x-death', ())
            if death['exchange'] == backoff_exchange_name
        )

        if self.limit and total_attempts >= self.limit:
            six.raise_from(
                Backoff.Expired(
                    "Backoff aborted after '{}' retries (~{:.0f} seconds)".format(
                        self.limit, self.max_delay / 1000
                    )
                ),
                self
            )

        expiration = self.get_next_schedule_item(total_attempts)

        if self.random_sigma:
            jittered = int(random.gauss(expiration, self.random_sigma))
            expiration = round_to_nearest(
                jittered,
                interval=self.random_sigma / self.random_groups_per_sigma
            )

        # Prevent any negative values created by randomness
        expiration = abs(expiration)

        # store calculation results on self.
        self._next_expiration = expiration
        self._total_attempts = total_attempts
        return expiration
decorators.py 文件源码 项目:nameko-amqp-retry 作者: nameko 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def entrypoint_retry(*args, **kwargs):
    """
    Decorator marking an entrypoint as retryable when it fails.

    For use with nameko_amqp_retry enabled entrypoints.

    :param retry_for: An exception class or tuple of exception classes.
        When the wrapped function raises one of them, the entrypoint is
        retried until it succeeds or the `limit` number of retries is
        reached. Defaults to ``Exception``.

    :param limit: integer
         The maximum number of times the entrypoint can be retried before
         giving up (and raising a ``Backoff.Expired`` exception).
         If not given, the default `Backoff.limit` will be used.

    :param schedule: tuple of integers
        The number of milliseconds to wait between each retry. If not
        given, the default `Backoff.schedule` will be used.

    :param random_sigma: integer
        Standard deviation as milliseconds. If not given,
        the default `Backoff.random_sigma` will be used.

    :param random_groups_per_sigma: integer
        Random backoffs are rounded to nearest group. If not given,
        the default `Backoff.random_groups_per_sigma` will be used.
    """
    retry_for = kwargs.get('retry_for')
    if not retry_for:
        retry_for = Exception
    backoff_cls = backoff_factory(*args, **kwargs)

    @wrapt.decorator
    def retrying(wrapped, instance, args, kwargs):
        try:
            result = wrapped(*args, **kwargs)
        except retry_for as exc:
            # Translate the failure into a backoff request, keeping the
            # original exception as its cause.
            six.raise_from(backoff_cls(), exc)
        else:
            return result

    return retrying
mock.py 文件源码 项目:auger 作者: laffra 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def assert_has_calls(self, calls, any_order=False):
        """assert the mock has been called with the specified calls.
        The `mock_calls` list is checked for the calls.

        If `any_order` is False (the default) then the calls must be
        sequential. There can be extra calls before or after the
        specified calls.

        If `any_order` is True then the calls can be in any order, but
        they must all appear in `mock_calls`.

        Raises an AssertionError when the calls are not found. If any
        expected call was turned into an exception by `_call_matcher`, the
        first such exception is chained as the cause."""
        expected = [self._call_matcher(c) for c in calls]
        # `expected` is a list, so an isinstance() check on the list itself
        # can never succeed; look at its items instead, as the sibling
        # assert_* methods do with their single matcher result.
        cause = None
        for matched in expected:
            if isinstance(matched, Exception):
                cause = matched
                break
        all_calls = _CallList(self._call_matcher(c) for c in self.mock_calls)
        if not any_order:
            if expected not in all_calls:
                six.raise_from(AssertionError(
                    'Calls not found.\nExpected: %r\n'
                    'Actual: %r' % (_CallList(calls), self.mock_calls)
                ), cause)
            return

        all_calls = list(all_calls)

        # Remove each expected call once, so duplicates must each be matched
        # by a separate recorded call.
        not_found = []
        for kall in expected:
            try:
                all_calls.remove(kall)
            except ValueError:
                not_found.append(kall)
        if not_found:
            six.raise_from(AssertionError(
                '%r not all found in call list' % (tuple(not_found),)
            ), cause)
mock.py 文件源码 项目:auger 作者: laffra 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def assert_any_call(self, *args, **kwargs):
        """Check that at least one recorded call used these arguments.

        Unlike `assert_called_with` and `assert_called_once_with`, which
        only look at the most recent call, this passes if the mock has
        *ever* been called this way."""
        wanted = self._call_matcher((args, kwargs))
        recorded = [self._call_matcher(call) for call in self.call_args_list]
        if wanted in recorded:
            return

        cause = wanted if isinstance(wanted, Exception) else None
        signature = self._format_mock_call_signature(args, kwargs)
        six.raise_from(AssertionError('%s call not found' % signature), cause)
test_traceback.py 文件源码 项目:devsecops-example-helloworld 作者: boozallen 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def raise_from(value, from_value):
            """Raise ``value``; ``from_value`` is accepted but not used."""
            raise value
test_traceback.py 文件源码 项目:devsecops-example-helloworld 作者: boozallen 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def test_cause(self):
        # Raise KeyError with an explicit cause (the ZeroDivisionError) and
        # check how the chained report is laid out.
        def inner_raise():
            try:
                self.zero_div()
            except ZeroDivisionError as e:
                raise_from(KeyError, e)
        def outer_raise():
            inner_raise() # Marker
        # The split is expected to give [cause traceback, separator message,
        # final traceback], as the assertions below spell out.
        blocks = boundaries.split(self.get_report(outer_raise))
        self.assertEqual(len(blocks), 3)
        self.assertEqual(blocks[1], cause_message)
        self.check_zero_div(blocks[0])
        # The final traceback must quote the marked source line in outer_raise.
        self.assertIn('inner_raise() # Marker', blocks[2])
test_traceback.py 文件源码 项目:deb-python-traceback2 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def raise_from(value, from_value):
            """Raise ``value``; ``from_value`` is accepted but not used."""
            raise value


问题


面经


文章

微信
公众号

扫码关注公众号