def validate(self, task, method, http_method, **kwargs):
    """Validates the vendor method's parameters.

    This method validates whether the supplied data contains the required
    information for the driver.

    :param task: a TaskManager instance.
    :param method: name of vendor method.
    :param http_method: HTTP method.
    :param kwargs: data passed to vendor's method.
    :raises: InvalidParameterValue if supplied data is not valid.
    :raises: MissingParameterValue if parameters missing in supplied data.
    """
    try:
        if 'statistics' in method:
            self._validate_statistics_methods(method, **kwargs)
        else:
            self._validate_policy_methods(method, **kwargs)
    except json_schema_exc.ValidationError as e:
        raise exception.InvalidParameterValue(_('Input data validation '
                                                'error: %s') % e)
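The _validate_statistics_methods and _validate_policy_methods helpers are not shown here; presumably each checks kwargs against a JSON schema so that json_schema_exc.ValidationError propagates to the handler above. A minimal hypothetical sketch of one such helper (the schema contents and field names are assumptions, not the driver's actual code):

import jsonschema

# Hypothetical schema; the real driver defines its own required fields.
_STATISTICS_SCHEMA = {
    'type': 'object',
    'properties': {'meter_name': {'type': 'string'}},
    'required': ['meter_name'],
}

def _validate_statistics_methods(self, method, **kwargs):
    # Raises jsonschema.exceptions.ValidationError on bad input, which
    # validate() above wraps in InvalidParameterValue.
    jsonschema.validate(kwargs, _STATISTICS_SCHEMA)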
Python validate(): example source code
import json
import os
from zipfile import ZipFile

import pkg_resources
from pkg_resources import resource_filename

def test_pydist():
    """Make sure pydist.json exists and validates against our schema."""
    # XXX this test may need manual cleanup of older wheels
    import jsonschema

    def open_json(filename):
        with open(filename, 'rb') as json_file:
            return json.loads(json_file.read().decode('utf-8'))

    pymeta_schema = open_json(resource_filename('wheel.test',
                                                'pydist-schema.json'))
    valid = 0
    for dist in ("simple.dist", "complex-dist"):
        basedir = pkg_resources.resource_filename('wheel.test', dist)
        for (dirname, subdirs, filenames) in os.walk(basedir):
            for filename in filenames:
                if filename.endswith('.whl'):
                    whl = ZipFile(os.path.join(dirname, filename))
                    for entry in whl.infolist():
                        if entry.filename.endswith('/metadata.json'):
                            pymeta = json.loads(whl.read(entry).decode('utf-8'))
                            jsonschema.validate(pymeta, pymeta_schema)
                            valid += 1
    assert valid > 0, "No metadata.json found"
def validate_analysis_result(context, ecosystem, package, version):
    """Validate results of the analysis."""
    res = context.response.json()
    # make sure the analysis has finished
    assert res['finished_at'] is not None
    # we want to validate the top-level analysis and any worker results
    # that have a "schema" defined
    structures_to_validate = [res]
    for _, worker_result in res['analyses'].items():
        # TODO: in the future we want to mandate that all workers have
        # their schemas, so we'll remove this condition
        if 'schema' in worker_result:
            structures_to_validate.append(worker_result)
    for struct in structures_to_validate:
        schema = requests.get(struct['schema']['url']).json()
        jsonschema.validate(struct, schema)
def __init__(self, path=None):
    self.path = path or os.getenv('WEBHDFS_CONFIG', self.default_path)
    if osp.exists(self.path):
        try:
            with open(self.path) as config_file:
                self.config = json.loads(config_file.read())
            self.schema = json.loads(
                resource_string(__name__, 'resources/config_schema.json'))
            try:
                js.validate(self.config, self.schema)
            except js.ValidationError as e:
                print(e.message)
            except js.SchemaError as e:
                print(e)
        except ParsingError:
            raise HdfsError('Invalid configuration file %r.', self.path)
        _logger.info('Instantiated configuration from %r.', self.path)
    else:
        raise HdfsError('Invalid configuration file %r.', self.path)
def _example_api(request, schema, example):
    response = None
    if schema:
        # If there is a problem with the json data, return a 400.
        try:
            data = json.loads(request.body.decode("utf-8"))
            validate(data, schema)
        except Exception as e:
            if hasattr(settings, 'RAMLWRAP_VALIDATION_ERROR_HANDLER') and settings.RAMLWRAP_VALIDATION_ERROR_HANDLER:
                response = _call_custom_handler(e)
            else:
                response = _validation_error_handler(e)

    if response:
        return response

    if not example:
        return None
    else:
        return example
def _is_valid_query(params, expected_params):
    """
    Function to validate GET request query params.
    """
    # If there are expected params, check them. If not, pass.
    if expected_params:
        for param in expected_params:
            # If the expected param is in the query.
            if param in params:
                for check, rule in expected_params[param].__dict__.items():
                    if rule is not None:
                        error_message = "QueryParam [%s] failed validation check [%s]:[%s]" % (param, check, rule)
                        if check == "minLength":
                            if len(params.get(param)) < rule:
                                raise ValidationError(error_message)
                        elif check == "maxLength":
                            if len(params.get(param)) > rule:
                                raise ValidationError(error_message)
            # It isn't in the query but it is required, so throw a validation exception.
            elif expected_params[param].required is True:
                raise ValidationError("QueryParam [%s] failed validation check [Required]:[True]" % param)
    # TODO: add more checks here.
    return True
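The same minLength/maxLength rules can also be expressed declaratively with jsonschema instead of hand-rolled checks; a minimal sketch, assuming the query params are available as a plain dict (the parameter name and bounds are illustrative):

import jsonschema

query_schema = {
    'type': 'object',
    'properties': {
        'name': {'type': 'string', 'minLength': 2, 'maxLength': 10},
    },
    'required': ['name'],
}

jsonschema.validate({'name': 'abc'}, query_schema)  # passes
# jsonschema.validate({'name': 'a'}, query_schema) would raise
# jsonschema.exceptions.ValidationError: 'a' is too short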
From test_aiohttp_swaggerify.py (project: aiohttp_swaggerify, author: dchaplinsky)
async def test_validate_fullfledged(test_client, loop):
    app = web.Application(loop=loop)
    app.router.add_get('/', full_fledged_handler)
    app.router.add_post('/', full_fledged_handler)
    app = swaggerify(
        app,
        basePath="/",
        host="127.0.0.1:8080"
    )
    client = await test_client(app)
    resp = await client.get('/swagger.json')
    assert resp.status == 200
    text = await resp.json()
    with open("tests/validate_swagger.json", "r") as fp:
        assert schema_validate(text, json.load(fp)) is None
def validate_input(self, formData):
    schema = {
        'type': 'string',
        'pattern': '^#([A-Fa-f0-9]{6})$'
    }
    validate(formData, schema)

# TODO: this is for an rgb triple
# def validate_input(self, formData):
#     schema = {
#         'type': 'array',
#         'items': {
#             'type': 'number',
#             'minimum': 0,
#             'maximum': 1
#         },
#         'minItems': 3,
#         'maxItems': 3
#     }
#     validate(formData, schema)
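For illustration, the hex-color schema above can be exercised on its own (a standalone sketch, not part of the original class):

import jsonschema

hex_color_schema = {
    'type': 'string',
    'pattern': '^#([A-Fa-f0-9]{6})$'
}

jsonschema.validate('#1A2B3C', hex_color_schema)   # passes: six hex digits
# Each of these would raise jsonschema.exceptions.ValidationError:
# jsonschema.validate('1A2B3C', hex_color_schema)  # missing the leading '#'
# jsonschema.validate('#1A2B3', hex_color_schema)  # only five hex digits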
def get_schemas(cls, doc):
    """Retrieve the relevant schema based on the document's ``schema``.

    :param dict doc: The document used for finding the correct schema
        to validate it based on its ``schema``.
    :returns: A schema to be used by ``jsonschema`` for document
        validation.
    :rtype: dict
    """
    cls._register_data_schemas()

    # FIXME(fmontei): Remove this once all Deckhand tests have been
    # refactored to account for dynamic schema registration via
    # ``DataSchema`` documents. Otherwise most tests will fail.
    for doc_field in [doc['schema'], doc['metadata']['schema']]:
        matching_schemas = cls._get_schema_by_property(
            cls.schema_re, doc_field)
        if matching_schemas:
            return matching_schemas

    return []
def check_schemas(data_root, schemas_dir, verbose=False):
    schemas = ('category.json', 'video.json')
    all_file_paths = get_json_files(data_root)
    error_count = 0
    for schema, file_paths in zip(schemas, all_file_paths):
        schema_path = os.path.join(schemas_dir, schema)
        with open(schema_path, encoding='UTF-8') as fp:
            schema_blob = json.load(fp)
        for file_path in file_paths:
            with open(file_path, encoding='UTF-8') as fp:
                blob = json.load(fp)
            try:
                jsonschema.validate(blob, schema_blob)
            except jsonschema.exceptions.ValidationError as e:
                print(file_path, flush=True)
                if verbose:
                    print(e, flush=True)
                error_count += 1
    return error_count
def _test_validate(self, schema, expect_failure, input_files, input):
    """Validate input YAML against a schema.

    :param schema: schema yaml file
    :param expect_failure: should the validation pass or fail.
    :param input_files: pytest fixture used to access the test input files
    :param input: test input yaml doc filename
    """
    schema_dir = pkg_resources.resource_filename('drydock_provisioner',
                                                 'schemas')
    schema_filename = os.path.join(schema_dir, schema)
    with open(schema_filename, 'r') as schema_file:
        schema = yaml.safe_load(schema_file)

    input_file = input_files.join(input)
    with open(str(input_file), 'r') as instance_file:
        instance = yaml.safe_load(instance_file)

    if expect_failure:
        with pytest.raises(ValidationError):
            jsonschema.validate(instance['spec'], schema['data'])
    else:
        jsonschema.validate(instance['spec'], schema['data'])
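A hypothetical pair of callers, to show how the expect_failure flag drives the assertion (the schema and fixture filenames are invented for illustration):

def test_profile_valid(self, input_files):
    self._test_validate('hostProfile.yaml', False, input_files,
                        'valid_profile.yaml')

def test_profile_invalid(self, input_files):
    self._test_validate('hostProfile.yaml', True, input_files,
                        'invalid_profile.yaml')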
def validate_json_schema(data, schema, name="task"):
    """Given data and a jsonschema, let's validate it.

    This happens for tasks and chain of trust artifacts.

    Args:
        data (dict): the json to validate.
        schema (dict): the jsonschema to validate against.
        name (str, optional): the name of the json, for exception messages.
            Defaults to "task".

    Raises:
        ScriptWorkerTaskException: on failure
    """
    try:
        jsonschema.validate(data, schema)
    except jsonschema.exceptions.ValidationError as exc:
        raise ScriptWorkerTaskException(
            "Can't validate {} schema!\n{}".format(name, str(exc)),
            exit_code=STATUSES['malformed-payload']
        )
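Exercising the wrapper with a tiny schema (a sketch; the schema and task data are illustrative):

task_schema = {
    'type': 'object',
    'properties': {'payload': {'type': 'object'}},
    'required': ['payload'],
}

validate_json_schema({'payload': {}}, task_schema, name='task')  # passes silently
# validate_json_schema({}, task_schema) would raise ScriptWorkerTaskException
# with exit_code STATUSES['malformed-payload']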
def validate_service_definitions(components_map, components=None):
    if not components:
        components = components_map.keys()
    else:
        validation_base.validate_components_names(components, components_map)

    not_passed_components = set()
    for component in components:
        try:
            jsonschema.validate(components_map[component]["service_content"],
                                SERVICE_SCHEMA,
                                format_checker=ServiceFormatChecker())
        except jsonschema.ValidationError as e:
            LOG.error("Validation of service definitions for component '%s' "
                      "failed: '%s'", component, e.message)
            not_passed_components.add(component)

    if not_passed_components:
        raise RuntimeError(
            "Validation of service definitions failed for {} of {} "
            "components.".format(len(not_passed_components), len(components))
        )
    else:
        LOG.info("Service definitions validation passed successfully")
def get_config_schema():
    schema = {
        '$schema': 'http://json-schema.org/draft-04/schema#',
        'additionalProperties': False,
        'properties': {
            'debug': {'type': 'boolean'},
            'verbose_level': {'type': 'integer'},
            'log_file': {'anyOf': [{'type': 'null'}, {'type': 'string'}]},
            'default_log_levels': {'type': 'array',
                                   'items': {'type': 'string'}}
        },
    }
    for module in CONFIG_MODULES:
        schema['properties'].update(module.SCHEMA)

    # Don't validate options that used to be added by oslo.log and oslo.config
    ignore_opts = ['debug', 'verbose', 'log_file']
    for name in ignore_opts:
        schema['properties'][name] = {}

    # Also, for now, don't validate sections that used to be in the deploy config
    for name in ['configs', 'secret_configs', 'nodes', 'roles', 'versions']:
        schema['properties'][name] = {'type': 'object'}

    return schema
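The returned schema plugs straight into jsonschema.validate; a minimal usage sketch, assuming the CONFIG_MODULES contributions are already merged in (the config values are illustrative):

import jsonschema

config = {
    'verbose_level': 2,
    'log_file': None,
    'default_log_levels': ['amqp=WARN', 'urllib3=INFO'],
}
jsonschema.validate(config, get_config_schema())  # raises ValidationError on mismatch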
def removeImages(self, imgList):
    """
    Attempt to remove image metadata from the mongo database.

    :param imgList: a list of docker image names
    :type imgList: a list of strings
    """
    for img in imgList:
        try:
            hash = DockerImage.getHashKey(img)
            imageData = self._load(hash)
            super(DockerImageModel, self).remove(imageData.getRawData())
        except DockerImageNotFoundError:
            logger.exception('Image %r does not exist', img)
            raise DockerImageNotFoundError(
                'The image %s with hash %s does not exist '
                'in the database' % (img, hash), img)
        except Exception as err:
            logger.exception('Could not remove image %r', img)
            raise DockerImageError(
                'Could not delete the image data from the database, '
                'invalid image: %s %s' % (img, err), img)
# TODO validate the xml of each cli