def require_schema(schema):
    """Build a decorator that checks ``self.request_json`` against *schema*.

    The Draft 4 validator (http://json-schema.org), with format checking
    enabled, is created once when this function is called and is reused by
    every invocation of the decorated method.

    :param schema: JSON Schema the request JSON has to satisfy.
    :returns: decorator for methods whose ``self`` exposes ``request_json``.
    :raises exceptions.InvalidJSONError: raised from the decorated method,
        carrying the list of validation messages, when the request JSON
        does not match the schema.
    """
    request_validator = jsonschema.Draft4Validator(
        schema, format_checker=jsonschema.FormatChecker())

    def outer_decorator(func):
        @functools.wraps(func)
        def inner_decorator(self, *args, **kwargs):
            # Collect every violation rather than stopping at the first one.
            messages = [
                problem.message
                for problem in request_validator.iter_errors(self.request_json)
            ]
            if not messages:
                return func(self, *args, **kwargs)

            LOG.warning("Cannot validate request: %s", messages)
            raise exceptions.InvalidJSONError(messages)

        return inner_decorator

    return outer_decorator
python类FormatChecker()的实例源码
def check_schema(body, schema):
    """Validate a user-specified create body against *schema*.

    The body is checked with a Draft 4 validator that has format checking
    enabled.

    :param body: create body
    :param schema: JSON Schema the create body must conform to
    :raises InvalidParameterValue: if validation of create body fails.
    """
    format_checker = jsonschema.FormatChecker()
    body_validator = jsonschema.Draft4Validator(
        schema, format_checker=format_checker)
    try:
        body_validator.validate(body)
    except jsonschema.ValidationError as exc:
        reason = _('Invalid create body: %s') % exc
        raise exception.InvalidParameterValue(reason)
def validate_schema(payload, schema):
    """Collect the validation messages for a request payload.

    The payload is checked against the JSON schema defined for the
    requested endpoint, using a Draft 4 validator with format checking.
    Errors are ordered by their string representation before their
    messages are extracted.

    :param payload: incoming request data
    :type payload: dict
    :param schema: the schema the request payload should
                   be validated against
    :type schema: .json file
    :returns: error messages, empty when the payload is valid
    :rtype: list
    """
    format_checker = jsonschema.FormatChecker()
    endpoint_validator = jsonschema.Draft4Validator(
        schema, format_checker=format_checker)
    ordered_problems = sorted(endpoint_validator.iter_errors(payload), key=str)
    return [problem.message for problem in ordered_problems]
def add(self, message):
    """Account for one Singer message and update the running counters.

    * ``singer.RecordMessage``: the record is validated against the latest
      schema stored for its stream, then ``num_records`` is incremented.
      If no schema has been stored for the stream yet, a notice is printed
      and the process exits with status 1.
    * ``singer.SchemaMessage``: the schema becomes the stream's
      ``latest_schema`` and ``num_schemas`` is incremented.
    * ``singer.StateMessage``: the value becomes ``self.latest_state`` and
      ``self.num_states`` is incremented.

    Messages of any other type are ignored.

    :param message: parsed Singer message.
    :raises SystemExit: when a record arrives before its stream's schema.
    """
    if isinstance(message, singer.RecordMessage):
        stream = self.ensure_stream(message.stream)
        if not stream.latest_schema:
            print('I saw a record for stream {} before the schema'.format(
                message.stream))
            # The bare exit() helper is installed by the ``site`` module for
            # interactive use and is absent under ``python -S``; raising
            # SystemExit directly gives the same exit status everywhere.
            raise SystemExit(1)

        validator_cls = extend_with_default(Draft4Validator)
        validator = validator_cls(
            stream.latest_schema, format_checker=FormatChecker())
        # Validate a deep copy so the original record is never touched.
        # NOTE(review): presumably the extended validator writes schema
        # defaults into the instance it validates -- confirm.
        validator.validate(copy.deepcopy(message.record))
        stream.num_records += 1
    elif isinstance(message, singer.SchemaMessage):
        stream = self.ensure_stream(message.stream)
        stream.num_schemas += 1
        stream.latest_schema = message.schema
    elif isinstance(message, singer.StateMessage):
        self.latest_state = message.value
        self.num_states += 1
