def test_raise_invalid_if_provisioner_schema_is_not_satisfied(self, mock_Provisioner):
    """Check that a provisioner entry violating its own schema is rejected.

    Three mock provisioners are registered on the patched ``Provisioner``.
    The second ``provisioning`` entry carries an ``a`` value that exceeds
    the allowed length, so validation must raise ``Invalid`` and the error
    must point at that entry.
    """
    mock_Provisioner.provisioners = {
        'mp1': MockProvisioner1,
        'mp2': MockProvisioner2,
        'mp3': MockProvisioner3}
    schema = get_schema()
    with pytest.raises(Invalid) as e:
        schema({
            'name': 'dummy-test',
            'provisioning': [{
                'type': 'mp1',
                'a': 'dummy',
                'b': '16'
            }, {
                'type': 'mp2',
                'a': 'dummydummy',  # Exceeds Length(min=5, max=5)
            }, {
                'type': 'mp3',
                'b': 'yes'
            }]
        })
    # Inspect the raised exception itself: str() of pytest's ExceptionInfo
    # wrapper is not guaranteed to contain the exception message.
    assert "['provisioning'][1]['a']" in str(e.value)
# python类Length()的实例源码 (source-code examples using the Python class Length())
def validate(self):
    """Validate ``self._data`` against the voucher schema.

    The schema is called directly, so anything it raises for invalid data
    propagates to the caller. Keys not listed in the schemas are allowed
    at every level (``extra=ALLOW_EXTRA``).
    """
    supplier_schema = Schema(
        {
            Required('ruc'): All(str, Length(min=11, max=11)),
            Required('registration_name'): str,
            Optional('address'): dict,
            Optional('commercial_name'): str,
        },
        extra=ALLOW_EXTRA,
    )
    customer_schema = Schema(
        {Required('ruc'): All(str, Length(min=1, max=11))},
        extra=ALLOW_EXTRA,
    )
    voucher_schema = Schema(
        {
            Required('issue_date'): str,
            Required('supplier'): supplier_schema,
            Required('customer'): customer_schema,
            Required('voucher_type'): str,
            Required('currency'): str,
            Required('voucher_number'): str,
            # At least one line is required.
            Required('lines'): All(list, Length(min=1)),
        },
        extra=ALLOW_EXTRA,
    )
    voucher_schema(self._data)
def get(self):
    """Return the posts of the subvoat named in the request arguments.

    Returns ``{'result': [...]}`` with the posts on success, or
    ``{'error': ...}`` when the ``subvoat_name`` argument fails validation.
    """
    request_parser = reqparse.RequestParser()
    request_parser.add_argument('subvoat_name')
    request_args = request_parser.parse_args()

    min_length = self.config['min_length_subvoat_name']
    name_schema = Schema(
        {Required('subvoat_name'): All(str, Length(min=min_length))})

    try:
        name_schema({'subvoat_name': request_args.get('subvoat_name')})
    except MultipleInvalid as exc:
        return {'error': '%s %s' % (exc.msg, exc.path)}

    found_posts = self.subvoat_utils.get_posts(request_args['subvoat_name'])
    return {'result': list(found_posts)}
def __init__(self, server):
    """Keep server references, build the channel edit schemas, register.

    ``channel_edit_base`` holds the fields shared by all channel edits;
    the text and voice schemas extend it with their own fields.
    """
    self.server = server
    self.guild_man = server.guild_man

    common_fields = {
        'name': All(str, Length(min=2, max=100)),
        'position': int,
        Optional('nsfw'): bool,
    }
    base_schema = Schema(common_fields, required=True)
    self.channel_edit_base = base_schema

    text_only_fields = {'topic': All(str, Length(min=0, max=1024))}
    self.textchan_editschema = base_schema.extend(text_only_fields)

    voice_only_fields = {
        'bitrate': All(int, Range(min=8000, max=96000)),
        'user_limit': All(int, Range(min=0, max=99)),
    }
    self.voicechan_editschema = base_schema.extend(voice_only_fields)

    self.register()
