def _deserialize(self, value, attr, data):
"""Deserializes a string into an Arrow object."""
if not self.context.get('convert_dates', True) or not value:
return value
value = super(ArrowField, self)._deserialize(value, attr, data)
timezone = self.get_field_value('timezone')
target = arrow.get(value)
if timezone and text_type(target.to(timezone)) != text_type(target):
raise ValidationError(
"The provided datetime is not in the "
"{} timezone.".format(timezone)
)
return target
python类ValidationError()的实例源码
def invalid_fields(self, data, original_data):
"""Validator that checks if any keys provided aren't in the schema.
Say your schema has support for keys ``a`` and ``b`` and the data
provided has keys ``a``, ``b``, and ``c``. When the data is loaded into
the schema, a :class:`marshmallow.ValidationError` will be raised
informing the developer that excess keys have been provided.
Raises:
marshmallow.ValidationError: Raised if extra keys exist in the
passed in data.
"""
errors = []
for field in original_data:
# Skip nested fields because they will loop infinitely
if isinstance(field, (set, list, tuple, dict)):
continue
if field not in self.fields.keys():
errors.append(field)
if errors:
raise ValidationError("Invalid field", field_names=errors)
def _field2method(field_or_factory, method_name, preprocess=None):
def method(self, name, default=ma.missing, subcast=None, **kwargs):
name = self._prefix + name if self._prefix else name
missing = kwargs.pop('missing', None) or default
if isinstance(field_or_factory, type) and issubclass(field_or_factory, ma.fields.Field):
field = field_or_factory(missing=missing, **kwargs)
else:
field = field_or_factory(subcast=subcast, missing=missing, **kwargs)
self._fields[name] = field
parsed_key, raw_value = _get_from_environ(name, ma.missing)
if raw_value is ma.missing and field.missing is ma.missing:
raise EnvError('Environment variable "{}" not set'.format(parsed_key))
value = raw_value or field.missing
if preprocess:
value = preprocess(value, subcast=subcast, **kwargs)
try:
value = field.deserialize(value)
except ma.ValidationError as err:
raise EnvError('Environment variable "{}" invalid: {}'.format(name, err.args[0]))
else:
self._values[name] = value
return value
method.__name__ = str(method_name) # cast to str for Py2 compat
return method
def filter(self, collection, data, **kwargs):
"""Parse data and apply filter."""
try:
return self.apply(collection, self.parse(data), **kwargs)
except ValidationError:
return collection
def _deserialize(self, value, attr, data):
try:
return bson.ObjectId(value)
except:
raise ma.ValidationError('invalid ObjectId `%s`' % value)
def test_serialization_with_invalid_data(self, model):
field = fields.MACAddressField()
model.mac = 'random string'
with pytest.raises(marshmallow.ValidationError) as e:
field.serialize('mac', model)
assert str(e.value) == "\"random string\" cannot be formatted as MAC."
def test_deserialization_with_invalid_data(self):
field = fields.MACAddressField()
value = 'random string'
with pytest.raises(marshmallow.ValidationError) as e:
field.deserialize(value)
assert str(e.value) == "\"random string\" cannot be formatted as MAC."
def test_deserialization_with_empty_data(self):
field = fields.MACAddressField()
value = None
with pytest.raises(marshmallow.ValidationError) as e:
field.deserialize(value)
assert str(e.value) == "Field may not be null."
def encode(model: structures.Model):
try:
serialization_result = model.Meta.schema().dumps(model)
except AttributeError as e:
raise exceptions.EncodingError(e)
except marshmallow.ValidationError as e:
raise exceptions.EncodingError(e.messages)
else:
if serialization_result.errors:
raise exceptions.EncodingError(serialization_result.errors)
else:
return serialization_result.data
def decode(type, encoded_data: str):
try:
decoded_data = type.Meta.schema().loads(encoded_data)
except (json.decoder.JSONDecodeError, TypeError, AttributeError):
raise exceptions.DecodingError('Error while decoding.', encoded_data=encoded_data)
except marshmallow.ValidationError as e:
raise exceptions.DecodingError(e.messages)
else:
if decoded_data.errors:
raise exceptions.DecodingError(decoded_data.errors)
else:
return type(**decoded_data.data)
def test_uri_dump_malformed():
schema = URISchema(strict=True)
assert_that(
calling(schema.dump).with_args(
dict(
foo="example",
),
),
raises(ValidationError),
)
def test_uri_load_malformed():
schema = URISchema(strict=True)
assert_that(
calling(schema.load).with_args(
dict(
foo="example",
),
),
raises(ValidationError),
)
def test_dump_invalid():
schema = LanguageSchema(strict=True)
assert_that(
calling(schema.dump).with_args(dict(
language="english",
)),
raises(ValidationError),
)
def test_load_invalid():
schema = LanguageSchema(strict=True)
assert_that(
calling(schema.load).with_args(dict(
language="english",
)),
raises(ValidationError),
)
def handle_exception(error):
if isinstance(error, ValidationError) or\
isinstance(error, IncorrectTypeError):
return make_marshmallow_error_output(error, 400)
elif isinstance(error, MongoValidationError):
return make_exception_error_output(error, 400)
elif isinstance(error, DoesNotExist):
return make_not_found_error_output(error)
elif isinstance(error, NotUniqueError):
return make_exception_error_output(error, 400)
else:
return make_exception_error_output(error, 500)
def must_not_be_blank(data):
if not data:
raise ValidationError('Data not provided.')
def _postprocess(self, data):
# vault.envs format should be:
# ENV_NAME secret/path:key
env_map = ObjectDict()
for env in data['env']:
try:
name, path_key = env.strip().split(' ', 1)
path, key = path_key.strip().split(':', 1)
except ValueError:
raise ValidationError('Invalid vault env {}'.format(name), 'env')
env_map[name] = (path, key)
data['env'] = env_map
return super()._postprocess(data)
def parse(cls, conf):
schema = SpecificationSchema()
try:
parsed = schema.load(conf)
except ValidationError as e:
logger.exception('badwolf specification validation error')
raise InvalidSpecification(str(e))
data = parsed.data
spec = cls()
for key, value in data.items():
setattr(spec, key, value)
return spec
def parse_secretfile(self, path):
if hasattr(path, 'read') and callable(path.read):
envs = parse_secretfile(path)
else:
with open(path) as fd:
envs = parse_secretfile(fd)
env_map = {}
for env in envs:
try:
name, path_key = env.strip().split(' ', 1)
path, key = path_key.strip().split(':', 1)
except ValueError:
raise ValidationError('Invalid Secretfile env {}'.format(name))
env_map[name] = (path, key)
self.vault.env.update(env_map)
def validate_name(self, value):
if not 0 < len(value) < 30:
raise ValidationError('name?????')