def test_isfile():
    """Check that cv.isfile rejects bad values and accepts a (faked) file."""
    validate = vol.Schema(cv.isfile)

    missing_path = 'this-file-does-not.exist'
    # Sanity check: the path used as a negative example really is absent.
    assert not os.path.isfile(missing_path)

    rejected = ('invalid', None, -1, 0, 80000, missing_path)
    for candidate in rejected:
        with pytest.raises(vol.Invalid):
            validate(candidate)

    # Stub out os.path.isfile and os.access so both report success; the
    # validator is then expected to accept the path without raising.
    isfile_stub = patch('os.path.isfile', Mock(return_value=True))
    access_stub = patch('os.access', Mock(return_value=True))
    with isfile_stub, access_stub:
        validate('test.txt')
# Example source code for the Python class Schema()
def test_entity_ids():
    """Exercise cv.entity_ids with malformed and well-formed inputs."""
    validate = vol.Schema(cv.entity_ids)

    malformed = (
        'invalid_entity',
        'sensor.light,sensor_invalid',
        ['invalid_entity'],
        ['sensor.light', 'sensor_invalid'],
        ['sensor.light,sensor_invalid'],
    )
    for candidate in malformed:
        with pytest.raises(vol.MultipleInvalid):
            validate(candidate)

    # These inputs only need to pass validation without raising.
    well_formed = ([], ['sensor.light'], 'sensor.light')
    for candidate in well_formed:
        validate(candidate)

    # A comma-separated string is expected to come back as a list of
    # lower-cased, whitespace-stripped IDs.
    expected = ['sensor.light', 'light.kitchen']
    assert validate('sensor.LIGHT, light.kitchen ') == expected
def test_time_period():
    """Exercise cv.time_period with rejected, accepted and converted inputs."""
    validate = vol.Schema(cv.time_period)

    rejected = (
        None, '', 'hello:world', '12:', '12:34:56:78',
        {}, {'wrong_key': -10},
    )
    for candidate in rejected:
        with pytest.raises(vol.MultipleInvalid):
            validate(candidate)

    # These inputs only need to pass validation without raising.
    accepted = (
        '8:20', '23:59', '-8:20', '-23:59:59', '-48:00',
        {'minutes': 5}, 1, '5',
    )
    for candidate in accepted:
        validate(candidate)

    # (expected timedelta, raw input) pairs checked for exact conversion.
    conversions = (
        (timedelta(seconds=180), '180'),
        (timedelta(hours=23, minutes=59), '23:59'),
        (-1 * timedelta(hours=1, minutes=15), '-1:15'),
    )
    for expected, raw in conversions:
        assert expected == validate(raw)
def test_socket_timeout():
    """Exercise cv.socket_timeout through a single-key dict schema."""
    key = 'timeout'
    validate = vol.Schema({
        vol.Required(key, default=None): cv.socket_timeout,
    })

    # Zero and negative timeouts must be rejected.
    for bad_timeout in (0.0, -1):
        with pytest.raises(vol.Invalid):
            validate({key: bad_timeout})

    # None is expected to be replaced by _GLOBAL_DEFAULT_TIMEOUT, and an
    # integer is expected to compare equal to its float value.
    assert _GLOBAL_DEFAULT_TIMEOUT == validate({key: None})[key]
    assert 1.0 == validate({key: 1})[key]
def _build_attributes_validator(mcs):
    """
    Build the schema used to validate the sub-classes' attributes.

    The schema always contains a required ``commands`` list. One optional
    list entry is added for every item of ``mcs._class_clients``, combining
    a required ``name`` with whatever that client's ``validator()`` returns.

    :param mcs: object exposing a ``_class_clients`` mapping of attribute
        name to client class
    :returns: a ``Schema`` built with ``extra=ALLOW_EXTRA``
    """
    command_entry = {
        Required("name"): str,
        Required("cmd"): [str],
        Optional("kill-signal", default=signal.SIGINT): int
    }
    attributes = {
        Required("commands", 'required class attribute'): [command_entry],
    }

    for client_attr, client_cls in mcs._class_clients.items():
        entry = {Required("name"): str}
        entry.update(client_cls.validator())
        attributes[Optional(client_attr, 'required class attribute')] = [entry]

    return Schema(attributes, extra=ALLOW_EXTRA)