def schema_ext(self):
    """Build a validator for text bounded by this object's length limits.

    The value must be ``six.text_type`` and its length must lie between
    ``self.min_length`` and ``self.max_length``.
    """
    length_bounds = voluptuous.Length(min=self.min_length,
                                      max=self.max_length)
    return voluptuous.All(six.text_type, length_bounds)
def MetricSchema(v):
    """Validate a ``metric`` operation.

    Accepted forms::

        ["metric", "metric-ref", "aggregation"]

    or::

        ["metric", ["metric-ref", "aggregation"], ["metric-ref", "aggregation"]]

    Returns ``[u"metric"]`` concatenated with the validated arguments.
    Raises ``voluptuous.Invalid`` when ``v`` is not a list/tuple, is empty,
    has no argument, or does not start with ``u"metric"``.
    """
    if not isinstance(v, (list, tuple)):
        raise voluptuous.Invalid("Expected a tuple/list, got a %s" % type(v))
    if not v:
        raise voluptuous.Invalid("Operation must not be empty")
    if len(v) < 2:
        raise voluptuous.Invalid("Operation need at least one argument")
    if v[0] != u"metric":
        # NOTE(sileht): this error message doesn't look related to "metric",
        # but because this is the last schema validated by voluptuous, there
        # is a good chance (voluptuous.Any is not predictable) that this
        # message is printed even when it is another operation that is
        # invalid.
        raise voluptuous.Invalid("'%s' operation invalid" % v[0])

    # Either a single [metric-ref, aggregation] pair ...
    single_reference = voluptuous.ExactSequence(
        [six.text_type, six.text_type])
    # ... or a non-empty list of such pairs.
    several_references = voluptuous.All(
        voluptuous.Length(min=1),
        [voluptuous.ExactSequence([six.text_type, six.text_type])],
    )
    arguments_schema = voluptuous.Schema(
        voluptuous.Any(single_reference, several_references),
        required=True)
    return [u"metric"] + arguments_schema(v[1:])
def patch(self):
    """Update the definition of the archive policy ``self.archive_policy``.

    Aborts with 404 when the indexer returns no such policy, and with 400
    when building the policy items raises ``ValueError`` or the indexer
    reports an unsupported change. Otherwise returns the result of the
    indexer update.
    """
    current_policy = pecan.request.indexer.get_archive_policy(
        self.archive_policy)
    if not current_policy:
        not_found = indexer.NoSuchArchivePolicy(self.archive_policy)
        abort(404, six.text_type(not_found))

    enforce("update archive policy", current_policy)

    item_schema = {
        "granularity": Timespan,
        "points": PositiveNotNullInt,
        "timespan": Timespan,
    }
    body_schema = voluptuous.Schema({
        # The definition must hold at least one item.
        voluptuous.Required("definition"):
            voluptuous.All([item_schema], voluptuous.Length(min=1)),
    })
    body = deserialize_and_validate(body_schema)

    # Validate the data
    try:
        ap_items = []
        for raw_item in body['definition']:
            ap_items.append(archive_policy.ArchivePolicyItem(**raw_item))
    except ValueError as err:
        abort(400, six.text_type(err))

    try:
        return pecan.request.indexer.update_archive_policy(
            self.archive_policy, ap_items)
    except indexer.UnsupportedArchivePolicyChange as err:
        abort(400, six.text_type(err))
def post(self):
    """Create an archive policy from the request body.

    The body is validated, turned into an ``ArchivePolicy`` and stored
    through the indexer. Aborts with 400 when building the policy raises
    ``ValueError`` and with 409 when the policy already exists. On success
    the response location header is set, the status is 201 and the created
    policy is returned.
    """
    enforce("create archive policy", {})

    # NOTE(jd): Initialize this one at run-time because we rely on conf
    conf = pecan.request.conf
    allowed_methods = (
        archive_policy.ArchivePolicy.VALID_AGGREGATION_METHODS_VALUES
    )
    definition_item = {
        "granularity": Timespan,
        "points": PositiveNotNullInt,
        "timespan": Timespan,
    }
    policy_schema = voluptuous.Schema({
        voluptuous.Required("name"): six.text_type,
        voluptuous.Required("back_window", default=0): PositiveOrNullInt,
        voluptuous.Required(
            "aggregation_methods",
            default=set(conf.archive_policy.default_aggregation_methods)):
            voluptuous.All(list(allowed_methods), voluptuous.Coerce(set)),
        voluptuous.Required("definition"):
            voluptuous.All([definition_item], voluptuous.Length(min=1)),
    })
    body = deserialize_and_validate(policy_schema)

    # Validate the data
    try:
        new_policy = archive_policy.ArchivePolicy.from_dict(body)
    except ValueError as err:
        abort(400, six.text_type(err))

    # Second policy check, this time against the actual policy object.
    enforce("create archive policy", new_policy)

    try:
        new_policy = pecan.request.indexer.create_archive_policy(new_policy)
    except indexer.ArchivePolicyAlreadyExists as err:
        abort(409, six.text_type(err))

    set_resp_location_hdr("/archive_policy/" + new_policy.name)
    pecan.response.status = 201
    return new_policy