def is_valid_response(response, schema):
    """
    Tell whether an API response conforms to a schema.

    Any exception raised while validating is logged as a warning and
    reported as a failed validation rather than propagated.

    :param response: The response returned by the API
    :param schema: The schema to validate with
    :returns: True if the response validates with the schema, False otherwise
    """
    try:
        validate(response, schema, format_checker=FormatChecker())
    except Exception as e:
        log.warning("Response is not valid: %s" % (e,))
        return False
    return True
def validate(instance, schema, cls=None, *args, **kwargs):
    """
    Forward the arguments to ``_validate`` with format checking enabled.

    NOTE(review): ``_validate`` is presumably ``jsonschema.validate``
    imported under an alias -- confirm against the file's imports.

    :param instance: the data to validate
    :param schema: the schema to validate against
    :param cls: validator class, passed through unchanged
    :param args: extra positional arguments, passed through unchanged
    :param kwargs: extra keyword arguments, passed through; a
        ``format_checker`` supplied here is used as given
    """
    # Only supply the default checker when the caller did not pass one.
    # Injecting it unconditionally next to **kwargs raised
    # "TypeError: got multiple values for keyword argument 'format_checker'"
    # for any caller that passed its own.
    if 'format_checker' not in kwargs:
        kwargs['format_checker'] = FormatChecker()
    _validate(instance, schema, cls, *args, **kwargs)
def __init__(self, schema, is_body=True):
    """Prepare a validator for *schema* with custom bound checks.

    The base validator class ``self.validator_org`` is extended so that the
    'minimum' and 'maximum' keywords are handled by this object's own
    ``_validate_minimum`` / ``_validate_maximum`` methods; format checking
    is enabled on the resulting validator.

    :param schema: JSON Schema to validate against.
    :param is_body: stored on the instance as ``self.is_body``.
    """
    self.is_body = is_body

    keyword_overrides = dict(
        minimum=self._validate_minimum,
        maximum=self._validate_maximum,
    )
    extended_cls = jsonschema.validators.extend(
        self.validator_org, keyword_overrides)
    self.validator = extended_cls(
        schema, format_checker=jsonschema.FormatChecker())
def validate_json(json_data,
                  json_schema=None,
                  json_example=None,
                  validator_cls=None,
                  format_checker=jsonschema.FormatChecker(),
                  on_empty_404=False):
    """Validate *json_data* against *json_schema* and return the data.

    :param json_data: output to validate.
    :param json_schema: schema for the output; when None no validation is
        performed and ``json_data`` is returned unchanged.
    :param json_example: accepted but not used by this function.
    :param validator_cls: passed to ``jsonschema.validate`` as ``cls``.
    :param format_checker: passed to ``jsonschema.validate``.
    :param on_empty_404: when true, falsy ``json_data`` raises a 404.
    :returns: ``json_data``, converted with ``_to_json`` when a schema was
        supplied.
    :raises JsonError: with code 404 when the output is empty and
        ``on_empty_404`` is set.
    """
    # An empty output optionally becomes an automatic 404.
    is_empty = not json_data
    if is_empty and on_empty_404:
        raise JsonError(404, "Not found.")

    if json_schema is None:
        return json_data

    json_data = _to_json(json_data)
    # The output is wrapped in an object before validating because it may be
    # a bare string, which is not a validatable JSON object by itself.
    wrapped_instance = {"result": json_data}
    wrapped_schema = {
        "type": "object",
        "properties": {"result": json_schema},
        "required": ["result"],
    }
    jsonschema.validate(
        wrapped_instance,
        wrapped_schema,
        cls=validator_cls,
        format_checker=format_checker,
    )
    return json_data