def valid_adapter_response(base_name, func_name, data):
    """
    Validate that the given data matches the registered schema.

    Falsy data is skipped without any validation.

    :param str base_name: The name of the abstract (first key into ``Schemas``)
    :param str func_name: The name of the called function (second key)
    :param data: The raw data to validate
    :raises SchemaNotFound: if a KeyError, TypeError or ValueError is raised
        while looking up or calling the schema
    :raises InvalidFormatError: if data is not compliant
    """
    if data:
        try:
            Schemas[base_name][func_name](data)
        except (KeyError, TypeError, ValueError):
            target = (base_name, func_name)
            raise SchemaNotFound('Schema not found for %s.%s' % target)
        except (Invalid, MultipleInvalid) as err:
            details = (base_name, func_name, str(err))
            raise InvalidFormatError('Given data is not compliant to %s.%s schema: %s' % details)
def validate_body(schema_desc):
    """
    Decorator factory that validates the JSON request body.

    The schema is built from ``schema_desc`` (with ``required=True``) the
    first time the wrapped function runs and is stored in ``Schemas`` under
    the wrapped function's name.

    :param schema_desc: schema description handed to ``Schema``
    :raises BadRequest: raised by the wrapper when the body fails validation
    """
    def real_decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            schema_key = func.__name__
            try:
                payload = request.get_json()
                # Register the schema lazily, once per function name.
                if not Schemas.get(schema_key):
                    Schemas[schema_key] = Schema(schema_desc, required=True)
                    Logger.debug(unicode('registering schema for %s' % (schema_key)))
                Schemas[schema_key](payload)
            except (Invalid, MultipleInvalid) as err:
                Logger.error(unicode(err))
                raise BadRequest(
                    'Missing or invalid field(s) in body, expecting {}'.format(schema_desc))
            else:
                return func(*args, **kwargs)
        return wrapper
    return real_decorator
def getSchema(self, data):
    """Return the top-level configuration schema.

    Only ``servers`` is required. Nested sections reuse the sub-schemas
    stored on the instance. ``data`` is accepted but not used here.
    """
    layout = {
        v.Required('servers'): self.servers,
        'palettes': self.palettes,
        'palette': str,
        'keymaps': self.keymaps,
        'keymap': str,
        'commentlinks': self.commentlinks,
        'dashboards': self.dashboards,
        'reviewkeys': self.reviewkeys,
        'story-list-query': str,
        'diff-view': str,
        'hide-comments': self.hide_comments,
        'display-times-in-utc': bool,
        'handle-mouse': bool,
        'breadcrumbs': bool,
        'story-list-options': self.story_list_options,
        'expire-age': str,
    }
    return v.Schema(layout)
def __init__(self):
    """Create the Redis client and the schema for registry entries."""
    self.redisinstance = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=0)

    def text():
        # A fresh validator accepting either ``str`` or ``unicode``.
        return Any(str, unicode)

    filter_regex = {
        Required(RegistryKeys.PATTERN): text(),
        Required(RegistryKeys.REPLACE): text()
    }
    endpoint = {
        Required(RegistryKeys.URI): text(),
        Required(RegistryKeys.ACCEPTANCE_REGEX): text(),
        # The filter section itself is optional (no Required marker).
        RegistryKeys.FILTER_REGEX: filter_regex
    }
    self.validateschema = Schema({
        Required(RegistryKeys.SERVICE_ID): text(),
        Required(RegistryKeys.SERVICE_NAME): text(),
        Required(RegistryKeys.SERVICE_PORT): int,
        Required(RegistryKeys.NAMESPACE, default='default'): text(),
        Required(RegistryKeys.ENDPOINTS): [endpoint]
    })
def validate(self):
    """Run ``self._data`` through the voucher schema.

    The result of the schema call is discarded, so this returns ``None``.
    Any exception raised by the schema propagates to the caller.
    """
    supplier_fields = {
        Required('ruc'): All(str, Length(min=11, max=11)),
        Required('registration_name'): str,
        Optional('address'): dict,
        Optional('commercial_name'): str
    }
    customer_fields = {
        Required('ruc'): All(str, Length(min=1, max=11))
    }
    voucher_fields = {
        Required('issue_date'): str,
        Required('supplier'): Schema(supplier_fields, extra=ALLOW_EXTRA),
        Required('customer'): Schema(customer_fields, extra=ALLOW_EXTRA),
        Required('voucher_type'): str,
        Required('currency'): str,
        Required('voucher_number'): str,
        # At least one line is required.
        Required('lines'): All(list, Length(min=1))
    }
    Schema(voucher_fields, extra=ALLOW_EXTRA)(self._data)
