python类ValidationError()的实例源码

validators.py 文件源码 项目:zun 作者: openstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def validate(self, *args, **kwargs):
        """Run the wrapped validator and translate schema failures.

        All arguments are forwarded unchanged to ``self.validator.validate``.

        :raises exception.SchemaValidationError: when the validator raises
            ``jsonschema.ValidationError``. If the error carries a path, the
            detail names the last path element (popped from ``ex.path``) and
            the offending value; otherwise it is just the error text.
        """
        try:
            self.validator.validate(*args, **kwargs)
        except jsonschema.ValidationError as ex:
            if len(ex.path) == 0:
                # No location information: report the raw error text.
                message = six.text_type(ex)
            else:
                # Wording depends on whether a body or query string was checked.
                template = (
                    _("Invalid input for field '%(path)s'."
                      "Value: '%(value)s'. %(message)s")
                    if self.is_body else
                    _("Invalid input for query parameters "
                      "'%(path)s'. Value: '%(value)s'. %(message)s")
                )
                message = template % {
                    'path': ex.path.pop(),
                    'value': ex.instance,
                    'message': six.text_type(ex),
                }
            raise exception.SchemaValidationError(detail=message)
formatter.py 文件源码 项目:pytablereader 作者: thombashi 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def to_table_data(self):
        """
        Yield a single table built from the buffered sequence of records.

        The header is the sorted union of the keys of every record in
        ``self._buffer``; the records themselves are passed through as-is.

        :raises ValueError:
        :raises pytablereader.error.ValidationError:
        """

        self._validate_source_data()

        header_names = set()
        for record in self._buffer:
            header_names.update(six.viewkeys(record))

        self._loader.inc_table_count()

        yield TableData(
            table_name=self._make_table_name(),
            header_list=sorted(header_names),
            record_list=self._buffer,
            quoting_flags=self._loader.quoting_flags)
formatter.py 文件源码 项目:pytablereader 作者: thombashi 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def to_table_data(self):
        """
        Yield a single table built from a column-oriented buffer.

        ``self._buffer`` maps each header to its column of values; the columns
        are taken in sorted header order and transposed into rows with ``zip``.

        :raises ValueError:
        :raises pytablereader.error.ValidationError:
        """

        self._validate_source_data()
        self._loader.inc_table_count()

        column_names = sorted(six.viewkeys(self._buffer))
        columns = [self._buffer.get(name) for name in column_names]

        yield TableData(
            table_name=self._make_table_name(),
            header_list=column_names,
            record_list=zip(*columns),
            quoting_flags=self._loader.quoting_flags)
formatter.py 文件源码 项目:pytablereader 作者: thombashi 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def to_table_data(self):
        """
        Yield one table per key of the buffered mapping.

        Each value of ``self._buffer`` is a sequence of records; the header of
        its table is the sorted union of the keys of those records.
        ``self._table_key`` is set to the current key before the table name
        is generated.

        :raises ValueError:
        :raises pytablereader.error.ValidationError:
        """

        self._validate_source_data()

        for table_key, records in six.iteritems(self._buffer):
            header_names = set()
            for record in records:
                header_names.update(six.viewkeys(record))

            self._loader.inc_table_count()
            self._table_key = table_key

            yield TableData(
                table_name=self._make_table_name(),
                header_list=sorted(header_names),
                record_list=records,
                quoting_flags=self._loader.quoting_flags)
formatter.py 文件源码 项目:pytablereader 作者: thombashi 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def to_table_data(self):
        """
        Yield one table per key of the buffered mapping (column-oriented).

        Each value of ``self._buffer`` maps headers to columns of values; the
        columns are taken in sorted header order and transposed into rows with
        ``zip``. ``self._table_key`` is set to the current key before the
        table name is generated.

        :raises ValueError:
        :raises pytablereader.error.ValidationError:
        """

        self._validate_source_data()

        for table_key, column_map in six.iteritems(self._buffer):
            column_names = sorted(six.viewkeys(column_map))
            columns = [column_map.get(name) for name in column_names]

            self._loader.inc_table_count()
            self._table_key = table_key

            yield TableData(
                table_name=self._make_table_name(),
                header_list=column_names,
                record_list=zip(*columns),
                quoting_flags=self._loader.quoting_flags)
