def validate(evt, evttype):
    """Validate the content of an event against the JSON schema of its type.

    Args:
        evt (:obj:`baroque.entities.event.Event`): the event to be validated
        evttype (:obj:`baroque.entities.eventtype.EventType`): the type of
            the event that needs to be validated

    Returns:
        ``True`` if validation is OK, ``False`` otherwise

    """
    # Compare explicitly rather than with ``assert``: assert statements are
    # removed when Python runs with -O, which would silently skip this check.
    if not evt.type == evttype:
        return False
    try:
        check(loads(evt.json()), loads(evttype.jsonschema))
    except (AssertionError, ValidationError):
        # AssertionError is still treated as a failed validation, as before.
        return False
    return True
python类ValidationError()的实例源码
def consume(self, value):
    """Validate the given hints and fill in defaults from the schema.

    A list input is first converted to a dict using each item's ``"id"``
    as key and ``"value"`` as value. The result is checked with
    ``self.validator``; a ``jsonschema.ValidationError`` is logged and
    re-raised as ``ValueError``.

    Returns a dict with one entry per key of ``self.schema``: the supplied
    value when present, otherwise that key's ``"default_value"`` (or
    ``None`` when the schema entry has none).
    """
    hints = value
    if isinstance(hints, list):
        hints = {entry["id"]: entry["value"] for entry in hints}

    try:
        self.validator.validate(hints)
    except jsonschema.ValidationError as exc:
        LOG.warning("Cannot validate hints: %s", exc)
        raise ValueError("Cannot validate hints") from exc

    return {
        name: hints.get(name, spec.get("default_value", None))
        for name, spec in self.schema.items()
    }
def generate_swagger(self):
    """
    Convert the RAML input into a swagger document.

    The RAML file named by ``self.inputname`` is loaded first. If loading
    raises ``ValidationError`` the errors and a traceback are printed and
    nothing else is done. Otherwise the swagger file is opened, filled
    with resources, generic parameters and definitions, closed, and
    finally verified.

    :return: None
    """
    try:
        raml_tree = ramlparser.load(self.inputname)
    except ValidationError as err:
        print('validation error:', err.errors)
        print("could not load file: error loading file")
        traceback.print_exc()
        return

    doc_title = self.get_first_display_name(raml_tree)
    doc_version = raml_tree.version

    # Write the document section by section.
    self.swag_openfile(doc_version, doc_title)
    self.swag_add_resource(raml_tree)
    self.swag_add_generic_parameters(raml_tree)
    self.swag_add_definitions(raml_tree)
    self.swag_closefile()

    print("swagger document saved..", self.swagger)
    self.swag_verify()
def check_schema(body, schema):
    """Validate a create body against a JSON schema.

    The body is checked with a Draft 4 validator that also enforces
    ``format`` keywords through ``jsonschema.FormatChecker``.

    :param body: create body
    :param schema: JSON schema the body is validated against
    :raises InvalidParameterValue: if validation of create body fails.
    """
    format_checker = jsonschema.FormatChecker()
    body_validator = jsonschema.Draft4Validator(
        schema, format_checker=format_checker)
    try:
        body_validator.validate(body)
    except jsonschema.ValidationError as exc:
        message = _('Invalid create body: %s') % exc
        raise exception.InvalidParameterValue(message)
def validate(schema):
    """Build a decorator that validates the JSON request body against *schema*.

    The decorated responder is called as ``func(self, req, resp, *args,
    parsed=obj, **kwargs)``, where ``obj`` is the decoded JSON body.

    :param schema: JSON schema passed to ``jsonschema.validate``
    :return: a decorator for responder methods
    :raises falcon.HTTPBadRequest: (from the wrapper) when the body cannot
        be read/decoded as UTF-8 JSON, or when it fails schema validation
    """
    # Imported locally so this block does not depend on a module-level import.
    import functools

    def decorator(func):
        # functools.wraps keeps the responder's __name__, __doc__ and
        # __wrapped__ so introspection and debugging see the real handler.
        @functools.wraps(func)
        def wrapper(self, req, resp, *args, **kwargs):
            try:
                raw_json = req.stream.read()
                obj = json.loads(raw_json.decode('utf-8'))
            except Exception:
                # Any read/decode/parse failure is reported as a bad request.
                raise falcon.HTTPBadRequest(
                    'Invalid data',
                    'Could not properly parse the provided data as JSON'
                )
            try:
                jsonschema.validate(obj, schema)
            except jsonschema.ValidationError as e:
                raise falcon.HTTPBadRequest(
                    'Failed data validation',
                    e.message
                )
            return func(self, req, resp, *args, parsed=obj, **kwargs)
        return wrapper
    return decorator