def __init__(self, type, name, required, options=None):
    """Initialise the attribute and validate its name and ``fill`` option.

    :param type: accepted but not used in this method
    :param name: attribute name; must be at most 63 characters, not listed
        in ``INVALID_NAMES`` and must match ``VALID_CHARS``
    :param required: whether the attribute is required
    :param options: optional mapping; only its ``"fill"`` entry is read
    :raises InvalidResourceAttributeName: if ``name`` is not acceptable
    :raises InvalidResourceAttributeOption: if ``fill`` is missing while
        ``required`` is true, or if ``fill`` fails schema validation
    """
    name_is_valid = (len(name) <= 63 and name not in INVALID_NAMES
                     and VALID_CHARS.match(name))
    if not name_is_valid:
        raise InvalidResourceAttributeName(name)

    self.name = name
    self.required = required
    self.fill = None

    # options is set only when we update a resource type
    if options is None:
        return

    fill = options.get("fill")
    if fill is not None:
        # Ensure fill has the correct attribute type
        try:
            self.fill = voluptuous.Schema(self.schema_ext)(fill)
        except voluptuous.Error as e:
            raise InvalidResourceAttributeOption(name, "fill", e)
    elif required:
        raise InvalidResourceAttributeOption(
            name, "fill", "must not be empty if required=True")
def __init__(self, *args, **kwargs):
    """Build the create and update schemas from the extensions' meta schemas.

    ``self._schema`` uses each plugin's ``meta_schema()``.
    ``self._schema_for_update`` uses ``meta_schema(for_update=True)``.
    """
    super(ResourceTypeSchemaManager, self).__init__(*args, **kwargs)

    def build(type_schemas):
        # Both schemas share this layout and differ only in the
        # alternatives allowed for an attribute definition.
        return voluptuous.Schema({
            "name": six.text_type,
            voluptuous.Required("attributes", default={}): {
                six.text_type: voluptuous.Any(*type_schemas)
            }
        })

    self._schema = build(
        [ext.plugin.meta_schema() for ext in self.extensions])
    self._schema_for_update = build(
        [ext.plugin.meta_schema(for_update=True)
         for ext in self.extensions])
def post(self):
    """Create an archive policy rule from the request body.

    The policy is enforced once with an empty target and again with the
    validated body. On success the Location header is set, the response
    status becomes 201 and the created rule is returned. If a rule already
    exists, ``abort`` is called with 409.
    """
    enforce("create archive policy rule", {})

    fields = ("name", "metric_pattern", "archive_policy_name")
    rule_schema = voluptuous.Schema({
        voluptuous.Required(field): six.text_type for field in fields
    })
    body = deserialize_and_validate(rule_schema)
    enforce("create archive policy rule", body)

    try:
        # Positional order matches ``fields``.
        rule = pecan.request.indexer.create_archive_policy_rule(
            *[body[field] for field in fields])
    except indexer.ArchivePolicyRuleAlreadyExists as e:
        abort(409, six.text_type(e))

    set_resp_location_hdr("/archive_policy_rule/" + rule.name)
    pecan.response.status = 201
    return rule
def load_tcconfig(self, config_file_path):
    """Load a JSON config file, validate it and keep it on the instance.

    The parsed content is stored before validation runs, so it stays
    assigned even if the schema raises. The content is then logged at
    debug level.

    :param config_file_path: path of the JSON file to read
    """
    import json
    from voluptuous import Schema, Required, Any, ALLOW_EXTRA

    # Build the nested layout from the innermost level outwards.
    leaf_value = Any(six.text_type, int, float)
    inner_table = {six.text_type: {six.text_type: leaf_value}}
    direction_table = {Any(*TrafficDirection.LIST): inner_table}
    validate = Schema(
        {Required(six.text_type): direction_table}, extra=ALLOW_EXTRA)

    with open(config_file_path) as fp:
        self.__config_table = json.load(fp)

    validate(self.__config_table)

    dumped = json.dumps(self.__config_table, indent=4)
    self.__logger.debug("tc config file: {:s}".format(dumped))
def get(self):
    """Return the posts of the requested subvoat.

    Reads ``subvoat_name`` from the request arguments. Returns
    ``{'error': ...}`` when the name fails validation, otherwise
    ``{'result': [...]}`` with the posts from ``subvoat_utils.get_posts``.
    """
    parser = reqparse.RequestParser()
    parser.add_argument('subvoat_name')
    args = parser.parse_args()

    min_length = self.config['min_length_subvoat_name']
    name_schema = Schema({
        Required('subvoat_name'): All(str, Length(min=min_length))
    })
    try:
        name_schema({'subvoat_name': args.get('subvoat_name')})
    except MultipleInvalid as e:
        return {'error': '%s %s' % (e.msg, e.path)}

    posts = self.subvoat_utils.get_posts(args['subvoat_name'])
    return {'result': list(posts)}