config.py 文件源码 项目:pywhdfs 作者: yassineazzouz 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __init__(self, path=None):
      """Load the JSON configuration file and check it against the schema.

      :param path: Path to the configuration file. When omitted, the
        ``WEBHDFS_CONFIG`` environment variable is used, falling back to
        ``self.default_path``.
      :raises HdfsError: If the file does not exist or cannot be parsed.

      Schema violations are only printed; they do not abort construction.
      """
      self.path = path or os.getenv('WEBHDFS_CONFIG', self.default_path)
      if not osp.exists(self.path):
        raise HdfsError('Invalid configuration file %r.', self.path)

      try:
        # The context manager closes the handle even if parsing fails.
        with open(self.path) as config_file:
          self.config = json.load(config_file)
        self.schema = json.loads(resource_string(__name__, 'resources/config_schema.json'))
        try:
          js.validate(self.config, self.schema)
        except js.ValidationError as e:
          # Parenthesised single-argument print behaves the same on
          # Python 2 and Python 3.
          print(e.message)
        except js.SchemaError as e:
          print(e)
      except (ParsingError, ValueError):
        # json raises ValueError on malformed input; report it the same way
        # as any other unreadable configuration file.
        raise HdfsError('Invalid configuration file %r.', self.path)

      _logger.info('Instantiated configuration from %r.', self.path)
schemachecker.py 文件源码 项目:drift 作者: dgnorth 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def schema_request(media_type_name):
    """Decorator factory that validates the request JSON before the view runs.

    ``media_type_name`` is either a string, which is resolved through
    ``get_schema_for_media_type`` on every call, or the schema itself.
    When validation fails the request is aborted with status 400 and a
    report produced by ``generate_validation_error_report``.
    """
    def wrapper(fn):
        @wraps(fn)
        def decorated(*args, **kwargs):
            schema = (
                get_schema_for_media_type(media_type_name)
                if isinstance(media_type_name, basestring)
                else media_type_name
            )
            payload = request.get_json()
            try:
                validate(payload, schema)
            except ValidationError as err:
                abort(400, description=generate_validation_error_report(
                    err, payload))
            return fn(*args, **kwargs)

        return decorated
    return wrapper
service.py 文件源码 项目:fuel-ccp 作者: openstack 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def validate_service_definitions(components_map, components=None):
    """Validate the ``service_content`` of each component against SERVICE_SCHEMA.

    :param components_map: mapping of component name to a mapping holding a
        ``"service_content"`` entry
    :param components: names to validate; when empty or omitted every key of
        ``components_map`` is validated, otherwise the names are first checked
        with ``validation_base.validate_components_names``
    :raises RuntimeError: if at least one component fails validation (each
        failure is logged before the summary is raised)
    """
    if components:
        validation_base.validate_components_names(components, components_map)
    else:
        components = components_map.keys()

    failed = set()

    for name in components:
        try:
            jsonschema.validate(components_map[name]["service_content"],
                                SERVICE_SCHEMA,
                                format_checker=ServiceFormatChecker())
        except jsonschema.ValidationError as err:
            LOG.error("Validation of service definitions for component '%s' "
                      "is not passed: '%s'", name, err.message)
            failed.add(name)

    if not failed:
        LOG.info("Service definitions validation passed successfully")
        return

    raise RuntimeError(
        "Validation of service definitions for {} of {} components is "
        "not passed.".format(len(failed), len(components))
    )
