Python 函数 raise_from() 的实例源码

test_traceback.py 文件源码 项目:deb-python-traceback2 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_cause(self):
        """Check the report produced for an explicitly chained exception.

        ``inner_raise`` catches the ZeroDivisionError from ``self.zero_div()``
        and raises a KeyError from it via ``raise_from``.  The report obtained
        for ``outer_raise`` is split on ``boundaries`` and must yield exactly
        three blocks: the zero-division part, the cause message, and a part
        that contains the marked ``inner_raise()`` call line.
        """
        def inner_raise():
            try:
                self.zero_div()
            except ZeroDivisionError as e:
                raise_from(KeyError, e)
        def outer_raise():
            inner_raise() # Marker
        # NOTE(review): ``boundaries`` and ``cause_message`` are defined outside
        # this snippet; presumably a splitter pattern and the expected
        # "direct cause" separator text -- confirm against the module.
        blocks = boundaries.split(self.get_report(outer_raise))
        self.assertEqual(len(blocks), 3)
        self.assertEqual(blocks[1], cause_message)
        self.check_zero_div(blocks[0])
        # The assertion matches the source text of the call line above, so
        # that line (including its trailing comment) must stay unchanged.
        self.assertIn('inner_raise() # Marker', blocks[2])
util.py 文件源码 项目:filefinder2 作者: asmodehn 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _find_spec_from_path(name, path=None):
    """Look up the spec of module *name* without importing its parents.

    A module that is already present in ``sys.modules`` is answered from
    there: its ``__spec__`` is returned, ``None`` is returned when the
    ``sys.modules`` entry itself is ``None``, and ``ValueError`` is raised
    when ``__spec__`` is missing or set to ``None``.  Otherwise the lookup is
    delegated to ``_find_spec`` together with *path*, and whatever that
    returns is passed through.

    Parent packages of dotted names are not imported implicitly, so callers
    will usually have to import them, in order, before asking for a
    submodule's spec.
    """
    if name not in sys.modules:
        return _find_spec(name, path)

    loaded = sys.modules[name]
    if loaded is None:
        return None

    try:
        found = loaded.__spec__
    except AttributeError:
        # Chain from None so the AttributeError is hidden from the report.
        six.raise_from(ValueError('{}.__spec__ is not set'.format(name)), None)
    if found is None:
        raise ValueError('{}.__spec__ is None'.format(name))
    return found
util.py 文件源码 项目:filefinder2 作者: asmodehn 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def find_spec(name, package=None):
    """Look up the spec of module *name*, importing its parent if needed.

    *name* and *package* follow the conventions of
    ``importlib.import_module()``: a *name* with leading dots is relative and
    is resolved against *package* first.

    A module that is already present in ``sys.modules`` is answered from
    there: its ``__spec__`` is returned, ``None`` is returned when the
    ``sys.modules`` entry itself is ``None``, and ``ValueError`` is raised
    when ``__spec__`` is missing or set to ``None``.  Otherwise, for a dotted
    name the parent module is imported and its ``__path__`` is handed to
    ``_find_spec``; for a top-level name ``_find_spec`` is called with no
    path.
    """
    if name.startswith('.'):
        fullname = resolve_name(name, package)
    else:
        fullname = name

    if fullname in sys.modules:
        loaded = sys.modules[fullname]
        if loaded is None:
            return None
        try:
            found = loaded.__spec__
        except AttributeError:
            # Chain from None so the AttributeError is hidden from the report.
            six.raise_from(ValueError('{}.__spec__ is not set'.format(name)), None)
        if found is None:
            raise ValueError('{}.__spec__ is None'.format(name))
        return found

    parent_name = fullname.rpartition('.')[0]
    if not parent_name:
        return _find_spec(fullname, None)

    # Use builtins.__import__() in case someone replaced it.
    parent = __import__(parent_name, fromlist=['__path__'])
    return _find_spec(fullname, parent.__path__)