def page(self, start, end):
    """Validate a start/end page pair.

    Both values go through ``self.try_schema`` (start first). The first
    failing result is returned as ``[status, result]``. After that the
    range must be ascending and span at most 50 pages.

    :returns: ``[True, None]`` on success, otherwise a two-item list whose
        second item describes the failure
    """
    s_schema = Schema({Required('start'): All(int, Range(min=0))})
    e_schema = Schema({Required('end'): All(int, Range(min=0))})

    checks = (('start', start, s_schema), ('end', end, e_schema))
    for label, value, schema in checks:
        status, result = self.try_schema(label, value, schema)
        if not status:
            return [status, result]

    if end < start:
        return [False, 'ending page cannot be lower than starting page']
    if end - start > 50:
        return [False, 'you cannot request more than 50 pages']
    return [True, None]
def __init__(self, server):
    """Store the server references, build the channel-edit schemas, register.

    The text and voice schemas both extend the shared base schema.
    ``self.register()`` is called last.
    """
    self.server = server
    self.guild_man = server.guild_man

    base = Schema({
        'name': All(str, Length(min=2, max=100)),
        'position': int,
        Optional('nsfw'): bool,
    }, required=True)
    self.channel_edit_base = base

    text_fields = {
        'topic': All(str, Length(min=0, max=1024))
    }
    self.textchan_editschema = base.extend(text_fields)

    voice_fields = {
        'bitrate': All(int, Range(min=8000, max=96000)),
        'user_limit': All(int, Range(min=0, max=99)),
    }
    self.voicechan_editschema = base.extend(voice_fields)

    self.register()
def test_isfile():
    """Check that cv.isfile rejects bad values and accepts a (faked) file."""
    validate = vol.Schema(cv.isfile)

    missing_path = 'this-file-does-not.exist'
    # Sanity check: the path used as a negative example really is absent.
    assert not os.path.isfile(missing_path)

    rejected = ('invalid', None, -1, 0, 80000, missing_path)
    for candidate in rejected:
        with pytest.raises(vol.Invalid):
            validate(candidate)

    # Stub out os.path.isfile and os.access so both report success; the
    # validator is then expected to accept the path without raising.
    isfile_stub = patch('os.path.isfile', Mock(return_value=True))
    access_stub = patch('os.access', Mock(return_value=True))
    with isfile_stub, access_stub:
        validate('test.txt')
def test_entity_ids():
    """Exercise cv.entity_ids with malformed and well-formed inputs."""
    validate = vol.Schema(cv.entity_ids)

    malformed = (
        'invalid_entity',
        'sensor.light,sensor_invalid',
        ['invalid_entity'],
        ['sensor.light', 'sensor_invalid'],
        ['sensor.light,sensor_invalid'],
    )
    for candidate in malformed:
        with pytest.raises(vol.MultipleInvalid):
            validate(candidate)

    # These inputs only need to pass validation without raising.
    well_formed = ([], ['sensor.light'], 'sensor.light')
    for candidate in well_formed:
        validate(candidate)

    # A comma-separated string is expected to come back as a list of
    # lower-cased, whitespace-stripped IDs.
    expected = ['sensor.light', 'light.kitchen']
    assert validate('sensor.LIGHT, light.kitchen ') == expected
def test_time_period():
    """Exercise cv.time_period with rejected, accepted and converted inputs."""
    validate = vol.Schema(cv.time_period)

    rejected = (
        None, '', 'hello:world', '12:', '12:34:56:78',
        {}, {'wrong_key': -10},
    )
    for candidate in rejected:
        with pytest.raises(vol.MultipleInvalid):
            validate(candidate)

    # These inputs only need to pass validation without raising.
    accepted = (
        '8:20', '23:59', '-8:20', '-23:59:59', '-48:00',
        {'minutes': 5}, 1, '5',
    )
    for candidate in accepted:
        validate(candidate)

    # (expected timedelta, raw input) pairs checked for exact conversion.
    conversions = (
        (timedelta(seconds=180), '180'),
        (timedelta(hours=23, minutes=59), '23:59'),
        (-1 * timedelta(hours=1, minutes=15), '-1:15'),
    )
    for expected, raw in conversions:
        assert expected == validate(raw)