fixtures.py 文件源码 项目:apm-agent-python 作者: elastic 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def __call__(self, environ, start_response):
        """WSGI entry point: record the request, optionally validate, respond.

        The request object and its decoded payload are appended to
        ``self.requests`` and ``self.payloads``. If a validator is registered
        for the request path and validation is not skipped, the status
        becomes 202 on success or 400 (with a JSON error body) on failure;
        otherwise ``self.code`` and ``self.content`` are used unchanged.
        """
        status = self.code
        body = self.content

        request = Request(environ)
        self.requests.append(request)

        # Undo transport encoding, then decode to text and (maybe) to JSON.
        payload = request.data
        if request.content_encoding == 'deflate':
            payload = zlib.decompress(payload)
        payload = payload.decode(request.charset)
        if request.content_type == 'application/json':
            payload = json.loads(payload)
        self.payloads.append(payload)

        validator = VALIDATORS.get(request.path)
        if validator and not self.skip_validate:
            try:
                validator.validate(payload)
            except jsonschema.ValidationError as e:
                status = 400
                body = json.dumps({'status': 'error', 'message': str(e)})
            else:
                status = 202

        response = Response(status=status)
        response.headers.clear()
        response.headers.extend(self.headers)
        response.data = body
        return response(environ, start_response)
export_hdf5.py 文件源码 项目:export2hdf5 作者: bwrc 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def validate_config(fname):
    """
    Validate a JSON configuration file against the packaged schema.

    Returns ``None`` when the configuration is valid, the message of the
    ``jsonschema.ValidationError`` when the configuration is invalid, or the
    ``jsonschema.SchemaError`` object when the schema itself is invalid.
    """
    schema = load_json_file(
        pkg_resources.resource_filename('export2hdf5', "config_schema.json"))
    config = load_json_file(fname)

    try:
        jsonschema.validate(config, schema)
    except jsonschema.ValidationError as err:
        return err.message
    except jsonschema.SchemaError as err:
        return err

    return None
cme_manager.py 文件源码 项目:chaos-monkey-engine 作者: BBVA 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def execute_plan(self, name, planner_config, attack_config):
        """
        Look up, validate and run a planner for the given attack.

        The planner and attack classes are fetched from their stores by the
        ``"ref"`` entry of each config. The planner's ``"args"`` and the whole
        attack config are validated against the respective class schemas
        before the planner is instantiated and asked to plan.

        :param name:                Plan name
        :param planner_config:      Dict with planner config
        :param attack_config:       Dict with attack config
        :raises APIError:           On a failed module lookup or an invalid config
        """
        try:
            planner_cls = self._planners_store.get(planner_config.get("ref"))
            attack_cls = self._attacks_store.get(attack_config.get("ref"))
        except ModuleLookupError as lookup_error:
            raise APIError("invalid planner %s" % lookup_error.message)

        # Both configs must satisfy the schema declared by their module.
        try:
            validate(planner_config.get("args"), planner_cls.schema)
            validate(attack_config, attack_cls.schema)
        except ValidationError as schema_error:
            raise APIError("invalid payload %s" % schema_error.message)

        planner_cls(name).plan(planner_config, attack_config)
request_validator.py 文件源码 项目:chaos-monkey-engine 作者: BBVA 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def validate_payload(request, schema):
    """
    Check the JSON body of a request against a JSON schema.

    :param request: request whose body is read with ``get_json()``
    :param schema:  schema the payload must satisfy

    :return: True when the payload validates
    :raises: :meth:`chaosmonkey.api.api_errors` -- ``APIError`` with the
        validation message on a schema violation, or with a generic message
        for any other failure while reading or validating the payload
    """
    try:
        payload = request.get_json()
        validate(payload, schema)
        return True
    except ValidationError as schema_error:
        raise APIError("invalid payload %s" % schema_error.message)
    except Exception:
        raise APIError("payload must be a valid json")
tcex_local.py 文件源码 项目:tcex 作者: ThreatConnect-Inc 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def validate(install_json):
        """Check an install.json file against the schema and print the verdict.

        Schema errors and validation errors are both reported on stdout; they
        are not raised to the caller.
        """

        try:
            with open(install_json) as handle:
                contents = handle.read()
            data = json.loads(contents)
            validate(data, schema)
            print('{} is valid'.format(install_json))
        except (SchemaError, ValidationError) as e:
            print('{} is invalid "{}"'.format(install_json, e))

    # @staticmethod
    # def _wrap(data):
    #     """Wrap any parameters that contain spaces