def test_unsuccessful_validation(self):
    """A single validation error goes to stderr and yields exit code 1."""
    failure = ValidationError("I am an error!", instance=1)
    out_stream, err_stream = StringIO(), StringIO()
    arguments = {
        "validator": fake_validator([failure]),
        "schema": {},
        "instances": [1],
        "error_format": "{error.instance} - {error.message}",
    }

    exit_code = cli.run(arguments, stdout=out_stream, stderr=err_stream)

    # Nothing is written to stdout; the formatted error lands on stderr.
    self.assertFalse(out_stream.getvalue())
    self.assertEqual(err_stream.getvalue(), "1 - I am an error!")
    self.assertEqual(exit_code, 1)
def test_unsuccessful_validation_multiple_instances(self):
    """Errors of every instance are reported in order on stderr."""
    errors_for_first = [
        ValidationError("9", instance=1),
        ValidationError("8", instance=1),
    ]
    errors_for_second = [ValidationError("7", instance=2)]
    out_stream, err_stream = StringIO(), StringIO()
    arguments = {
        "validator": fake_validator(errors_for_first, errors_for_second),
        "schema": {},
        "instances": [1, 2],
        "error_format": "{error.instance} - {error.message}\t",
    }

    exit_code = cli.run(arguments, stdout=out_stream, stderr=err_stream)

    # stdout stays empty; stderr holds one formatted entry per error.
    self.assertFalse(out_stream.getvalue())
    self.assertEqual(err_stream.getvalue(), "1 - 9\t1 - 8\t2 - 7\t")
    self.assertEqual(exit_code, 1)
def validate(self, descriptor, schema_id):
    """
    Validate a descriptor against a schema template.

    On failure the error message is stored in ``self.error_msg`` and
    logged.

    :param descriptor: the descriptor to validate
    :param schema_id: identifier handed to ``self.load_schema``
    :return: ``True`` when the descriptor is valid, ``None`` when the
        descriptor or the schema itself is invalid
    """
    try:
        jsonschema.validate(descriptor, self.load_schema(schema_id))
    except ValidationError as err:
        # The descriptor does not conform to the schema.
        log.error("Failed to validate Descriptor against schema '{}'"
                  .format(schema_id))
        self.error_msg = err.message
        log.error(err.message)
        return None
    except SchemaError as err:
        # The schema itself is malformed.
        log.error("Invalid Schema '{}'".format(schema_id))
        self.error_msg = err.message
        log.debug(err)
        return None
    return True
def resolve_dependencies(args):
    """Main function to handle the dependencies from the config file.

    Reads the JSON file at ``args.config``, validates its schema and data,
    downloads the dependencies and then exits the process via
    ``sys.exit()``. Every failure is routed to ``handle_exception`` with a
    message describing the stage that failed.

    :param args: The user's arguments supplied from the main function.
    """
    try:
        with open(args.config, 'r') as config_handle:
            dependencies = json.load(config_handle)

        validate_schema(dependencies)
        validate_data(dependencies)
        download_dependencies(dependencies)
        # SystemExit is not an Exception subclass, so it passes through
        # the handlers below.
        sys.exit()
    except IOError:
        handle_exception('File could not be opened')
    except ValueError:
        handle_exception('File could not be parsed')
    except ValidationError:
        handle_exception('File could not be validated')
    except Exception:
        handle_exception('Unknown error occurred')
def is_valid(self, data: Dict, schema_name: str) -> bool:
    """Check data against a named JSON schema.

    A warning is logged when no schema with the given name is registered.

    Args:
        data: The data.
        schema_name: The name of the schema used for validation.

    Returns:
        True if data is valid, False if the schema is unknown or the data
        does not validate.
    """
    if self.has_schema(schema_name):
        try:
            selected = self._schema.get(schema_name)
            jsonschema.validate(data, selected)
        except jsonschema.ValidationError:
            return False
        return True

    self.logger.warning('JSON schemes "{}" not found'
                        .format(schema_name))
    return False