def subvoat_name(self, subvoat_name):
    """Validate a subvoat name against the configured minimum length.

    Returns whatever ``self.try_schema`` returns for this check.
    """
    min_length = self.config['min_length_subvoat_name']
    name_rule = All(str, Length(min=min_length))
    return self.try_schema(
        'subvoat_name',
        subvoat_name,
        Schema({Required('subvoat_name'): name_rule}),
    )
def uuid(self, uuid):
    """Validate that ``uuid`` is a string of exactly 36 characters.

    Returns whatever ``self.try_schema`` returns for this check.
    """
    uuid_rule = All(str, Length(min=36, max=36))
    return self.try_schema('uuid', uuid, Schema({Required('uuid'): uuid_rule}))
def comment_body(self, comment_body):
    """Validate a comment body against the configured length bounds.

    Returns whatever ``self.try_schema`` returns for this check.
    """
    length_bounds = Length(
        min=self.config['min_length_comment_body'],
        max=self.config['max_length_comment_body'],
    )
    body_schema = Schema({Required('comment_body'): All(str, length_bounds)})
    return self.try_schema('comment_body', comment_body, body_schema)
def thread(self, subvoat_name, title, body):
    """Validate the subvoat name, title and body of a new thread.

    The checks run in that order and stop at the first failure.

    Returns ``[True, None]`` when every check passes, otherwise
    ``[status, result]`` as reported by the first failing check.
    """
    # Both schemas are built up front, before any validation runs.
    title_schema = Schema({
        Required('title'): All(str, Length(
            min=self.config['min_length_thread_title'],
            max=self.config['max_length_thread_title'])),
    })
    body_schema = Schema({
        Required('body'): All(str, Length(
            min=self.config['min_length_thread_body'],
            max=self.config['max_length_thread_body'])),
    })

    # The subvoat name goes through its dedicated validator first.
    status, result = self.subvoat_name(subvoat_name)
    if not status:
        return [status, result]

    # Then the thread title, and finally the thread body.
    remaining_checks = (
        ('title', title, title_schema),
        ('body', body, body_schema),
    )
    for field, value, field_schema in remaining_checks:
        status, result = self.try_schema(field, value, field_schema)
        if not status:
            return [status, result]

    # Everything validated.
    return [True, None]
def password(self, password):
    """Validate a password against the configured length bounds.

    Returns whatever ``self.try_schema`` returns for this check.
    """
    length_bounds = Length(
        min=self.config['min_length_password'],
        max=self.config['max_length_password'],
    )
    password_schema = Schema({Required('password'): All(str, length_bounds)})
    return self.try_schema('password', password, password_schema)
def api_token(self, api_token):
    """Validate that ``api_token`` is a string of exactly 36 characters.

    Returns whatever ``self.try_schema`` returns for this check.
    """
    # FIX: make this actually check the token type
    token_rule = All(str, Length(min=36, max=36))
    token_schema = Schema({Required('api_token'): token_rule})
    return self.try_schema('api_token', api_token, token_schema)