#
    #     Returns:
    #         (string): String containing parameters wrapped in double quotes
    #     """
    #     if len(re.findall(r'[!\-\s\$]{1,}', data)) > 0:
    #         data = '"{}"'.format(data)
    #     return data
validation.py 文件源码 项目:promenade 作者: att-comdev 项目源码 文件源码 阅读 41 收藏 0 点赞 0 评论 0
def check_schema(document):
    """Validate the ``data`` section of a document against its named schema.

    The schema is chosen by the document's ``schema`` entry. Documents whose
    schema name is not in ``SCHEMAS`` are skipped with a warning, and
    non-dictionary input is logged as an error and ignored.

    :param document: mapping with ``schema``, ``metadata`` and ``data`` entries
    :raises exceptions.ValidationException: if ``data`` violates the schema
    """
    # isinstance (rather than an exact type comparison) also accepts dict
    # subclasses such as OrderedDict, which support the same .get() calls.
    if not isinstance(document, dict):
        LOG.error('Non-dictionary document passed to schema validation.')
        return

    schema_name = document.get('schema', '<missing>')

    LOG.debug('Validating schema for schema=%s metadata.name=%s', schema_name,
              document.get('metadata', {}).get('name', '<missing>'))

    if schema_name in SCHEMAS:
        try:
            jsonschema.validate(document.get('data'), SCHEMAS[schema_name])
        except jsonschema.ValidationError as e:
            raise exceptions.ValidationException(str(e))
    else:
        LOG.warning('Skipping validation for unknown schema: %s', schema_name)
__init__.py 文件源码 项目:bigchaindb 作者: bigchaindb 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _validate_schema(schema, body):
    """Validate data against a schema.

    ``schema`` is indexed as a pair: ``schema[0]`` is handed to
    ``jsonschema.validate`` and ``schema[1]`` is an object whose
    ``validate()`` receives the rapidjson-serialised body.

    Why two validators: schema validation is currently the major CPU
    bottleneck of BigchainDB. ``jsonschema`` validates Python data structures
    directly and produces nice error messages, but takes 4+ ms per
    transaction. ``rapidjson`` is much faster at 1.5 ms but produces very
    poor error messages. So rapidjson is the optimistic fast path and
    jsonschema is only consulted after a failure, to build a helpful message.

    Raises:
        SchemaValidationError: if the body does not validate.
    """
    try:
        schema[1].validate(rapidjson.dumps(body))
    except ValueError as fast_error:
        # Re-run with the slow validator purely for its better diagnostics.
        try:
            jsonschema.validate(body, schema[0])
        except jsonschema.ValidationError as detailed_error:
            raise SchemaValidationError(str(detailed_error)) from detailed_error
        # The two validators disagree; still reject, using rapidjson's text.
        logger.warning('code problem: jsonschema did not raise an exception, wheras rapidjson raised %s', fast_error)
        raise SchemaValidationError(str(fast_error)) from fast_error
serializers.py 文件源码 项目:django-api-bouncer 作者: menecio 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def validate(self, data):
        """Validate a plugin payload: known name, schema-valid config.

        When updating an existing instance, the submitted config is layered
        on top of the instance's current config before the schema check.

        :param data: mapping with a ``name`` entry and an optional ``config``
        :return: ``data`` unchanged
        :raises serializers.ValidationError: for an unknown plugin name or a
            config that violates the plugin's JSON schema
        """
        name = data.get('name')
        config = data.get('config', {})

        if not name or name not in plugins:
            raise serializers.ValidationError('Invalid plugin name')

        plugin_schema = plugins[name]

        if self.instance:
            # Merge into a copy: updating self.instance.config in place would
            # leave the instance modified even when validation fails below.
            merged_config = dict(self.instance.config)
            merged_config.update(config)
            config = merged_config

        try:
            jsonschema.validate(config, plugin_schema)
        except jsonschema.ValidationError as e:
            raise serializers.ValidationError({'config': e})

        plugin_validators = validators.validator_classes.get(name, [])
        for validator in plugin_validators:
            # NOTE(review): the validators receive the submitted config, not
            # the merged one, and this lookup raises KeyError when 'config'
            # is absent from data -- confirm whether that is intended.
            run_validator = validator(data['config'])
            run_validator()
        return data