__init__.py 文件源码 项目:filefinder2 作者: asmodehn 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def find_loader(name, path=None):
    """Return the loader for module *name*.

    Deprecated backward-compatible wrapper; a ``DeprecationWarning`` pointing
    at ``importlib.util.find_spec()`` is issued on every call.

    An already-imported module answers with its ``__loader__``;
    ``ValueError`` is raised when that attribute is missing or ``None``.
    Otherwise a spec is looked up through ``_find_spec``: ``None`` is
    returned when there is no spec, and ``_ImportError`` is raised when the
    spec carries no loader.
    """
    warnings.warn('Use importlib.util.find_spec() instead.',
                  DeprecationWarning, stacklevel=2)

    try:
        loader = sys.modules[name].__loader__
    except KeyError:
        # Not imported yet: fall through to the spec-based lookup below.
        pass
    except AttributeError:
        six.raise_from(ValueError('{}.__loader__ is not set'.format(name)), None)
    else:
        if loader is None:
            raise ValueError('{}.__loader__ is None'.format(name))
        return loader

    spec = _find_spec(name, path)
    # We won't worry about malformed specs (missing attributes).
    if spec is None:
        return None
    if spec.loader is not None:
        return spec.loader
    if spec.submodule_search_locations is None:
        raise _ImportError('spec for {} missing loader'.format(name),
                           name=name)
    raise _ImportError('namespace packages do not have loaders',
                       name=name)


# This should be in all python >2.7
excutils.py 文件源码 项目:deb-oslo.utils 作者: openstack 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def raise_with_cause(exc_cls, message, *args, **kwargs):
    """Raise ``exc_cls`` chained (where supported) to a *cause* exception.

    NOTE(harlowja): py3.x can chain exceptions (see :pep:`3134`), so the
    requested exception is raised from the given *cause*, or from the
    exception currently being handled when no *cause* was supplied.  py2.x
    can neither chain nor format chained exceptions, therefore the exception
    class is always constructed with a ``cause`` keyword argument (which it
    is free to discard) so that *similar* information can be
    inspected/retained there.

    :param exc_cls: the exception class to raise (typically one derived
                    from :py:class:`.CausedByException` or equivalent).
    :param message: the text/str message handed to the exception
                    constructor as its first positional argument.
    :param args: further positional arguments for the exception
                 constructor.
    :param kwargs: further keyword arguments for the exception constructor;
                   an explicit ``cause`` entry takes precedence over the
                   exception currently being handled.

    .. versionadded:: 1.6
    """
    if 'cause' not in kwargs:
        # Only the exception value is needed; the type and traceback are
        # never bound to a local so nothing extra is kept alive.
        active_exc = sys.exc_info()[1]
        try:
            if active_exc is not None:
                kwargs['cause'] = active_exc
        finally:
            # Leave no reference to the exception (and the traceback it
            # retains internally) lying around in this frame.
            del active_exc
    six.raise_from(exc_cls(message, *args, **kwargs), kwargs.get('cause'))
versionutils.py 文件源码 项目:deb-oslo.utils 作者: openstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def convert_version_to_int(version):
    """Convert a version to an integer.

    *version* must be a string with dots or a tuple of integers.  A string
    is first turned into a tuple with ``convert_version_to_tuple``; the
    tuple's components are then folded left to right as ``x * 1000 + y``.

    :param version: dotted version string or tuple of integers.
    :returns: the folded integer.  A value that is neither a string nor a
              tuple falls through and yields ``None``.
    :raises ValueError: if conversion or folding fails; the original
                        exception is attached as the cause.

    .. versionadded:: 2.0
    """
    try:
        if isinstance(version, six.string_types):
            version = convert_version_to_tuple(version)
        if isinstance(version, tuple):
            return six.moves.reduce(lambda x, y: (x * 1000) + y, version)
    except Exception as ex:
        # ``version`` may be a tuple here.  Wrap it in a 1-tuple so ``%``
        # formats it as a single value instead of unpacking it as the
        # argument list (which raised TypeError and masked the ValueError).
        msg = _("Version %s is invalid.") % (version,)
        six.raise_from(ValueError(msg), ex)
mock.py 文件源码 项目:RPoint 作者: george17-meet 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def assert_has_calls(self, calls, any_order=False):
    """assert the mock has been called with the specified calls.
    The `mock_calls` list is checked for the calls.

    If `any_order` is False (the default) then the calls must be
    sequential. There can be extra calls before or after the
    specified calls.

    If `any_order` is True then the calls can be in any order, but
    they must all appear in `mock_calls`.

    Raises AssertionError when the expected calls are not found.  If one of
    the expected calls was turned into an exception by `_call_matcher`, the
    first such exception is attached as the cause."""
    expected = [self._call_matcher(c) for c in calls]
    # `expected` is a list, so testing the list itself against Exception
    # could never succeed; look at its elements instead.
    # NOTE(review): presumably `_call_matcher` returns an exception instance
    # for a call it cannot match against the spec -- confirm.
    cause = next((e for e in expected if isinstance(e, Exception)), None)
    all_calls = _CallList(self._call_matcher(c) for c in self.mock_calls)
    if not any_order:
        if expected not in all_calls:
            six.raise_from(AssertionError(
                'Calls not found.\nExpected: %r\n'
                'Actual: %r' % (_CallList(calls), self.mock_calls)
            ), cause)
        return

    all_calls = list(all_calls)

    not_found = []
    for kall in expected:
        # Remove each match so a repeated expected call needs a repeated
        # actual call.
        try:
            all_calls.remove(kall)
        except ValueError:
            not_found.append(kall)
    if not_found:
        six.raise_from(AssertionError(
            '%r not all found in call list' % (tuple(not_found),)
        ), cause)
