def load_config(path='.boss.yml'):
loader = j.FileSystemLoader('.')
try:
template = loader.load(j.Environment(), path, os.environ)
yml = template.render()
doc = yaml.load(yml)
return transform_config(doc)
except j.TemplateNotFound:
error = 'Error loading {}: not found'.format(path)
raise ConfigurationError(error)
except j.TemplateSyntaxError as e:
error = 'Error loading {}: {}, line {}'.format(path, e, e.lineno)
raise ConfigurationError(error)
except IOError as e:
error = 'Error loading {}: {}'.format(path, e.strerror)
raise ConfigurationError(error)
except v.Invalid as e:
error = 'Error validating {}: {}'.format(path, e)
raise ConfigurationError(error)
python类Invalid()的实例源码
def test_isfile():
"""Validate that the value is an existing file."""
schema = vol.Schema(cv.isfile)
fake_file = 'this-file-does-not.exist'
assert not os.path.isfile(fake_file)
for value in ('invalid', None, -1, 0, 80000, fake_file):
with pytest.raises(vol.Invalid):
schema(value)
# patching methods that allow us to fake a file existing
# with write access
with patch('os.path.isfile', Mock(return_value=True)), \
patch('os.access', Mock(return_value=True)):
schema('test.txt')
def match_all(value):
"""Validator that matches all values."""
return value
# def platform_validator(domain):
# """Validate if platform exists for given domain."""
# def validator(value):
# """Test if platform exists."""
# if value is None:
# raise vol.Invalid('platform cannot be None')
# if get_platform(domain, str(value)):
# return value
# raise vol.Invalid(
# 'platform {} does not exist for {}'.format(value, domain))
# return validator
def __new__(mcs, cls_name, bases, attrs, previous=None, start=False):
if bases and TestServer in bases:
for attr_name in mcs._class_clients:
attrs.setdefault(attr_name, [])
validate_attributes = mcs._build_attributes_validator()
try:
attrs = validate_attributes(attrs)
except Invalid as error:
msg = "{} @ {}.{}".format(error.error_message,
cls_name, error.path[0])
msg += ''.join('[{!r}]'.format(element)
for element in error.path[1:])
raise AttributeError(msg)
return MetaTestState.__new__(mcs, cls_name, bases, attrs, previous,
start)
def valid_adapter_response(base_name, func_name, data):
"""
Valid that given data match described schema
:param str base_name: The name of the asbtract
:param str func_name: The name of the called function
:parma raw data: The data to valid
:raises InvalidFormatError: if data is not compliant
"""
if not data:
return
try:
Schemas[base_name][func_name](data)
except (KeyError, TypeError, ValueError):
raise SchemaNotFound('Schema not found for %s.%s' % (base_name, func_name))
except (Invalid, MultipleInvalid) as ex:
raise InvalidFormatError('Given data is not compliant to %s.%s schema: %s' % (base_name, func_name, str(ex)))
def validate_body(schema_desc):
"""
Validate json body
"""
def real_decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
try:
body = request.get_json()
if not Schemas.get(func.__name__):
Schemas[func.__name__] = Schema(schema_desc, required=True)
Logger.debug(unicode('registering schema for %s' % (func.__name__)))
Schemas[func.__name__](body)
except (Invalid, MultipleInvalid) as ex:
Logger.error(unicode(ex))
msg = 'Missing or invalid field(s) in body, expecting {}'.format(schema_desc)
raise BadRequest(msg)
return func(*args, **kwargs)
return wrapper
return real_decorator
def post(self):
try:
body = json.loads(self.request.body)
web_service_model.banana_model(body)
type_table = self._monanas.compute_type_table(body["content"])
self.write(type_table)
except (AttributeError, voluptuous.Invalid, ValueError) as e:
logger.warn("Wrong request: {}.".
format(e))
self.set_status(400, "The request body was malformed.")
except Exception as e:
tb = traceback.format_exc()
print(tb)
logger.error("Unexpected error: {}. {}".
format(sys.exc_info()[0], e))
self.set_status(500, "Internal server error.")
self.flush()
self.finish()
def validate_links(links):
"""Validate links to make sure, nothing is missing
:type links: dict
:param links: connection links to validate
:raises: SchemaError -- if any link is missing
"""
missing = set([])
all_keys = set(links.keys())
for connections in links.values():
for component in connections:
if component not in all_keys:
missing.add(component.id())
if len(missing) > 0:
raise voluptuous.Invalid([
"In connections section, the following components are not "
"connected\n\t{}\n"
"please modify the configuration so that their list of "
"connections is at least '[]'".format(", ".join(missing))], [])
def _validate_existing_id(config, component_id):
"""Check that the id passed as parameter is defined in the configuration
:type config: dict
:param config: configuration model for the whole system
:type component_id: str
:param component_id: component ID to be found in configuration
"""
found_id = False
for comp_type in valid_connection_types.keys():
if component_id in config[comp_type].keys():
found_id = True
if not found_id:
raise voluptuous.Invalid([
'In "connections", component `{}` hasn\'t been defined'
.format(component_id)
], [])
def get_pagination_options(params, default):
try:
opts = voluptuous.Schema({
voluptuous.Required(
"limit", default=pecan.request.conf.api.max_limit):
voluptuous.All(voluptuous.Coerce(int),
voluptuous.Range(min=1),
voluptuous.Clamp(
min=1, max=pecan.request.conf.api.max_limit)),
"marker": six.text_type,
voluptuous.Required("sort", default=default):
voluptuous.All(
voluptuous.Coerce(arg_to_list),
[six.text_type]),
}, extra=voluptuous.REMOVE_EXTRA)(params)
except voluptuous.Invalid as e:
abort(400, {"cause": "Argument value error",
"reason": str(e)})
opts['sorts'] = opts['sort']
del opts['sort']
return opts
def post(self):
schema = pecan.request.indexer.get_resource_type_schema()
body = deserialize_and_validate(schema)
body["state"] = "creating"
try:
rt = schema.resource_type_from_dict(**body)
except resource_type.InvalidResourceAttribute as e:
abort(400, "Invalid input: %s" % e)
enforce("create resource type", body)
try:
rt = pecan.request.indexer.create_resource_type(rt)
except indexer.ResourceTypeAlreadyExists as e:
abort(409, six.text_type(e))
set_resp_location_hdr("/resource_type/" + rt.name)
pecan.response.status = 201
return rt
def test_register_operation_wraps_protectron_unmatching_schema():
@register_operation
@protectron(Schema({'a': int}))
def mirror2(a):
return a
event = {
'operation': 'mirror2',
'args': {
'a': '42'
}
}
with pytest.raises(Invalid):
assert dispatch_event(event) == 42
def Alphanumeric(msg=None):
'''
Checks whether a value is:
- int, or
- long, or
- float without a fractional part, or
- str or unicode composed only of alphanumeric characters
'''
def fn(value):
if not any([
isinstance(value, numbers.Integral),
(isinstance(value, float) and value.is_integer()),
(isinstance(value, basestring) and value.isalnum())
]):
raise Invalid(msg or (
'Invalid input <{0}>; expected an integer'.format(value))
)
else:
return value
return fn
def StrictlyAlphanumeric(msg=None):
'''
Checks whether a value is:
- str or unicode, and
- composed of both alphabets and digits
'''
def fn(value):
if not (
isinstance(value, basestring) and
value.isalnum() and not
value.isdigit() and not
value.isalpha()
):
raise Invalid(msg or (
'Invalid input <{0}>; expected an integer'.format(value))
)
else:
return value
return fn
def __call__(self, value):
'''
Checks whether a value is list of given validation_function.
Returns list of validated values or just one valid value in
list if there is only one element in given CSV string.
'''
try:
if isinstance(value, basestring):
if self.separator in value:
seperated_string_values =[item.strip() for item
in value.split(self.separator)]
values = [self.validation_function(item) for item
in seperated_string_values]
else:
values = [self.validation_function(value)]
return values
else:
raise ValueError
except (Invalid, ValueError) as e:
raise Invalid(self.msg or
('<{0}> is not valid set of <{1}>, {2}'.format(
value,
self.value_type,
e)))
def test_update_active_instance_entity_with_bad_payload(self):
self.entity_ctl.update_active_instance_entity.side_effect = ValueError(
'Expecting object: line 1 column 15 (char 14)'
)
instance_id = 'INSTANCE_ID'
data = {
'flavor': 'A_FLAVOR',
}
code, result = self.api_put('/v1/entity/instance/INSTANCE_ID',
data=data,
headers={'X-Auth-Token': 'some token value'})
self.entity_ctl.update_active_instance_entity.assert_called_once_with(instance_id=instance_id, **data)
self.assertIn("error", result)
self.assertEqual(result["error"], 'Invalid parameter or payload')
self.assertEqual(code, 400)
def test_update_active_instance_entity_with_wrong_attribute_raise_exception(self):
errors = [
Invalid(message="error message1", path=["my_attribute1"]),
Invalid(message="error message2", path=["my_attribute2"]),
]
self.entity_ctl.update_active_instance_entity.side_effect = exception.InvalidAttributeException(errors)
formatted_errors = {
"my_attribute1": "error message1",
"my_attribute2": "error message2",
}
instance_id = 'INSTANCE_ID'
data = {
'flavor': 'A_FLAVOR',
}
code, result = self.api_put('/v1/entity/instance/INSTANCE_ID',
data=data,
headers={'X-Auth-Token': 'some token value'})
self.entity_ctl.update_active_instance_entity.assert_called_once_with(instance_id=instance_id, **data)
self.assertIn("error", result)
self.assertEqual(result['error'], formatted_errors)
self.assertEqual(code, 400)
def valid_format_string(valid_fields):
"""
Ensure that the provided string can be parsed as a python format string, and contains only `valid_fields`
:param valid_fields: set or sequence of valid field names
"""
f = Formatter()
valid_fields = set(valid_fields)
def validate_string(format_string):
fields = set(field_name for _, field_name, _, _ in f.parse(format_string) if field_name)
if fields < valid_fields:
return format_string
else:
raise Invalid('format string specifies invalid field(s): %s' % (fields - valid_fields))
return validate_string
def test_isfile():
"""Validate that the value is an existing file."""
schema = vol.Schema(cv.isfile)
fake_file = 'this-file-does-not.exist'
assert not os.path.isfile(fake_file)
for value in ('invalid', None, -1, 0, 80000, fake_file):
with pytest.raises(vol.Invalid):
schema(value)
# patching methods that allow us to fake a file existing
# with write access
with patch('os.path.isfile', Mock(return_value=True)), \
patch('os.access', Mock(return_value=True)):
schema('test.txt')
def test_socket_timeout():
"""Test socket timeout validator."""
TEST_CONF_TIMEOUT = 'timeout'
schema = vol.Schema(
{vol.Required(TEST_CONF_TIMEOUT, default=None): cv.socket_timeout})
with pytest.raises(vol.Invalid):
schema({TEST_CONF_TIMEOUT: 0.0})
with pytest.raises(vol.Invalid):
schema({TEST_CONF_TIMEOUT: -1})
assert _GLOBAL_DEFAULT_TIMEOUT == schema({TEST_CONF_TIMEOUT:
None})[TEST_CONF_TIMEOUT]
assert 1.0 == schema({TEST_CONF_TIMEOUT: 1})[TEST_CONF_TIMEOUT]
def test_config_error(self):
"""Test for configuration errors."""
with self.assertRaises(vol.Invalid):
unifi.PLATFORM_SCHEMA({
# no username
CONF_PLATFORM: unifi.DOMAIN,
CONF_HOST: 'myhost',
'port': 123,
})
with self.assertRaises(vol.Invalid):
unifi.PLATFORM_SCHEMA({
CONF_PLATFORM: unifi.DOMAIN,
CONF_USERNAME: 'foo',
CONF_PASSWORD: 'password',
CONF_HOST: 'myhost',
'port': 'foo', # bad port!
})
def socket_timeout(value):
"""Validate timeout float > 0.0.
None coerced to socket._GLOBAL_DEFAULT_TIMEOUT bare object.
"""
if value is None:
return _GLOBAL_DEFAULT_TIMEOUT
else:
try:
float_value = float(value)
if float_value > 0.0:
return float_value
raise vol.Invalid('Invalid socket timeout value.'
' float > 0.0 required.')
except Exception as _:
raise vol.Invalid('Invalid socket timeout: {err}'.format(err=_))
# pylint: disable=no-value-for-parameter
def post(self, request):
"""Accept the POST request for push registrations from a browser."""
try:
data = yield from request.json()
except ValueError:
return self.json_message('Invalid JSON', HTTP_BAD_REQUEST)
try:
data = REGISTER_SCHEMA(data)
except vol.Invalid as ex:
return self.json_message(humanize_error(data, ex),
HTTP_BAD_REQUEST)
name = ensure_unique_string('unnamed device',
self.registrations.keys())
self.registrations[name] = data
if not _save_config(self.json_path, self.registrations):
return self.json_message('Error saving registration.',
HTTP_INTERNAL_SERVER_ERROR)
return self.json_message('Push notification subscriber registered.')
def post(self, request):
"""Handle the POST request for device identification."""
try:
req_data = yield from request.json()
except ValueError:
return self.json_message('Invalid JSON', HTTP_BAD_REQUEST)
try:
data = IDENTIFY_SCHEMA(req_data)
except vol.Invalid as ex:
return self.json_message(humanize_error(request.json, ex),
HTTP_BAD_REQUEST)
name = data.get(ATTR_DEVICE_ID)
CONFIG_FILE[ATTR_DEVICES][name] = data
if not _save_config(CONFIG_FILE_PATH, CONFIG_FILE):
return self.json_message("Error saving device.",
HTTP_INTERNAL_SERVER_ERROR)
return self.json({"status": "registered"})
def check(*callback_tuples):
"""
Voluptuous wrapper function to raise our APIException
Args:
callback_tuples: a callback_tuple should contain (status, msg, callbacks)
Returns:
Returns a function callback for the Schema
"""
def v(value):
"""
Trys to validate the value with the given callbacks.
Args:
value: the item to validate
Raises:
APIException with the given error code and msg.
Returns:
The value if the validation callbacks are satisfied.
"""
for msg, callbacks in callback_tuples:
for callback in callbacks:
try:
result = callback(value)
if not result and type(result) == bool:
raise Invalid()
except Exception:
raise WebException(msg)
return value
return v
def check(*callback_tuples):
"""
Voluptuous wrapper function to raise our APIException
Args:
callback_tuples: a callback_tuple should contain (status, msg, callbacks)
Returns:
Returns a function callback for the Schema
"""
def v(value):
"""
Trys to validate the value with the given callbacks.
Args:
value: the item to validate
Raises:
APIException with the given error code and msg.
Returns:
The value if the validation callbacks are satisfied.
"""
for msg, callbacks in callback_tuples:
for callback in callbacks:
try:
result = callback(value)
if not result and type(result) == bool:
raise Invalid()
except Exception:
raise WebException(msg)
return value
return v
def CNPJ(value):
if not validate_cnpj(value):
raise Invalid("Invalid CNPJ")
return clean_id(value)
def CPF(value):
if not validate_cpf(value):
raise Invalid("Invalid CPF")
return clean_id(value)
def CEP(value):
try:
format_cep(value)
except ValueError as e:
raise Invalid(e)
return clean_id(value)
def Date(value):
fmt = '%d/%m/%Y'
try:
if isinstance(value, int):
value = datetime.date.today() + datetime.timedelta(value)
return value.strftime(fmt)
except Exception:
raise Invalid("Should be an instance of datetime.datetime")