test_cli.py 文件源码 项目:pyblish-starter 作者: pyblish 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_unsuccessful_validation(self):
        """A single error is written to stderr in error_format; exit code 1."""
        failure = ValidationError("I am an error!", instance=1)
        out_stream, err_stream = StringIO(), StringIO()
        arguments = {
            "validator": fake_validator([failure]),
            "schema": {},
            "instances": [1],
            "error_format": "{error.instance} - {error.message}",
        }

        exit_code = cli.run(arguments, stdout=out_stream, stderr=err_stream)

        self.assertFalse(out_stream.getvalue())
        self.assertEqual(err_stream.getvalue(), "1 - I am an error!")
        self.assertEqual(exit_code, 1)
test_cli.py 文件源码 项目:pyblish-starter 作者: pyblish 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def test_unsuccessful_validation_multiple_instances(self):
        """Errors for every instance are written to stderr in order; exit 1."""
        errors_for_first = [
            ValidationError("9", instance=1),
            ValidationError("8", instance=1),
        ]
        errors_for_second = [ValidationError("7", instance=2)]
        out_stream, err_stream = StringIO(), StringIO()
        arguments = {
            "validator": fake_validator(errors_for_first, errors_for_second),
            "schema": {},
            "instances": [1, 2],
            "error_format": "{error.instance} - {error.message}\t",
        }

        exit_code = cli.run(arguments, stdout=out_stream, stderr=err_stream)

        self.assertFalse(out_stream.getvalue())
        self.assertEqual(err_stream.getvalue(), "1 - 9\t1 - 8\t2 - 7\t")
        self.assertEqual(exit_code, 1)
settings_handler.py 文件源码 项目:jupyterlab_launcher 作者: jupyterlab 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def put(self, section_name):
        """Validate the request body against the section schema and save it.

        The body is stored verbatim (comments included) once its minified
        form parses as JSON and satisfies the schema for ``section_name``.
        Responds with status 204 on success.

        :raises web.HTTPError: 404 when no settings directory is configured;
            400 when the body is not valid JSON or violates the schema
        """
        if not self.settings_dir:
            raise web.HTTPError(404, "No current settings directory")

        raw = self.request.body.strip().decode(u"utf-8")

        # Malformed JSON is a client error: report 400 rather than letting
        # the ValueError surface as a 500.
        try:
            settings = json.loads(json_minify(raw))
        except ValueError as e:
            raise web.HTTPError(400, "Invalid JSON: " + str(e))

        # Validate the data against the schema.
        schema = _get_schema(self.schemas_dir, section_name, self.overrides)
        validator = Validator(schema)
        try:
            validator.validate(settings)
        except ValidationError as e:
            raise web.HTTPError(400, str(e))

        # Write the raw data (comments included) to a file.
        path = _path(self.settings_dir, section_name, _file_extension, True)
        with open(path, "w") as fid:
            fid.write(raw)

        self.set_status(204)
test_cli.py 文件源码 项目:splunk_ta_ps4_f1_2016 作者: jonathanvarley 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_unsuccessful_validation(self):
        """One failing instance: formatted error on stderr, nothing on stdout."""
        only_error = ValidationError("I am an error!", instance=1)
        captured_out, captured_err = StringIO(), StringIO()
        run_args = {
            "validator": fake_validator([only_error]),
            "schema": {},
            "instances": [1],
            "error_format": "{error.instance} - {error.message}",
        }

        exit_code = cli.run(run_args, stdout=captured_out, stderr=captured_err)

        self.assertFalse(captured_out.getvalue())
        self.assertEqual(captured_err.getvalue(), "1 - I am an error!")
        self.assertEqual(exit_code, 1)
