def validate(schema, data):
"""
A wrapper around the call to voluptuous schema to raise the proper exception.
Args:
schema: The voluptuous Schema object
data: The validation data for the schema object
Raises:
APIException with status 0 and the voluptuous error message
"""
try:
schema(data)
except MultipleInvalid as error:
raise APIException(0, None, error.msg)
python类MultipleInvalid()的实例源码
def get_problem(problem_path):
"""
Retrieve a problem spec from a given problem directory.
Args:
problem_path: path to the root of the problem directory.
Returns:
A problem object.
"""
json_path = join(problem_path, "problem.json")
problem = json.loads(open(json_path, "r").read())
try:
problem_schema(problem)
except MultipleInvalid as e:
logger.critical("Error validating problem object at '%s'!", json_path)
logger.critical(e)
raise FatalException
return problem
def validate(schema, data):
"""
A wrapper around the call to voluptuous schema to raise the proper exception.
Args:
schema: The voluptuous Schema object
data: The validation data for the schema object
Raises:
APIException with status 0 and the voluptuous error message
"""
try:
schema(data)
except MultipleInvalid as error:
raise APIException(0, None, error.msg)
def get_problem(problem_path):
"""
Retrieve a problem spec from a given problem directory.
Args:
problem_path: path to the root of the problem directory.
Returns:
A problem object.
"""
json_path = join(problem_path, "problem.json")
problem = json.loads(open(json_path, "r").read())
try:
problem_schema(problem)
except MultipleInvalid as e:
logger.critical("Error validating problem object at '%s'!", json_path)
logger.critical(e)
raise FatalException
return problem
def get_bundle(bundle_path):
"""
Retrieve a bundle spec from a given bundle directory.
Args:
bundle_path: path to the root of the bundle directory.
Returns:
A bundle object.
"""
json_path = join(bundle_path, "bundle.json")
bundle = json.loads(open(json_path, "r").read())
try:
bundle_schema(bundle)
except MultipleInvalid as e:
logger.critical("Error validating bundle object at '%s'!", json_path)
logger.critical(e)
raise FatalException
return bundle
def validate(schema, data):
"""
A wrapper around the call to voluptuous schema to raise the proper exception.
Args:
schema: The voluptuous Schema object
data: The validation data for the schema object
Raises:
APIException with status 0 and the voluptuous error message
"""
try:
schema(data)
except MultipleInvalid as error:
raise APIException(0, None, error.msg)
def test_entity_ids():
"""Test entity ID validation."""
schema = vol.Schema(cv.entity_ids)
for value in (
'invalid_entity',
'sensor.light,sensor_invalid',
['invalid_entity'],
['sensor.light', 'sensor_invalid'],
['sensor.light,sensor_invalid'],
):
with pytest.raises(vol.MultipleInvalid):
schema(value)
for value in (
[],
['sensor.light'],
'sensor.light'
):
schema(value)
assert schema('sensor.LIGHT, light.kitchen ') == [
'sensor.light', 'light.kitchen'
]
def test_time_period():
"""Test time_period validation."""
schema = vol.Schema(cv.time_period)
for value in (
None, '', 'hello:world', '12:', '12:34:56:78',
{}, {'wrong_key': -10}
):
with pytest.raises(vol.MultipleInvalid):
schema(value)
for value in (
'8:20', '23:59', '-8:20', '-23:59:59', '-48:00', {'minutes': 5}, 1, '5'
):
schema(value)
assert timedelta(seconds=180) == schema('180')
assert timedelta(hours=23, minutes=59) == schema('23:59')
assert -1 * timedelta(hours=1, minutes=15) == schema('-1:15')
def valid_adapter_response(base_name, func_name, data):
"""
Valid that given data match described schema
:param str base_name: The name of the asbtract
:param str func_name: The name of the called function
:parma raw data: The data to valid
:raises InvalidFormatError: if data is not compliant
"""
if not data:
return
try:
Schemas[base_name][func_name](data)
except (KeyError, TypeError, ValueError):
raise SchemaNotFound('Schema not found for %s.%s' % (base_name, func_name))
except (Invalid, MultipleInvalid) as ex:
raise InvalidFormatError('Given data is not compliant to %s.%s schema: %s' % (base_name, func_name, str(ex)))
def validate_body(schema_desc):
"""
Validate json body
"""
def real_decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
try:
body = request.get_json()
if not Schemas.get(func.__name__):
Schemas[func.__name__] = Schema(schema_desc, required=True)
Logger.debug(unicode('registering schema for %s' % (func.__name__)))
Schemas[func.__name__](body)
except (Invalid, MultipleInvalid) as ex:
Logger.error(unicode(ex))
msg = 'Missing or invalid field(s) in body, expecting {}'.format(schema_desc)
raise BadRequest(msg)
return func(*args, **kwargs)
return wrapper
return real_decorator
def process_cleanup_arch(self):
log.log(log.LOG_INFO, "Processing Cleanup of Architectures")
for arch in self.get_config_section('cleanup-architecture'):
try:
self.validator.cleanup_arch(arch)
except MultipleInvalid as e:
log.log(log.LOG_WARN, "Cannot delete Architecture '{0}': YAML validation Error: {1}".format(arch['name'], e))
continue
try:
self.fm.architectures.show(arch['name'])['id']
log.log(log.LOG_INFO, "Delete Architecture '{0}'".format(arch['name']))
self.fm.architectures.destroy( arch['name'] )
except:
log.log(log.LOG_WARN, "Architecture '{0}' already absent.".format(arch['name']))
def process_cleanup_computeprfl(self):
log.log(log.LOG_INFO, "Processing Cleanup of Compute profiles")
for computeprfl in self.get_config_section('cleanup-compute-profile'):
try:
self.validator.cleanup_computeprfl(computeprfl)
except MultipleInvalid as e:
log.log(log.LOG_WARN, "Cannot delete Compute profile '{0}': YAML validation Error: {1}".format(computeprfl['name'], e))
continue
try:
self.fm.compute_profiles.show(computeprfl['name'])['id']
log.log(log.LOG_INFO, "Delete Compute profile '{0}'".format(computeprfl['name']))
self.fm.compute_profiles.destroy( computeprfl['name'] )
except:
log.log(log.LOG_WARN, "Compute profile '{0}' already absent.".format(computeprfl['name']))
def process_cleanup_medium(self):
log.log(log.LOG_INFO, "Processing Cleanup of Media")
medialist = self.fm.media.index(per_page=99999)['results']
for medium in self.get_config_section('cleanup-medium'):
try:
self.validator.cleanup_medium(medium)
except MultipleInvalid as e:
log.log(log.LOG_WARN, "Cannot delete Medium '{0}': YAML validation Error: {1}".format(medium['name'], e))
continue
medium_deleted = False
# fm.media.show(name) does not work, we need to iterate over fm.media.index()
for mediac in medialist:
if (mediac['name'] == medium['name']):
medium_deleted = True
log.log(log.LOG_INFO, "Delete Medium '{0}'".format(medium['name']))
self.fm.media.destroy( medium['name'] )
continue
if not medium_deleted:
log.log(log.LOG_WARN, "Medium '{0}' already absent.".format(medium['name']))
def process_cleanup_ptable(self):
log.log(log.LOG_INFO, "Processing Cleanup of Partition Tables")
for ptable in self.get_config_section('cleanup-partition-table'):
try:
self.validator.cleanup_ptable(ptable)
except MultipleInvalid as e:
log.log(log.LOG_WARN, "Cannot delete Partition Table '{0}': YAML validation Error: {1}".format(ptable['name'], e))
continue
try:
self.fm.ptables.show(ptable['name'])['id']
log.log(log.LOG_INFO, "Delete Partition Table '{0}'".format(ptable['name']))
self.fm.ptables.destroy( ptable['name'] )
except:
log.log(log.LOG_WARN, "Partition Table '{0}' already absent.".format(ptable['name']))
def process_cleanup_provisioningtpl(self):
log.log(log.LOG_INFO, "Processing Cleanup of Provisioning Templates")
ptlist = self.fm.provisioning_templates.index(per_page=99999)['results']
for pt in self.get_config_section('cleanup-provisioning-template'):
try:
self.validator.cleanup_provt(pt)
except MultipleInvalid as e:
log.log(log.LOG_WARN, "Cannot delete Provisioning Template '{0}': YAML validation Error: {1}".format(pt['name'], e))
continue
# fm.provisioning_templates.show(name) does not work as expected, we need to iterate over fm.provisioning_templates.index()
pt_deleted = False
for ptc in ptlist:
if (ptc['name'] == pt['name']):
pt_deleted = True
log.log(log.LOG_INFO, "Delete Provisioning Template '{0}'".format(pt['name']))
self.fm.provisioning_templates.destroy( pt['name'] )
continue
if not pt_deleted:
log.log(log.LOG_WARN, "Provisioning Template '{0}' already absent.".format(pt['name']))
def process_config_enviroment(self):
log.log(log.LOG_INFO, "Processing Environments")
envlist = self.fm.environments.index(per_page=99999)['results']
for env in self.get_config_section('environment'):
try:
self.validator.enviroment(env)
except MultipleInvalid as e:
log.log(log.LOG_WARN, "Cannot create Environment '{0}': YAML validation Error: {1}".format(env['name'], e))
continue
env_id = False
# fm.media.show(name) does not work, we need to iterate over fm.media.index()
for envc in envlist:
if (env['name'] == envc['name']):
env_id = envc['id']
log.log(log.LOG_DEBUG, "Environment '{0}' (id={1}) already present.".format(env['name'], env_id))
continue
if not env_id:
log.log(log.LOG_INFO, "Create Environment '{0}'".format(env['name']))
self.fm.environments.create( environment = { 'name': env['name'] } )
def process_config_model(self):
log.log(log.LOG_INFO, "Processing Models")
for model in self.get_config_section('model'):
try:
self.validator.model(model)
except MultipleInvalid as e:
log.log(log.LOG_WARN, "Cannot create Model '{0}': YAML validation Error: {1}".format(model['name'], e))
continue
try:
model_id = self.fm.models.show(model['name'])['id']
log.log(log.LOG_DEBUG, "Model '{0}' (id={1}) already present.".format(model['name'], model_id))
except:
log.log(log.LOG_INFO, "Create Model '{0}'".format(model['name']))
model_tpl = {
'name': model['name'],
'info': model['info'],
'vendor_class': model['vendor-class'],
'hardware_model': model['hardware-model']
}
self.fm.models.create( model = model_tpl )
def process_config_medium(self):
log.log(log.LOG_INFO, "Processing Media")
medialist = self.fm.media.index(per_page=99999)['results']
for medium in self.get_config_section('medium'):
try:
self.validator.medium(medium)
except MultipleInvalid as e:
log.log(log.LOG_WARN, "Cannot create Media '{0}': YAML validation Error: {1}".format(medium['name'], e))
continue
medium_id = False
# fm.media.show(name) does not work, we need to iterate over fm.media.index()
for mediac in medialist:
if (mediac['name'] == medium['name']):
medium_id = mediac['id']
log.log(log.LOG_DEBUG, "Medium '{0}' (id={1}) already present.".format(medium['name'], medium_id))
if not medium_id:
log.log(log.LOG_INFO, "Create Medium '{0}'".format(medium['name']))
medium_tpl = {
'name': medium['name'],
'path': medium['path'],
'os_family': medium['os-family']
}
self.fm.media.create( medium = medium_tpl )
def process_config_settings(self):
log.log(log.LOG_INFO, "Processing Foreman Settings")
for setting in self.get_config_section('setting'):
try:
self.validator.setting(setting)
except MultipleInvalid as e:
log.log(log.LOG_WARN, "Cannot update Setting '{0}': YAML validation Error: {1}".format(setting['name'], e))
continue
setting_id = False
try:
setting_id = self.fm.settings.show(setting['name'])['id']
except:
log.log(log.LOG_WARN, "Cannot get ID of Setting '{0}', skipping".format(setting['name']))
setting_tpl = {
'value': setting['value']
}
if setting_id:
log.log(log.LOG_INFO, "Update Setting '{0}'".format(setting['name']))
self.fm.settings.update(setting_tpl, setting_id)
def process_auth_sources_ldap(self):
log.log(log.LOG_INFO, "Processing LDAP auth sources")
for auth in self.get_config_section('auth-source-ldap'):
# validate yaml
try:
self.validator.auth_source_ldaps(auth)
except MultipleInvalid as e:
log.log(log.LOG_WARN, "Cannot create LDAP source '{0}': YAML validation Error: {1}".format(auth['name'], e))
continue
try:
as_id = self.fm.auth_source_ldaps.show(auth['name'])['id']
log.log(log.LOG_WARN, "LDAP source {0} allready exists".format(auth['name']))
continue
except TypeError:
pass
ldap_auth_obj = self.dict_underscore(auth)
try:
self.fm.auth_source_ldaps.create( auth_source_ldap=ldap_auth_obj )
except:
log.log(log.LOG_ERROR, "Something went wrong creating LDAP source {0}".format(auth['name']))
def get(self):
parser = reqparse.RequestParser()
return_data = []
parser.add_argument('subvoat_name')
args = parser.parse_args()
schema = Schema({ Required('subvoat_name'): All(str, Length(min=self.config['min_length_subvoat_name']))})
try:
schema({'subvoat_name':args.get('subvoat_name')})
except MultipleInvalid as e:
return {'error':'%s %s' % (e.msg, e.path)}
posts = self.subvoat_utils.get_posts(args['subvoat_name'])
for p in posts:
return_data.append(p)
return {'result':return_data}
def test_entity_ids():
"""Test entity ID validation."""
schema = vol.Schema(cv.entity_ids)
for value in (
'invalid_entity',
'sensor.light,sensor_invalid',
['invalid_entity'],
['sensor.light', 'sensor_invalid'],
['sensor.light,sensor_invalid'],
):
with pytest.raises(vol.MultipleInvalid):
schema(value)
for value in (
[],
['sensor.light'],
'sensor.light'
):
schema(value)
assert schema('sensor.LIGHT, light.kitchen ') == [
'sensor.light', 'light.kitchen'
]
def test_event_schema():
"""Test event_schema validation."""
for value in (
{}, None,
{
'event_data': {},
},
{
'event': 'state_changed',
'event_data': 1,
},
):
with pytest.raises(vol.MultipleInvalid):
cv.EVENT_SCHEMA(value)
for value in (
{'event': 'state_changed'},
{'event': 'state_changed', 'event_data': {'hello': 'world'}},
):
cv.EVENT_SCHEMA(value)
def test_core_config_schema(self):
"""Test core config schema."""
for value in (
{CONF_UNIT_SYSTEM: 'K'},
{'time_zone': 'non-exist'},
{'latitude': '91'},
{'longitude': -181},
{'customize': 'bla'},
{'customize': {'invalid_entity_id': {}}},
{'customize': {'light.sensor': 100}},
):
with pytest.raises(MultipleInvalid):
config_util.CORE_CONFIG_SCHEMA(value)
config_util.CORE_CONFIG_SCHEMA({
'name': 'Test name',
'latitude': '-23.45',
'longitude': '123.45',
CONF_UNIT_SYSTEM: CONF_UNIT_SYSTEM_METRIC,
'customize': {
'sensor.temperature': {
'hidden': True,
},
},
})
def get_bundle(bundle_path):
"""
Retrieve a bundle spec from a given bundle directory.
Args:
bundle_path: path to the root of the bundle directory.
Returns:
A bundle object.
"""
json_path = join(bundle_path, "bundle.json")
bundle = json.loads(open(json_path, "r").read())
try:
bundle_schema(bundle)
except MultipleInvalid as e:
logger.critical("Error validating bundle object at '%s'!", json_path)
logger.critical(e)
raise FatalException
return bundle
def verify_config(config_object):
"""
Verifies the given configuration dict against the config_schema and the port_range_schema
Raise FatalException if failed.
Args:
config_object: The configuration options in a dict
"""
try:
config_schema(config_object)
except MultipleInvalid as e:
logger.critical("Error validating config file at '%s'!", path)
logger.critical(e)
raise FatalException
for port_range in config_object["banned_ports"]:
try:
port_range_schema(port_range)
assert port_range["start"] <= port_range["end"]
except MultipleInvalid as e:
logger.critical("Error validating port range in config file at '%s'!", path)
logger.critical(e)
raise FatalException
except AssertionError as e:
logger.critical("Invalid port range: (%d -> %d)", port_range["start"], port_range["end"])
raise FatalException
def verify_config(config_object):
"""
Verifies the given configuration dict against the config_schema and the port_range_schema
Raise FatalException if failed.
Args:
config_object: The configuration options in a dict
"""
try:
config_schema(config_object)
except MultipleInvalid as e:
logger.critical("Error validating config file at '%s'!", path)
logger.critical(e)
raise FatalException
for port_range in config_object["banned_ports"]:
try:
port_range_schema(port_range)
assert port_range["start"] <= port_range["end"]
except MultipleInvalid as e:
logger.critical("Error validating port range in config file at '%s'!", path)
logger.critical(e)
raise FatalException
except AssertionError as e:
logger.critical("Invalid port range: (%d -> %d)", port_range["start"], port_range["end"])
raise FatalException
def __init__(self, **kargs):
super(EntityBase, self).__init__(kargs)
self.validate()
try:
raise getattr(self, 'error')
except AttributeError:
pass
except Invalid as e:
raise MultipleInvalid([e])
def test_invalid_products(self):
self.assertRaises(MultipleInvalid, Produto, codigo='abc')
def test_invalid_coletas(self):
self.assertRaises(MultipleInvalid, ColetaSimultanea, codigo='abc')