def custom_check_card(self, card):
    """Run cross-reference checks on a card and raise if any of them fail.

    Checks performed:
      * every ``<collection>_code`` field present on the card must be in
        the matching entry of ``self.collections``;
      * ``reprint_of``, when present, must be in ``self.collections['card']``;
      * if a callable ``custom_check_<type_code>_card`` exists on ``self``,
        the messages it returns are added.

    Raises ``jsonschema.ValidationError`` listing all collected problems.
    """
    problems = []

    # Check foreign codes.
    for name in ("affiliation", "faction", "rarity", "type", "subtype"):
        code_field = name + "_code"
        if code_field not in card:
            continue
        if card.get(code_field) not in self.collections[name]:
            problems.append("%s code '%s' does not exists in card '%s'" % (name, card.get(code_field), card.get('code')))

    # Check the reprint reference.
    if 'reprint_of' in card and card.get('reprint_of') not in self.collections['card']:
        problems.append("Reprinted card %s does not exists" % (card.get('reprint_of')))

    # Checks by type.
    type_checker = getattr(self, "custom_check_%s_card" % card.get('type_code'), None)
    if callable(type_checker):
        problems.extend(type_checker(card))

    if problems:
        raise jsonschema.ValidationError("\n".join(["- %s" % p for p in problems]))
def test_unsuccessful_validation(self):
    """One failing instance: error text on stderr, exit code 1."""
    single_error = ValidationError("I am an error!", instance=1)
    captured_out, captured_err = StringIO(), StringIO()
    run_arguments = {
        "validator": fake_validator([single_error]),
        "schema": {},
        "instances": [1],
        "error_format": "{error.instance} - {error.message}",
    }

    exit_code = cli.run(
        run_arguments, stdout=captured_out, stderr=captured_err)

    # The error is rendered with the given format and only on stderr.
    self.assertFalse(captured_out.getvalue())
    self.assertEqual(captured_err.getvalue(), "1 - I am an error!")
    self.assertEqual(exit_code, 1)
def test_unsuccessful_validation_multiple_instances(self):
    """Several failing instances: all errors on stderr, exit code 1."""
    instance_one_errors = [
        ValidationError("9", instance=1),
        ValidationError("8", instance=1),
    ]
    instance_two_errors = [ValidationError("7", instance=2)]
    captured_out, captured_err = StringIO(), StringIO()
    run_arguments = {
        "validator": fake_validator(instance_one_errors, instance_two_errors),
        "schema": {},
        "instances": [1, 2],
        "error_format": "{error.instance} - {error.message}\t",
    }

    exit_code = cli.run(
        run_arguments, stdout=captured_out, stderr=captured_err)

    # Errors appear in instance order, each rendered with the format string.
    self.assertFalse(captured_out.getvalue())
    self.assertEqual(captured_err.getvalue(), "1 - 9\t1 - 8\t2 - 7\t")
    self.assertEqual(exit_code, 1)
def validate(self, *args, **kwargs):
    """Run ``self.validator`` and translate its failures.

    All arguments are forwarded to ``self.validator.validate``. A
    ``jsonschema.ValidationError`` or ``TypeError`` is re-raised as
    ``exception.ValidationError`` carrying a human-readable ``detail``.
    """
    try:
        self.validator.validate(*args, **kwargs)
    except jsonschema.ValidationError as ex:
        if isinstance(ex.cause, exception.InvalidName):
            # The underlying cause already carries a formatted message.
            detail = ex.cause.format_message()
        elif len(ex.path) > 0:
            # Report the last element of the path to the offending value.
            template = _("Invalid input for field/attribute %(path)s."
                         " Value: %(value)s. %(message)s")
            detail = template % {
                'path': ex.path.pop(), 'value': ex.instance,
                'message': ex.message
            }
        else:
            detail = ex.message
        raise exception.ValidationError(detail=detail)
    except TypeError as ex:
        # NOTE: A non-string value passed to the patternProperties
        # parameter triggers a TypeError; it is reported like any other
        # validation failure.
        raise exception.ValidationError(detail=six.text_type(ex))
def test_validate_raises_if_invalid_data():
    """``utils.validate`` rejects a string where an integer is required."""
    integer_foo_schema = {
        '$schema': 'http://json-schema.org/schema#',
        'type': 'object',
        'properties': {
            'foo': {
                'type': 'integer'
            }
        }
    }
    # 'foo' is a string here, which violates the schema above.
    invalid_payload = {
        'foo': 'bar',
    }

    with pytest.raises(ValidationError):
        utils.validate(invalid_payload, integer_foo_schema)