test_cli.py 文件源码 项目:splunk_ta_ps4_f1_2016 作者: jonathanvarley 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_unsuccessful_validation_multiple_instances(self):
        """Two instances with errors: all are reported on stderr, in order."""
        instance_one_errors = [
            ValidationError("9", instance=1),
            ValidationError("8", instance=1),
        ]
        instance_two_errors = [ValidationError("7", instance=2)]
        captured_out, captured_err = StringIO(), StringIO()
        run_args = {
            "validator": fake_validator(
                instance_one_errors, instance_two_errors),
            "schema": {},
            "instances": [1, 2],
            "error_format": "{error.instance} - {error.message}\t",
        }

        exit_code = cli.run(run_args, stdout=captured_out, stderr=captured_err)

        self.assertFalse(captured_out.getvalue())
        self.assertEqual(captured_err.getvalue(), "1 - 9\t1 - 8\t2 - 7\t")
        self.assertEqual(exit_code, 1)
test_validators.py 文件源码 项目:splunk_ta_ps4_f1_2016 作者: jonathanvarley 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_it_delegates_to_a_legacy_ref_resolver(self):
        """
        Legacy RefResolvers support only the context manager form of
        resolution.

        The stub resolver below offers nothing but ``resolving``; it resolves
        the ref to an integer schema, so validating ``None`` must fail.

        """

        class LegacyRefResolver(object):
            @contextmanager
            def resolving(resolver_self, ref):
                self.assertEqual(ref, "the ref")
                yield {"type": "integer"}

        legacy_resolver = LegacyRefResolver()
        ref_schema = {"$ref": "the ref"}

        with self.assertRaises(ValidationError):
            self.validator_class(
                ref_schema, resolver=legacy_resolver).validate(None)
lcpl_test_suite.py 文件源码 项目:lcp-testing-tools 作者: edrlab 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_validate_lcpl(self):
        """
        Validate an LCP license file against the LCPL JSON schema.

        Per the schema, this covers the profile value (basic or 1.0), the
        encryption method (aes-cbc), the user key (sha256) and the signature
        algorithm (ecdsa-sha256). A schema violation is re-raised as
        TestSuiteRunningError.
        """

        schema_path = os.path.join(JSON_SCHEMA_DIR_PATH, 'lcpl_schema.json')

        with open(schema_path) as schema_file:
            schema = json.loads(schema_file.read())

        try:
            jsonschema.validate(self.lcpl, schema)
        except jsonschema.ValidationError as err:
            raise TestSuiteRunningError(err)
validator.py 文件源码 项目:ingest-client 作者: jhuapl-boss 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def validate_schema(self):
        """
        Validate ``self.config`` against ``self.schema`` with jsonschema.

        Returns:
            The ``jsonschema.ValidationError`` that was raised if validation
            fails, otherwise ``None``.

        Raises:
            ValueError: If ``self.schema`` has not been populated.

        """
        if not self.schema:
            raise ValueError("Schema has not been populated yet. Cannot validate.")

        try:
            jsonschema.validate(self.config, self.schema)
        except jsonschema.ValidationError as error:
            return error
        else:
            return None
test_cli.py 文件源码 项目:TA-SyncKVStore 作者: georgestarcher 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_unsuccessful_validation(self):
        """cli.run prints the formatted error to stderr and returns 1."""
        err = ValidationError("I am an error!", instance=1)
        fake_stdout, fake_stderr = StringIO(), StringIO()
        options = {
            "validator": fake_validator([err]),
            "schema": {},
            "instances": [1],
            "error_format": "{error.instance} - {error.message}",
        }

        exit_code = cli.run(options, stdout=fake_stdout, stderr=fake_stderr)

        self.assertFalse(fake_stdout.getvalue())
        self.assertEqual(fake_stderr.getvalue(), "1 - I am an error!")
        self.assertEqual(exit_code, 1)