def __init__(self, name):
    """Look up the schema registered under *name* and build its validator.

    The schema is fetched with ``schemas.SCHEMAS.get(name)`` and wrapped in
    a Draft 4 validator with format checking enabled.

    :param name: key of the schema in ``schemas.SCHEMAS``.
    """
    self.name = name
    # NOTE(review): if SCHEMAS is a plain dict, an unknown name yields None
    # here instead of raising -- confirm that is intended.
    self.schema = schemas.SCHEMAS.get(name)
    self.validator = jsonschema.Draft4Validator(
        self.schema, format_checker=jsonschema.FormatChecker())
def _validate(data, schema):
    """Return all Draft 4 validation errors for *data*, sorted by path.

    :param data: instance to validate.
    :param schema: JSON Schema to validate against (format checking on).
    :returns: list of error objects ordered by their ``path`` attribute;
        empty when *data* is valid.
    """
    def _by_path(problem):
        return problem.path

    checker = Draft4Validator(schema, format_checker=FormatChecker())
    return sorted(checker.iter_errors(data), key=_by_path)
def validate(self, json, schema_name):
    """
    Validate a JSON document against one of the configured schemas.

    :param json: the document to validate
    :param schema_name: key of the schema in ``self.schema``
    :return: ``(True, err)`` with every field of ``err`` blank when the
        document is valid, ``(False, err)`` otherwise, where
        err = {
            "key_name": last element of the failing path,
            "key_path": dot-joined path leading to that element,
            "msg": validation message
        }
    :type json: dict
    :type schema_name: str
    :rtype: (bool, dict)
    :raises SchemaError: when the configured schema itself is invalid
    :raises ValueError: when ``schema_name`` is not configured
    """
    err = {
        "key_name": "",
        "key_path": "",
        "msg": ""
    }
    try:
        jsonschema.validate(json, self.schema[schema_name],
                            format_checker=jsonschema.FormatChecker())
    except ValidationError as e:
        # Work on a copy so the exception's own path is left intact.
        path = list(e.path)
        if path:
            # Path elements are keys or array indices (ints), so render them
            # as text before doing any string handling.
            err['key_name'] = ("%s" % (path[-1],)).replace(".", "")
            err['key_path'] = ".".join("%s" % (part,) for part in path[:-1])
        err['msg'] = e.message
        return False, err
    except SchemaError:
        raise SchemaError("invalid schema in validator[%s], please checking" % schema_name)
    except KeyError:
        raise ValueError("not found schema name [%s] in schema config" % schema_name)
    return True, err
def validate(payload, schema):
    """Validate `payload` against `schema`, returning an error list.

    jsonschema provides lots of information in its errors, but it can be a
    bit of work to extract all of it. Each entry returned here is the error
    message followed by ' at ' and the slash-separated absolute path of the
    offending element.
    """
    def describe(problem):
        text = problem.message
        pointer = '/' + '/'.join(str(part) for part in problem.absolute_path)
        return text + ' at ' + pointer

    checker = jsonschema.Draft4Validator(
        schema, format_checker=jsonschema.FormatChecker())
    return [describe(problem) for problem in checker.iter_errors(payload)]
def __init__(self, schema, relax_additional_properties=False):
    """Build the validator used for *schema*.

    ``self.validator_org`` is extended so that 'minimum' and 'maximum' are
    handled by this object's ``_validate_minimum`` / ``_validate_maximum``.
    Format checking is enabled on the resulting validator.

    :param schema: JSON Schema to validate against.
    :param relax_additional_properties: when true, 'additionalProperties'
        is handled by ``_soft_validate_additional_properties`` instead of
        the base validator's handler.
    """
    keyword_handlers = {
        'minimum': self._validate_minimum,
        'maximum': self._validate_maximum,
    }
    if relax_additional_properties:
        keyword_handlers.update(
            additionalProperties=_soft_validate_additional_properties)

    extended_cls = jsonschema.validators.extend(
        self.validator_org, keyword_handlers)
    self.validator = extended_cls(schema, format_checker=FormatChecker())