def _validate_logic(self, doc):
doc_name = next(iter(doc))
# ????????, ??? ???? ???? ?? ?????? ????????? ????
doc_timestamp = doc[doc_name].get('dateTime')
if self.min_date and self.min_date.timestamp() > doc_timestamp:
doc_date = datetime.datetime.fromtimestamp(doc_timestamp)
raise ValidationError('Document timestamp ' + str(doc_date) + ' is less than min. allowed date ' +
str(self.min_date))
# ????????, ??? ??? ????? ???? "?? ????????" ?????? ?? 24 ???? ?????? UTC
future = datetime.datetime.utcnow() + datetime.timedelta(hours=self.future_hours)
if doc_timestamp > future.timestamp():
doc_date = datetime.datetime.fromtimestamp(doc_timestamp)
raise ValidationError('Document timestamp {} is greater than now for {} hours'
.format(str(doc_date), str(self.future_hours)))
def _build_object(cls, value, schema, nested_types, input_):
if 'object' in nested_types:
raise ModelBaseError('nested object was not allowed', input_=input_)
properties = value.split('|')
dict_obj = dict()
nested_types.add('object')
for prop in properties:
key, value = prop.split(':')
prop_schema = schema['properties'].get(key)
if prop_schema is None:
raise ValidationError("Invalid property '{}'".format(key),
instance=input_, schema=schema)
dict_obj[key] = \
cls._build_value(value, prop_schema, nested_types, input_)
nested_types.discard('object')
return dict_obj
def put_by_uri_template(cls, req, resp):
    """Handle a PUT addressed by URI template: update, or insert if absent.

    The URI identifiers are merged into the request body and an update is
    attempted. When the update returns objects, the first one is written
    to ``resp.body`` as JSON. Otherwise the untouched body is restored,
    checked against the URI identifiers, and handed to ``cls._insert``.

    Raises ``ValidationError`` when the body carries a value for a URI
    identifier that differs from the one in the URI.
    """
    session, req_body, id_, kwargs = cls._get_context_values(req.context)
    original_body = deepcopy(req_body)
    cls._update_dict(req_body, id_)
    updated = cls.update(session, req_body, ids=id_, **kwargs)

    if updated:
        resp.body = json.dumps(updated[0])
        return

    # Nothing was updated: fall back to the body as the client sent it.
    req_body = original_body
    conflicting = []
    for key in id_:
        if key in req_body and req_body[key] != id_[key]:
            conflicting.append(key)

    if conflicting:
        body_schema = req.context.get('body_schema')
        raise ValidationError(
            "Ambiguous value for '{}'".format(
                "', '".join(conflicting)),
            instance={'body': req_body, 'uri': id_}, schema=body_schema)

    req.context['parameters']['body'] = req_body
    cls._insert(req, resp, with_update=True)
def __prepare_json(status, op=None, schema=None, data=None, errors=None):
"""Prepare json structure for returning."""
ret_json = {"status": status}
if errors:
if not isinstance(errors, list):
ret_json["errors"] = [errors]
else:
ret_json["errors"] = errors
if data:
ret_json["data"] = data
if op:
op_schema = _get_pkg_output_schema(op)
try:
jsonschema.validate(ret_json, op_schema)
except jsonschema.ValidationError as e:
newret_json = {"status": EXIT_OOPS,
"errors": [{"reason": str(e)}]}
return newret_json
if schema:
ret_json["schema"] = schema
return ret_json
validators.py 文件源码
项目:Trusted-Platform-Module-nova
作者: BU-NU-CLOUD-SP16
项目源码
文件源码
阅读 42
收藏 0
点赞 0
评论 0
def validate(self, *args, **kwargs):
    """Validate input with ``self.validator`` and normalise its errors.

    Arguments are passed straight to ``self.validator.validate``. Both
    ``jsonschema.ValidationError`` and ``TypeError`` are converted into
    ``exception.ValidationError`` with a descriptive ``detail``.
    """
    try:
        self.validator.validate(*args, **kwargs)
    except jsonschema.ValidationError as ex:
        if isinstance(ex.cause, exception.InvalidName):
            detail = ex.cause.format_message()
        elif len(ex.path) > 0:
            # NOTE: For whole OpenStack message consistency, this error
            # message follows a format similar to the one used by WSME.
            template = _("Invalid input for field/attribute %(path)s."
                         " Value: %(value)s. %(message)s")
            detail = template % {
                'path': ex.path.pop(), 'value': ex.instance,
                'message': ex.message
            }
        else:
            detail = ex.message
        raise exception.ValidationError(detail=detail)
    except TypeError as ex:
        # NOTE: Passing a non-string value to the patternProperties
        # parameter raises TypeError; it is caught here and reported as a
        # validation error.
        raise exception.ValidationError(detail=six.text_type(ex))