mock.py 文件源码 项目:RPoint 作者: george17-meet 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def assert_any_call(self, *args, **kwargs):
    """assert the mock has been called with the specified arguments.

    Every entry of `call_args_list` is considered, so the assert passes if
    the mock has *ever* been called this way -- unlike `assert_called_with`
    and `assert_called_once_with`, which only look at the most recent call.

    Raises AssertionError when no matching call exists; if the expected call
    itself came back from `_call_matcher` as an exception, that exception is
    attached as the cause."""
    expected = self._call_matcher((args, kwargs))
    recorded = [self._call_matcher(c) for c in self.call_args_list]
    if expected in recorded:
        return

    if isinstance(expected, Exception):
        cause = expected
    else:
        cause = None
    signature = self._format_mock_call_signature(args, kwargs)
    failure = AssertionError('%s call not found' % signature)
    six.raise_from(failure, cause)
__init__.py 文件源码 项目:dpm-py 作者: oki-archive 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _apirequest(self, method, url, *args, **kwargs):
    """
    General request-response processing routine for dpr-api server.

    :param method: HTTP verb; one of 'POST', 'PUT', 'GET', 'DELETE'.
    :param url: absolute URL, or a relative one that is appended to
        ``self.server``.
    :param args: extra positional arguments forwarded to ``requests``.
    :param kwargs: extra keyword arguments forwarded to ``requests``;
        ``headers`` is taken out and extended with the auth token when
        ``self.token`` is set.
    :return:
        Response -- requests.Response instance
    :raises JSONDecodeError: the response body could not be decoded as JSON.
    :raises HTTPStatusError: the status code is neither 200 nor 201.

    """

    # TODO: doing this for every request is kinda awkward
    self._ensure_config()

    if not url.startswith('http'):
        # Relative url is given. Build absolute server url
        url = self.server + url

    methods = {
        'POST': requests.post,
        'PUT': requests.put,
        'GET': requests.get,
        'DELETE': requests.delete
    }

    # Work on a copy: the token must not leak into a dict the caller still
    # holds (and possibly reuses for requests to other hosts).
    headers = kwargs.pop('headers', None)
    headers = headers.copy() if headers else {}
    if self.token:
        headers.setdefault('Auth-Token', '%s' % self.token)

    response = methods.get(method)(url, *args, headers=headers, **kwargs)

    try:
        jsonresponse = response.json()
    except Exception as e:
        six.raise_from(
            JSONDecodeError(response, message='Failed to decode JSON response from server'), e)

    if response.status_code not in (200, 201):
        # An error body is not guaranteed to be a JSON object; only look up
        # the detail fields when it is, so the status error is still raised.
        detail = None
        if isinstance(jsonresponse, dict):
            detail = jsonresponse.get('message') or jsonresponse.get('description')
        raise HTTPStatusError(response, message='Error %s\n%s' % (
            response.status_code, detail))

    return response
dicty.py 文件源码 项目:dicty 作者: vitek 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def raise_from(cls, exc, path=None):
    """Raise a ``cls`` instance built from *exc*.

    The new exception is constructed from ``str(exc)`` and *path*.  On
    Python 3 it is chained to *exc*; otherwise it is re-raised with the
    traceback of the exception currently being handled.
    """
    wrapped = cls(str(exc), path)
    if not six.PY3:
        # No chaining available: keep at least the active traceback.
        active_tb = sys.exc_info()[2]
        six.reraise(cls, wrapped, active_tb)
    else:
        six.raise_from(wrapped, exc)
dicty.py 文件源码 项目:dicty 作者: vitek 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def validate(self, obj):
    """Validate this field's entry in *obj*.

    A missing key raises ``FieldError`` unless the field is optional.  For a
    present key the filters are run: a ``ValueError`` is converted through
    ``FieldError.raise_from`` with this field's key, while a ``FieldError``
    gets the key added to its path info and is re-raised.
    """
    if self.key not in obj:
        if not self.optional:
            raise FieldError('Is required', self.key)
        return

    try:
        self.run_filters(obj)
    # NOTE(review): if FieldError derives from ValueError, the second
    # handler below is never reached -- confirm the class hierarchy.
    except ValueError as error:
        FieldError.raise_from(error, self.key)
    except FieldError as error:
        error.add_path_info(self.key)
        raise