def test_socket_timeout():
    """Exercise cv.socket_timeout through a single-key dict schema."""
    key = 'timeout'
    validate = vol.Schema({
        vol.Required(key, default=None): cv.socket_timeout,
    })

    # Zero and negative timeouts must be rejected.
    for bad_timeout in (0.0, -1):
        with pytest.raises(vol.Invalid):
            validate({key: bad_timeout})

    # None is expected to be replaced by _GLOBAL_DEFAULT_TIMEOUT, and an
    # integer is expected to compare equal to its float value.
    assert _GLOBAL_DEFAULT_TIMEOUT == validate({key: None})[key]
    assert 1.0 == validate({key: 1})[key]
def test_boolean():
    """Exercise cv.boolean with rejected, truthy and falsy inputs."""
    validate = vol.Schema(cv.boolean)

    for rejected in ('T', 'negative', 'lock'):
        with pytest.raises(vol.MultipleInvalid):
            validate(rejected)

    truthy_inputs = ('true', 'On', '1', 'YES', 'enable', 1, True)
    for raw in truthy_inputs:
        assert validate(raw)

    falsy_inputs = ('false', 'Off', '0', 'NO', 'disable', 0, False)
    for raw in falsy_inputs:
        assert not validate(raw)
def test_latitude():
    """Exercise cv.latitude with out-of-range, malformed and valid inputs."""
    validate = vol.Schema(cv.latitude)

    rejected = ('invalid', None, -91, 91, '-91', '91', '123.01A')
    for candidate in rejected:
        with pytest.raises(vol.MultipleInvalid):
            validate(candidate)

    # These inputs only need to pass validation without raising.
    accepted = ('-89', 89, '12.34')
    for candidate in accepted:
        validate(candidate)
def test_longitude():
    """Exercise cv.longitude with out-of-range, malformed and valid inputs."""
    validate = vol.Schema(cv.longitude)

    rejected = ('invalid', None, -181, 181, '-181', '181', '123.01A')
    for candidate in rejected:
        with pytest.raises(vol.MultipleInvalid):
            validate(candidate)

    # These inputs only need to pass validation without raising.
    accepted = ('-179', 179, '12.34')
    for candidate in accepted:
        validate(candidate)
def test_port():
    """Exercise cv.port with rejected and accepted TCP/UDP port values."""
    validate = vol.Schema(cv.port)

    rejected = ('invalid', None, -1, 0, 80000, '81000')
    for candidate in rejected:
        with pytest.raises(vol.MultipleInvalid):
            validate(candidate)

    # These inputs only need to pass validation without raising.
    accepted = ('1000', 21, 24574)
    for candidate in accepted:
        validate(candidate)
def test_entity_id():
    """Exercise cv.entity_id: reject a bad ID, expect a lower-cased result."""
    validate = vol.Schema(cv.entity_id)

    with pytest.raises(vol.MultipleInvalid):
        validate('invalid_entity')

    normalized = validate('sensor.LIGHT')
    assert 'sensor.light' == normalized
def test_icon():
    """Exercise cv.icon with rejected values and one accepted icon name."""
    validate = vol.Schema(cv.icon)

    rejected = (False, 'work', 'icon:work')
    for candidate in rejected:
        with pytest.raises(vol.MultipleInvalid):
            validate(candidate)

    # Must pass validation without raising.
    validate('mdi:work')
def test_service():
    """Exercise cv.service with one rejected and one accepted service name."""
    validate = vol.Schema(cv.service)

    with pytest.raises(vol.MultipleInvalid):
        validate('invalid_turn_on')

    # Must pass validation without raising.
    validate('scarlett_os.turn_on')
def test_string():
    """Exercise cv.string: None is rejected, other scalars are accepted."""
    validate = vol.Schema(cv.string)

    with pytest.raises(vol.MultipleInvalid):
        validate(None)

    # These inputs only need to pass validation without raising.
    accepted = (True, 1, 'hello')
    for candidate in accepted:
        validate(candidate)
def test_temperature_unit():
    """Exercise cv.temperature_unit: 'K' is rejected, 'C' and 'F' pass."""
    validate = vol.Schema(cv.temperature_unit)

    with pytest.raises(vol.MultipleInvalid):
        validate('K')

    # These inputs only need to pass validation without raising.
    for unit in ('C', 'F'):
        validate(unit)