test_cli.py 文件源码 项目:TA-SyncKVStore 作者: georgestarcher 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_unsuccessful_validation_multiple_instances(self):
        """cli.run prints every instance's errors to stderr and returns 1."""
        errs_one = [
            ValidationError("9", instance=1),
            ValidationError("8", instance=1),
        ]
        errs_two = [ValidationError("7", instance=2)]
        fake_stdout, fake_stderr = StringIO(), StringIO()
        options = {
            "validator": fake_validator(errs_one, errs_two),
            "schema": {},
            "instances": [1, 2],
            "error_format": "{error.instance} - {error.message}\t",
        }

        exit_code = cli.run(options, stdout=fake_stdout, stderr=fake_stderr)

        self.assertFalse(fake_stdout.getvalue())
        self.assertEqual(fake_stderr.getvalue(), "1 - 9\t1 - 8\t2 - 7\t")
        self.assertEqual(exit_code, 1)
test_validators.py 文件源码 项目:TA-SyncKVStore 作者: georgestarcher 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_it_delegates_to_a_legacy_ref_resolver(self):
        """
        Legacy RefResolvers support only the context manager form of
        resolution.

        A resolver exposing just ``resolving`` maps the ref to an integer
        schema; validating ``None`` through it is expected to fail.

        """

        class LegacyRefResolver(object):
            @contextmanager
            def resolving(stub, ref):
                self.assertEqual(ref, "the ref")
                yield {"type": "integer"}

        stub_resolver = LegacyRefResolver()
        schema_with_ref = {"$ref": "the ref"}

        with self.assertRaises(ValidationError):
            self.validator_class(
                schema_with_ref, resolver=stub_resolver).validate(None)
test_cli.py 文件源码 项目:cb-defense-splunk-app 作者: carbonblack 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def test_unsuccessful_validation(self):
        """A failing validation yields exit code 1 and the error on stderr."""
        validation_failure = ValidationError("I am an error!", instance=1)
        out_buffer, err_buffer = StringIO(), StringIO()
        cli_arguments = {
            "validator": fake_validator([validation_failure]),
            "schema": {},
            "instances": [1],
            "error_format": "{error.instance} - {error.message}",
        }

        exit_code = cli.run(
            cli_arguments, stdout=out_buffer, stderr=err_buffer)

        self.assertFalse(out_buffer.getvalue())
        self.assertEqual(err_buffer.getvalue(), "1 - I am an error!")
        self.assertEqual(exit_code, 1)
test_cli.py 文件源码 项目:cb-defense-splunk-app 作者: carbonblack 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_unsuccessful_validation_multiple_instances(self):
        """Failures across two instances are all reported; exit code is 1."""
        failures_first = [
            ValidationError("9", instance=1),
            ValidationError("8", instance=1),
        ]
        failures_second = [ValidationError("7", instance=2)]
        out_buffer, err_buffer = StringIO(), StringIO()
        cli_arguments = {
            "validator": fake_validator(failures_first, failures_second),
            "schema": {},
            "instances": [1, 2],
            "error_format": "{error.instance} - {error.message}\t",
        }

        exit_code = cli.run(
            cli_arguments, stdout=out_buffer, stderr=err_buffer)

        self.assertFalse(out_buffer.getvalue())
        self.assertEqual(err_buffer.getvalue(), "1 - 9\t1 - 8\t2 - 7\t")
        self.assertEqual(exit_code, 1)
test_validators.py 文件源码 项目:cb-defense-splunk-app 作者: carbonblack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_it_delegates_to_a_legacy_ref_resolver(self):
        """
        Legacy RefResolvers support only the context manager form of
        resolution.

        The resolver defined here has only a ``resolving`` context manager,
        which yields an integer schema; ``None`` must therefore be rejected.

        """

        class LegacyRefResolver(object):
            @contextmanager
            def resolving(legacy, ref):
                self.assertEqual(ref, "the ref")
                yield {"type": "integer"}

        context_only_resolver = LegacyRefResolver()
        referencing_schema = {"$ref": "the ref"}

        with self.assertRaises(ValidationError):
            self.validator_class(
                referencing_schema,
                resolver=context_only_resolver).validate(None)


问题


面经


文章

微信
公众号

扫码关注公众号