dicty.py 文件源码 项目:dicty 作者: vitek 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def instantiate(self, value):
    """Build an instance of this field's type from *value*.

    JSON-object types are created through ``type.fromjson``; any other type
    is called directly, and a ``ValueError`` from that call is converted
    through ``FieldError.raise_from``.
    """
    if self.is_json_object:
        return self.type.fromjson(value)

    try:
        instance = self.type(value)
    except ValueError as error:
        FieldError.raise_from(error)
    else:
        return instance
call.py 文件源码 项目:rcli 作者: contains-io 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def call(func, args):
    """Invoke *func* with the parsed arguments normalized and type-cast.

    Each argument is cast to the type hint of the matching parameter
    (``Any`` when there is no hint).  Values for the ``*args`` parameter are
    cast as a homogeneous tuple; names the signature does not list use the
    hint of the ``**kwargs`` parameter when one is declared.

    Args:
        func: The function to call.
        args: The arguments parsed by docopt.

    Returns:
        The return value of func.

    Raises:
        InvalidCliValueError: A value could not be cast to its hinted type.
    """
    assert hasattr(func, '__call__'), 'Cannot call func: {}'.format(
        func.__name__)
    if isinstance(func, FunctionType):
        raw_func = func
    else:
        raw_func = func.__class__.__call__
    hints = collections.defaultdict(lambda: Any, get_type_hints(raw_func))
    argspec = _getargspec(raw_func)
    keyword_values = {}
    positional_values = ()
    for original_key, norm_key, raw_value in _normalize(args):
        is_varargs = norm_key == argspec.varargs
        if is_varargs:
            hints[norm_key] = Tuple[hints[norm_key], ...]
        elif norm_key not in argspec.args and argspec.varkw in hints:
            hints[norm_key] = hints[argspec.varkw]
        try:
            value = cast(hints[norm_key], raw_value)
        except TypeError as e:
            _LOGGER.exception(e)
            six.raise_from(
                exc.InvalidCliValueError(original_key, raw_value), e)
        if is_varargs:
            positional_values = value
        elif (norm_key in argspec.args or argspec.varkw) and (
                keyword_values.get(norm_key) is None):
            # Store the value only when none (or only None) was recorded.
            keyword_values[norm_key] = value
    return func(*positional_values, **keyword_values)
dbapi2.py 文件源码 项目:python-comdb2 作者: bloomberg 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def _raise_wrapped_exception(exc):
    """Raise the module-level exception matching *exc*, chained to it.

    The message combines ``exc.error_message`` with the return code.  A
    message mentioning a null constraint violation always maps to
    ``NonNullConstraintError``; otherwise the class is looked up by return
    code in ``_EXCEPTION_BY_RC``, defaulting to ``OperationalError``.
    """
    code = exc.error_code
    msg = '%s (cdb2api rc %d)' % (exc.error_message, code)
    if "null constraint violation" in msg:
        wrapped = NonNullConstraintError(msg)  # DRQS 86013831
    else:
        wrapped = _EXCEPTION_BY_RC.get(code, OperationalError)(msg)
    six.raise_from(wrapped, exc)