def __init__(self, server):
    """Keep server references, build guild/channel schemas, register.

    Three schemas are created: one for editing a guild, one for creating
    a guild and one for creating a channel. All of them drop keys they
    do not know about (``extra=REMOVE_EXTRA``).
    """
    self.server = server
    self.guild_man = server.guild_man

    guild_edit_fields = {
        Optional('name'): str,
        Optional('region'): str,
        Optional('verification_level'): int,
        Optional('default_message_notifications'): int,
        Optional('afk_channel_id'): str,
        Optional('afk_timeout'): int,
        Optional('icon'): str,
        Optional('owner_id'): str,
    }
    self.guild_edit_schema = Schema(
        guild_edit_fields, required=True, extra=REMOVE_EXTRA)

    guild_create_fields = {
        'name': str,
        'region': str,
        'icon': Any(None, str),
        'verification_level': int,
        'default_message_notifications': int,
    }
    self.guild_create_schema = Schema(
        guild_create_fields, extra=REMOVE_EXTRA)

    channel_create_fields = {
        'name': All(str, Length(min=2, max=100)),
        Optional('type'): int,
        Optional('bitrate'): int,
        Optional('user_limit'): int,
        Optional('permission_overwrites'): list,
    }
    self.channel_create_schema = Schema(
        channel_create_fields, required=True, extra=REMOVE_EXTRA)

    self.register()
def valid_subscribe_topic(value, invalid_chars='\0'):
    """Validate that we can subscribe using this MQTT topic.

    The value must be a ``str`` containing none of ``invalid_chars``; it is
    then passed through a length check of 1 to 65535 and that result is
    returned. Raises ``vol.Invalid`` otherwise.
    """
    not_text = not isinstance(value, str)
    if not_text or any(char in value for char in invalid_chars):
        raise vol.Invalid('Invalid MQTT topic name')
    length_check = vol.Length(min=1, max=65535)
    return length_check(value)
def MetricSchema(definition):
    """Validate a metric definition and check the creation policy.

    Steps: basic schema validation, rejection of names containing ``/``,
    lookup of an archive policy by metric name when none was given,
    unpacking of the resource id, and the "create metric" policy check.

    Returns the validated definition, with ``archive_policy_name`` filled
    in when it was resolved from the metric name. Invalid input is
    reported through ``abort(400, ...)``.
    """
    current_user = pecan.request.auth_helper.get_current_user(pecan.request)

    # First basic validation
    basic_schema = voluptuous.Schema({
        "archive_policy_name": six.text_type,
        "resource_id": functools.partial(ResourceID, creator=current_user),
        "name": six.text_type,
        voluptuous.Optional("unit"):
            voluptuous.All(six.text_type, voluptuous.Length(max=31)),
    })
    definition = basic_schema(definition)

    archive_policy_name = definition.get('archive_policy_name')
    name = definition.get('name')

    if name and '/' in name:
        abort(400, "'/' is not supported in metric name")

    if archive_policy_name is None:
        try:
            matched_policy = (
                pecan.request.indexer.get_archive_policy_for_metric(name))
        except indexer.NoArchivePolicyRuleMatch:
            # NOTE(jd) Since this is a schema-like function, we
            # should/could raise ValueError, but if we do so, voluptuous
            # just returns an "invalid value" with no useful message, so
            # we prefer to use abort() to make sure the user gets the
            # right error message
            abort(400, "No archive policy name specified "
                       "and no archive policy rule found matching "
                       "the metric name %s" % name)
        else:
            definition['archive_policy_name'] = matched_policy.name

    # The validated resource id unpacks into (original id, id).
    validated_resource_id = definition.get('resource_id')
    original_resource_id = None
    resource_id = None
    if validated_resource_id is not None:
        if name is None:
            abort(400,
                  {"cause": "Attribute value error",
                   "detail": "name",
                   "reason": "Name cannot be null "
                             "if resource_id is not null"})
        original_resource_id, resource_id = validated_resource_id

    # NOTE(review): the policy check receives the archive policy name as
    # supplied by the caller (possibly None), not the rule-derived one
    # stored in ``definition`` above - confirm this is intended.
    enforce("create metric", {
        "creator": current_user,
        "archive_policy_name": archive_policy_name,
        "resource_id": resource_id,
        "original_resource_id": original_resource_id,
        "name": name,
        "unit": definition.get('unit'),
    })

    return definition