validators.py 文件源码
项目:Trusted-Platform-Module-nova
作者: BU-NU-CLOUD-SP16
项目源码
文件源码
阅读 23
收藏 0
点赞 0
评论 0
def __init__(self, schema, relax_additional_properties=False):
    """Build the validator used for *schema*.

    ``self.validator_org`` is extended so that 'minimum' and 'maximum' are
    handled by this object's ``_validate_minimum`` / ``_validate_maximum``.
    Format checking is enabled on the resulting validator.

    :param schema: JSON Schema to validate against.
    :param relax_additional_properties: when true, 'additionalProperties'
        is handled by ``_soft_validate_additional_properties`` instead of
        the base validator's handler.
    """
    keyword_handlers = {
        'minimum': self._validate_minimum,
        'maximum': self._validate_maximum,
    }
    if relax_additional_properties:
        keyword_handlers.update(
            additionalProperties=_soft_validate_additional_properties)

    extended_cls = jsonschema.validators.extend(
        self.validator_org, keyword_handlers)
    self.validator = extended_cls(schema, format_checker=FormatChecker())
def handle_batch(self, messages, schema, key_names, bookmark_names=None): # pylint: disable=no-self-use,unused-argument
    '''Validate every record message in the batch against schema.

    The schema and each record are passed through float_to_decimal before
    validation. Messages that are not singer.RecordMessage instances are
    skipped. When key_names is non-empty, every record must also contain
    each of those keys.

    Raises TargetStitchException when a record lacks a key property;
    errors raised by the validator itself are not caught here.
    '''
    schema = float_to_decimal(schema)
    record_validator = Draft4Validator(schema, format_checker=FormatChecker())

    for index, message in enumerate(messages):
        if not isinstance(message, singer.RecordMessage):
            continue

        data = float_to_decimal(message.record)
        record_validator.validate(data)

        for key in key_names or ():
            if key in data:
                continue
            raise TargetStitchException(
                'Message {} is missing key property {}'.format(index, key))

    LOGGER.info('Batch is valid')
def test_fetch_license(self):
    """ Fetch the license linked from the status document and validate it.

    The first link in ``self.lsd['links']`` whose ``rel`` is 'license' is
    downloaded, parsed as JSON into ``self.lcpl`` and validated against
    'lcpl_schema.json' from JSON_SCHEMA_DIR_PATH. A few license values are
    logged afterwards.

    Raises TestSuiteRunningError when the link is missing, the response is
    not JSON, or the license does not match the schema.
    """
    license_link = next(
        (link for link in self.lsd['links'] if link['rel'] == 'license'),
        None)
    if license_link is None:
        raise TestSuiteRunningError(
            "Missing a 'license' link in the status document")

    license_url = license_link['href']
    LOGGER.debug("Fetch license at url %s:", license_url)

    # Download the license and parse it.
    response = requests.get(license_url)
    try:
        self.lcpl = response.json()
    except ValueError:
        LOGGER.debug(response.text)
        raise TestSuiteRunningError("Malformed JSON License Document")
    LOGGER.debug("The License is available")

    # Check the license against the LCPL JSON schema.
    schema_path = os.path.join(JSON_SCHEMA_DIR_PATH, 'lcpl_schema.json')
    with open(schema_path) as schema_file:
        lcpl_json_schema = json.load(schema_file)
    try:
        jsonschema.validate(
            self.lcpl, lcpl_json_schema,
            format_checker=jsonschema.FormatChecker())
    except jsonschema.ValidationError as err:
        raise TestSuiteRunningError(err)
    LOGGER.debug("The up to date License is available and valid")

    # Report a few license values; optional ones fall back to a placeholder.
    lcpl = self.lcpl
    LOGGER.info("date issued {}, updated {}".format(
        lcpl['issued'], lcpl.get('updated', "never")))

    rights = lcpl['rights']
    LOGGER.info("rights print {}, copy {}".format(
        rights['print'], rights['copy']))
    LOGGER.info("rights start {}, end {}".format(
        rights.get('start', "none"), rights.get('end', "none")))