swift.py 文件源码 项目:stor 作者: counsyl 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _propagate_swift_exceptions(func):
    """Decorator that re-raises swift client errors as descriptive errors.

    ``swift_service.SwiftError`` and ``swift_exceptions.ClientException``
    raised by *func* are translated with
    ``_swiftclient_error_to_descriptive_exception`` and chained to the
    original exception.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            result = func(*args, **kwargs)
        except (swift_service.SwiftError,
                swift_exceptions.ClientException) as swift_err:
            translated = _swiftclient_error_to_descriptive_exception(swift_err)
            six.raise_from(translated, swift_err)
        else:
            return result
    return wrapper
s3.py 文件源码 项目:stor 作者: counsyl 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _s3_client_call(self, method_name, *args, **kwargs):
    """
    Obtains an S3 client from ``_get_s3_client`` and calls its
    ``method_name`` method with the given arguments.

    A botocore ``ClientError`` is translated by ``_parse_s3_error`` and the
    result is raised chained to the original error.
    """
    client_method = getattr(_get_s3_client(), method_name)
    try:
        result = client_method(*args, **kwargs)
    except botocore_exceptions.ClientError as client_err:
        six.raise_from(_parse_s3_error(client_err, **kwargs), client_err)
    else:
        return result
s3.py 文件源码 项目:stor 作者: counsyl 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _make_s3_transfer(self, method_name, config=None, *args, **kwargs):
    """
    Obtains a transfer object from ``_get_s3_transfer`` (built with
    ``config``) and calls its ``method_name`` method with the remaining
    arguments.

    ``S3UploadFailedError`` is re-raised as ``FailedUploadError`` and
    ``RetriesExceededError`` as ``FailedDownloadError``, each chained to the
    original error.
    """
    transfer_method = getattr(_get_s3_transfer(config=config), method_name)
    try:
        result = transfer_method(*args, **kwargs)
    except boto3_exceptions.S3UploadFailedError as upload_err:
        wrapped = exceptions.FailedUploadError(str(upload_err), upload_err)
        six.raise_from(wrapped, upload_err)
    except boto3_exceptions.RetriesExceededError as download_err:
        wrapped = exceptions.FailedDownloadError(str(download_err), download_err)
        six.raise_from(wrapped, download_err)
    else:
        return result
__init__.py 文件源码 项目:packaging 作者: blockstack 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def _write_request(self, request_url, json_message, api_key=None):
    """Send *json_message* to *request_url* and check the server's answer.

    The message is base64-encoded and sent url-encoded together with the
    fixed ``verbose``/``ip`` fields and, when given, the API key.

    Returns True on success.  Raises MixpanelException when the request
    fails with a URLError, when the answer is not valid JSON, or when its
    ``status`` field is not 1.
    """
    payload = {
        'data': base64.b64encode(json_message.encode('utf8')),
        'verbose': 1,
        'ip': 0,
    }
    if api_key:
        payload['api_key'] = api_key
    body = urllib.parse.urlencode(payload).encode('utf8')

    try:
        request = urllib.request.Request(request_url, body)

        # Note: We don't send timeout=None here, because the timeout in urllib2 defaults to
        # an internal socket timeout, not None.
        open_options = {}
        if self._request_timeout is not None:
            open_options['timeout'] = self._request_timeout
        response = urllib.request.urlopen(request, **open_options).read()
    except urllib.error.URLError as e:
        raise six.raise_from(MixpanelException(e), e)

    try:
        response = json.loads(response.decode('utf8'))
    except ValueError:
        raise MixpanelException('Cannot interpret Mixpanel server response: {0}'.format(response))

    if response['status'] != 1:
        raise MixpanelException('Mixpanel error: {0}'.format(response['error']))

    return True
__init__.py 文件源码 项目:packaging 作者: blockstack 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _flush_endpoint(self, endpoint, api_key=None):
    """Send everything buffered for *endpoint* in batches of ``_max_size``.

    Each batch is joined into a JSON array string and handed to the
    consumer.  When a send fails, a new MixpanelException chained to the
    original one is raised, carrying the failed batch in ``message`` and the
    endpoint in ``endpoint``.

    Whatever has not been sent successfully -- including a failed batch --
    is written back to the buffer even on error, so a later flush does not
    resend batches that were already delivered.
    """
    buf = self._buffers[endpoint]
    try:
        while buf:
            batch = buf[:self._max_size]
            batch_json = '[{0}]'.format(','.join(batch))
            try:
                self._consumer.send(endpoint, batch_json, api_key)
            except MixpanelException as orig_e:
                mp_e = MixpanelException(orig_e)
                mp_e.message = batch_json
                mp_e.endpoint = endpoint
                raise six.raise_from(mp_e, orig_e)
            # Drop the batch only after it went out successfully.
            buf = buf[self._max_size:]
    finally:
        # Runs on the error path too: ``buf`` still starts with the batch
        # that failed, while already-delivered batches are gone.
        self._buffers[endpoint] = buf
http.py 文件源码 项目:osc-placement 作者: openstack 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _wrap_http_exceptions():
    """Reraise osc-lib exceptions with detailed messages.

    An ``HttpError`` raised at the ``yield`` point is replaced by the
    exception class mapped to its HTTP status in ``_http_error_to_exc``
    (``CommandError`` by default).  The message is the stripped last line of
    the first error's ``detail`` field in the JSON response body.
    """

    try:
        yield
    except ks_exceptions.HttpError as exc:
        payload = json.loads(exc.response.content)
        detail = payload['errors'][0]['detail']
        msg = detail.split('\n')[-1].strip()
        status = exc.http_status
        exc_class = _http_error_to_exc.get(status, exceptions.CommandError)

        six.raise_from(exc_class(status, msg), exc)


问题


面经


文章

微信
公众号

扫码关注公众号