def test_socket_timeout():
    """Exercise ``cv.socket_timeout`` through a one-key schema.

    Expectations pinned here: ``0.0`` and ``-1`` raise ``vol.Invalid``,
    ``None`` validates to ``_GLOBAL_DEFAULT_TIMEOUT`` and ``1`` validates
    to a value equal to ``1.0``.
    """
    key = 'timeout'
    schema = vol.Schema({vol.Required(key, default=None): cv.socket_timeout})

    # Zero and negative timeouts must be rejected.
    for rejected in (0.0, -1):
        with pytest.raises(vol.Invalid):
            schema({key: rejected})

    validated_none = schema({key: None})
    assert _GLOBAL_DEFAULT_TIMEOUT == validated_none[key]

    validated_one = schema({key: 1})
    assert 1.0 == validated_one[key]
# Example source code using the Python class Required()
def setUp(self):
    """Bind a mock-backed ``MyClient`` class to ``MetaServerTestState``.

    The mock is exposed as ``self.spy``; it receives the keyword arguments
    of every ``MyClient`` construction and the string ``'close'`` on every
    ``close()`` call. The class is forgotten again during test cleanup.
    """
    recorder = unittest.mock.Mock()

    class MyClient(Client):
        attr_name = 'my_clients'

        @staticmethod
        def validator():
            # 'foo' is mandatory, 'bar' falls back to 'def bar'.
            rules = {}
            rules[voluptuous.Required('foo')] = int
            rules[voluptuous.Optional('bar', default='def bar')] = str
            return rules

        def __init__(self, **kwargs):
            recorder(kwargs)

        def close(self):
            recorder('close')

    def _forget_client():
        MetaServerTestState.forget_client(MyClient)

    self.spy = recorder
    MetaServerTestState.bind_class_client(MyClient)
    self.addCleanup(_forget_client)
def getSchema(self, data):
    """Return the schema for the top-level configuration mapping.

    Only ``servers`` is required; every other key is optional. ``data`` is
    accepted but not consulted while building the schema.
    """
    layout = {
        v.Required('servers'): self.servers,
        'palettes': self.palettes,
        'palette': str,
        'keymaps': self.keymaps,
        'keymap': str,
        'commentlinks': self.commentlinks,
        'dashboards': self.dashboards,
        'reviewkeys': self.reviewkeys,
        'story-list-query': str,
        'diff-view': str,
        'hide-comments': self.hide_comments,
        'display-times-in-utc': bool,
        'handle-mouse': bool,
        'breadcrumbs': bool,
        'story-list-options': self.story_list_options,
        'expire-age': str,
    }
    return v.Schema(layout)
def __init__(self, *args, **kwargs):
    """Initialise the manager and build its two resource-type schemas.

    ``self._schema`` is built from each extension plugin's default
    ``meta_schema()``; ``self._schema_for_update`` is built from
    ``meta_schema(for_update=True)``.
    """
    super(ResourceTypeSchemaManager, self).__init__(*args, **kwargs)

    def _compile(**meta_kwargs):
        # One alternative per loaded extension plugin.
        alternatives = [ext.plugin.meta_schema(**meta_kwargs)
                        for ext in self.extensions]
        return voluptuous.Schema({
            "name": six.text_type,
            voluptuous.Required("attributes", default={}): {
                six.text_type: voluptuous.Any(*alternatives)
            }
        })

    self._schema = _compile()
    self._schema_for_update = _compile(for_update=True)
def get_pagination_options(params, default):
    """Validate pagination query parameters and return them as a dict.

    ``limit`` defaults to the configured ``max_limit``, must be at least 1
    after integer coercion and is clamped to ``max_limit``. ``sort``
    defaults to ``default`` and is returned under the key ``sorts``.
    Unknown keys are dropped. A validation failure is reported through
    ``abort(400, ...)``.
    """
    max_limit = pecan.request.conf.api.max_limit
    limit_rule = voluptuous.All(
        voluptuous.Coerce(int),
        voluptuous.Range(min=1),
        voluptuous.Clamp(min=1, max=max_limit))
    sort_rule = voluptuous.All(
        voluptuous.Coerce(arg_to_list),
        [six.text_type])
    pagination_schema = voluptuous.Schema({
        voluptuous.Required("limit", default=max_limit): limit_rule,
        "marker": six.text_type,
        voluptuous.Required("sort", default=default): sort_rule,
    }, extra=voluptuous.REMOVE_EXTRA)

    try:
        opts = pagination_schema(params)
    except voluptuous.Invalid as e:
        # NOTE(review): assumes abort() raises; otherwise ``opts`` would be
        # unbound below.
        abort(400, {"cause": "Argument value error",
                    "reason": str(e)})

    # Callers get the sort specification under the plural key.
    opts['sorts'] = opts.pop('sort')
    return opts
def get(self):
    """Return the posts of the subvoat named in the request.

    The ``subvoat_name`` request argument is checked against the
    configured minimum length. On failure the response is
    ``{'error': ...}``; otherwise it is ``{'result': [posts...]}``.
    """
    parser = reqparse.RequestParser()
    parser.add_argument('subvoat_name')
    args = parser.parse_args()

    name_rule = All(str, Length(min=self.config['min_length_subvoat_name']))
    schema = Schema({Required('subvoat_name'): name_rule})
    try:
        schema({'subvoat_name': args.get('subvoat_name')})
    except MultipleInvalid as e:
        return {'error': '%s %s' % (e.msg, e.path)}

    # Materialise whatever iterable the utility returns into a plain list.
    return_data = list(self.subvoat_utils.get_posts(args['subvoat_name']))
    return {'result': return_data}
def page(self, start, end):
    """Validate a ``start``/``end`` page range.

    Returns a two-item list ``[ok, message]``: ``[True, None]`` when both
    values are non-negative integers, ``end`` is not below ``start`` and
    the span is at most 50; otherwise the first failure encountered.
    """
    checks = (
        ('start', start, Schema({Required('start'): All(int, Range(min=0))})),
        ('end', end, Schema({Required('end'): All(int, Range(min=0))})),
    )
    for field, value, schema in checks:
        status, result = self.try_schema(field, value, schema)
        if not status:
            return [status, result]

    if end < start:
        return [False, 'ending page cannot be lower than starting page']
    if end - start > 50:
        return [False, 'you cannot request more than 50 pages']
    return [True, None]
def test_socket_timeout():
    """Verify the socket timeout validator via a single-key schema.

    ``0.0`` and ``-1`` are expected to raise ``vol.Invalid``; ``None`` is
    expected to yield ``_GLOBAL_DEFAULT_TIMEOUT`` and ``1`` a value equal
    to ``1.0``.
    """
    conf_key = 'timeout'
    timeout_schema = vol.Schema(
        {vol.Required(conf_key, default=None): cv.socket_timeout})

    def validated(raw):
        # Run ``raw`` through the schema and hand back the validated value.
        return timeout_schema({conf_key: raw})[conf_key]

    with pytest.raises(vol.Invalid):
        validated(0.0)
    with pytest.raises(vol.Invalid):
        validated(-1)

    assert _GLOBAL_DEFAULT_TIMEOUT == validated(None)
    assert 1.0 == validated(1)
def _state_variable_create_schema(self, type_info):
    """Build a voluptuous schema validating a state variable's ``value``.

    Keys read from ``type_info``:

    * ``data_type_python`` (required): callable used both as the base
      validator and to coerce range bounds and the default value.
    * ``allowed_values`` (optional): container of permitted values.
    * ``allowed_value_range`` (optional): mapping that may hold ``min``
      and/or ``max``.
    * ``default_value`` (optional): raw default; for ``bool`` the string
      ``'1'`` means ``True`` and anything else ``False``.

    Returns a ``vol.Schema`` with a single required key ``'value'``.
    """
    data_type = type_info['data_type_python']
    validators = [data_type]

    if 'allowed_values' in type_info:
        # Allowed values are used as-is (assumed to be strings for now).
        validators.append(vol.In(type_info['allowed_values']))

    if 'allowed_value_range' in type_info:
        value_range = type_info['allowed_value_range']
        min_ = value_range.get('min')
        max_ = value_range.get('max')
        # Only coerce bounds that are actually present: data_type(None)
        # raises for int/float and yields the bogus bound 'None' for str.
        if min_ is not None:
            min_ = data_type(min_)
        if max_ is not None:
            max_ = data_type(max_)
        validators.append(vol.Range(min=min_, max=max_))

    if 'default_value' in type_info:
        default_value = type_info['default_value']
        if data_type == bool:
            default_value = default_value == '1'
        else:
            default_value = data_type(default_value)
        # Hand the default to the constructor instead of assigning
        # ``key.default`` afterwards, so the marker stores it in whatever
        # form the installed voluptuous version expects.
        key = vol.Required('value', default=default_value)
    else:
        key = vol.Required('value')

    return vol.Schema({key: vol.All(*validators)})
def meta_schema(cls, for_update=False):
    """Return the schema dict describing this attribute type.

    The dict always holds the required ``type`` (matching
    ``cls.typename``) and ``required`` (bool, default ``True``) keys.
    With ``for_update`` set, an ``options`` key validated by
    ``OperationOptions`` is added. The entries of ``cls.meta_schema_ext``
    are merged in last; if that attribute is callable, its return value is
    used instead.
    """
    schema = {
        voluptuous.Required('type'): cls.typename,
        voluptuous.Required('required', default=True): bool
    }
    if for_update:
        schema[voluptuous.Required('options', default={})] = OperationOptions

    extension = cls.meta_schema_ext
    schema.update(extension() if callable(extension) else extension)
    return schema
def patch(self):
    """Update the definition of the archive policy ``self.archive_policy``.

    Calls ``abort(404, ...)`` when the policy is unknown, and
    ``abort(400, ...)`` when an item of the submitted definition is
    rejected with ``ValueError`` or the indexer raises
    ``UnsupportedArchivePolicyChange``. On success, returns the indexer's
    result.
    """
    policy_name = self.archive_policy
    ap = pecan.request.indexer.get_archive_policy(policy_name)
    if not ap:
        missing = indexer.NoSuchArchivePolicy(self.archive_policy)
        abort(404, six.text_type(missing))
    enforce("update archive policy", ap)

    # The body must carry a non-empty list of definition items.
    definition_item = {
        "granularity": Timespan,
        "points": PositiveNotNullInt,
        "timespan": Timespan,
    }
    body_schema = voluptuous.Schema({
        voluptuous.Required("definition"):
            voluptuous.All([definition_item], voluptuous.Length(min=1)),
    })
    body = deserialize_and_validate(body_schema)

    # Validate the data
    try:
        ap_items = []
        for item in body['definition']:
            ap_items.append(archive_policy.ArchivePolicyItem(**item))
    except ValueError as e:
        abort(400, six.text_type(e))

    try:
        return pecan.request.indexer.update_archive_policy(
            self.archive_policy, ap_items)
    except indexer.UnsupportedArchivePolicyChange as e:
        abort(400, six.text_type(e))
def post(self):
    """Create an archive policy from the request body.

    Calls ``abort(400, ...)`` when ``ArchivePolicy.from_dict`` raises
    ``ValueError`` and ``abort(409, ...)`` when the policy already exists.
    On success the Location header is set to ``/archive_policy/<name>``,
    the response status is set to 201 and the created policy is returned.
    """
    enforce("create archive policy", {})

    # NOTE(jd): Initialize this one at run-time because we rely on conf
    conf = pecan.request.conf
    default_methods = set(conf.archive_policy.default_aggregation_methods)
    allowed_methods = list(
        archive_policy.ArchivePolicy.VALID_AGGREGATION_METHODS_VALUES)
    definition_item = {
        "granularity": Timespan,
        "points": PositiveNotNullInt,
        "timespan": Timespan,
    }
    ArchivePolicySchema = voluptuous.Schema({
        voluptuous.Required("name"): six.text_type,
        voluptuous.Required("back_window", default=0): PositiveOrNullInt,
        voluptuous.Required("aggregation_methods", default=default_methods):
            voluptuous.All(allowed_methods, voluptuous.Coerce(set)),
        voluptuous.Required("definition"):
            voluptuous.All([definition_item], voluptuous.Length(min=1)),
    })
    body = deserialize_and_validate(ArchivePolicySchema)

    # Validate the data
    try:
        policy = archive_policy.ArchivePolicy.from_dict(body)
    except ValueError as e:
        abort(400, six.text_type(e))

    # Second enforcement, this time against the concrete policy object.
    enforce("create archive policy", policy)
    try:
        policy = pecan.request.indexer.create_archive_policy(policy)
    except indexer.ArchivePolicyAlreadyExists as e:
        abort(409, six.text_type(e))

    set_resp_location_hdr("/archive_policy/" + policy.name)
    pecan.response.status = 201
    return policy
def patch(self):
    """Rename the archive policy rule ``self.archive_policy_rule``.

    The request body must contain a text ``name``. Calls
    ``abort(400, ...)`` when the indexer raises
    ``UnsupportedArchivePolicyRuleChange``; otherwise returns the
    indexer's result.
    """
    rename_schema = voluptuous.Schema({
        voluptuous.Required("name"): six.text_type,
    })
    body = deserialize_and_validate(rename_schema)
    enforce("update archive policy rule", {})

    current_name = self.archive_policy_rule.name
    try:
        return pecan.request.indexer.update_archive_policy_rule(
            current_name, body["name"])
    except indexer.UnsupportedArchivePolicyRuleChange as e:
        abort(400, six.text_type(e))
def parse_option(option_string):
    """Translate one option declaration string into a schema entry.

    The option name is the first double-quoted token in ``option_string``.

    * ``OPTION_NO_VALUE`` gives ``{vol.Optional(name): vol.Coerce(int)}``.
    * ``OPTION_HAS_VALUE`` / ``OPTION_OPT_VALUE`` give
      ``{vol.Required(name): vol.Coerce(int)}`` for ``JSON_NUMBER`` and
      ``{vol.Required(name): vol.Coerce(str)}`` for ``JSON_STRING``.

    Raises:
        ValueError: the option kind or its JSON value type is not
            recognised. (The previous bare ``raise`` statements had no
            active exception to re-raise and only produced an
            uninformative ``RuntimeError``.)
        IndexError: the declaration contains no double-quoted option name.
    """
    if 'OPTION_NO_VALUE' in option_string:
        option = re.findall(r'\"(.*?)\"', option_string)[0]
        # The options without values seem to still need a value
        # when used with pilight-daemon, but these are not mandatory
        # options
        # E.G.: option 'on' is 'on': 1
        return {vol.Optional(option): vol.Coerce(int)}

    if ('OPTION_HAS_VALUE' in option_string
            or 'OPTION_OPT_VALUE' in option_string):
        # A second quoted token, when present, is a regex constraining the
        # value; it is not enforced by the generated schema.
        option = re.findall(r'\"(.*?)\"', option_string)[0]
        if 'JSON_NUMBER' in option_string:
            return {vol.Required(option): vol.Coerce(int)}
        if 'JSON_STRING' in option_string:
            return {vol.Required(option): vol.Coerce(str)}
        raise ValueError(
            'Unsupported value type in option: %r' % (option_string,))

    raise ValueError(
        'Unrecognised option declaration: %r' % (option_string,))
def test_protectron_kwargs_input_schema_with_default():
    """A keyword argument left out by the caller comes from the schema default."""
    input_schema = Schema({Required('a', default=42): int, 'b': int})

    @protectron(input_schema)
    def foo(a, b):
        return a

    # 'a' is not passed, so the returned value must be the default 42.
    outcome = foo(b=42)
    assert outcome == 42
def subvoat_name(self, subvoat_name):
    """Validate ``subvoat_name`` as a string of the configured minimum length.

    Returns whatever ``self.try_schema`` returns for the check.
    """
    min_length = self.config['min_length_subvoat_name']
    name_rule = All(str, Length(min=min_length))
    schema = Schema({Required('subvoat_name'): name_rule})
    return self.try_schema('subvoat_name', subvoat_name, schema)
def uuid(self, uuid):
    """Validate ``uuid`` as a string of exactly 36 characters.

    Returns whatever ``self.try_schema`` returns for the check.
    """
    exact_length = Length(min=36, max=36)
    schema = Schema({Required('uuid'): All(str, exact_length)})
    return self.try_schema('uuid', uuid, schema)
def comment_body(self, comment_body):
    """Validate ``comment_body`` against the configured length bounds.

    Returns whatever ``self.try_schema`` returns for the check.
    """
    shortest = self.config['min_length_comment_body']
    longest = self.config['max_length_comment_body']
    body_rule = All(str, Length(min=shortest, max=longest))
    schema = Schema({Required('comment_body'): body_rule})
    return self.try_schema('comment_body', comment_body, schema)
def thread(self, subvoat_name, title, body):
    """Validate the pieces of a new thread.

    Checks run in order: subvoat name, title, body. The first failing
    check is returned as ``[status, result]``; if all pass, the result is
    ``[True, None]``.
    """
    # Both schemas are built up front, before any check runs.
    title_rule = All(str, Length(min=self.config['min_length_thread_title'],
                                 max=self.config['max_length_thread_title']))
    title_schema = Schema({Required('title'): title_rule})
    body_rule = All(str, Length(min=self.config['min_length_thread_body'],
                                max=self.config['max_length_thread_body']))
    body_schema = Schema({Required('body'): body_rule})

    # Each check is deferred so that later ones do not run after a failure.
    checks = (
        lambda: self.subvoat_name(subvoat_name),
        lambda: self.try_schema('title', title, title_schema),
        lambda: self.try_schema('body', body, body_schema),
    )
    for run_check in checks:
        status, result = run_check()
        if not status:
            return [status, result]

    # Everything validated.
    return [True, None]
def username(self, username):
    """Validate ``username`` against the configured length bounds.

    Returns whatever ``self.try_schema`` returns for the check.
    """
    shortest = self.config['min_length_username']
    longest = self.config['max_length_username']
    name_rule = All(str, Length(min=shortest, max=longest))
    schema = Schema({Required('username'): name_rule})
    return self.try_schema('username', username, schema)
def password(self, password):
    """Validate ``password`` against the configured length bounds.

    Returns whatever ``self.try_schema`` returns for the check.
    """
    shortest = self.config['min_length_password']
    longest = self.config['max_length_password']
    password_rule = All(str, Length(min=shortest, max=longest))
    schema = Schema({Required('password'): password_rule})
    return self.try_schema('password', password, schema)
def api_token(self, api_token):
    """Validate ``api_token`` as a string of exactly 36 characters.

    Returns whatever ``self.try_schema`` returns for the check.
    """
    # FIX: make this actually check the token type
    exact_length = Length(min=36, max=36)
    schema = Schema({Required('api_token'): All(str, exact_length)})
    return self.try_schema('api_token', api_token, schema)
def __init__(self):
    """Create the validation schema and store it on ``self.schema``.

    Every top-level key is optional. When ``os`` is given, its
    ``distro``, ``version`` and ``os_type`` entries are all required.
    """
    text = six.text_type
    os_schema = {
        voluptuous.Required(field): text
        for field in ('distro', 'version', 'os_type')
    }
    self.schema = voluptuous.Schema({
        'name': text,
        'flavor': text,
        'os': os_schema,
        'metadata': dict,
        'start_date': voluptuous.Datetime(),
        'end_date': voluptuous.Datetime(),
    })
def async_load_config(path: str, hass: HomeAssistantType,
                      consider_home: timedelta):
    """Load devices from a YAML configuration file.

    This method is a coroutine.

    Each entry of the loaded mapping is validated on its own. Entries that
    fail validation are reported through ``async_log_exception`` and
    skipped; the rest become ``Device`` objects. An empty list is returned
    when the file cannot be loaded.
    """
    optional_string = vol.Any(None, cv.string)
    dev_schema = vol.Schema({
        vol.Required('name'): cv.string,
        vol.Optional('track', default=False): cv.boolean,
        vol.Optional('mac', default=None):
            vol.Any(None, vol.All(cv.string, vol.Upper)),
        vol.Optional(CONF_AWAY_HIDE, default=DEFAULT_AWAY_HIDE): cv.boolean,
        vol.Optional('gravatar', default=None): optional_string,
        vol.Optional('picture', default=None): optional_string,
        vol.Optional(CONF_CONSIDER_HOME, default=consider_home):
            vol.All(cv.time_period, cv.positive_timedelta),
        vol.Optional('vendor', default=None): optional_string,
    })

    loaded_devices = []
    try:
        try:
            # The blocking YAML read is handed to the loop's executor.
            devices = yield from hass.loop.run_in_executor(
                None, load_yaml_config_file, path)
        except HomeAssistantError as err:
            _LOGGER.error('Unable to load %s: %s', path, str(err))
            return []

        for dev_id, raw_device in devices.items():
            try:
                validated = dev_schema(raw_device)
                validated['dev_id'] = cv.slugify(dev_id)
            except vol.Invalid as exp:
                async_log_exception(exp, dev_id, devices, hass)
            else:
                loaded_devices.append(Device(hass, **validated))
        return loaded_devices
    except (HomeAssistantError, FileNotFoundError):
        # When YAML file could not be loaded/did not contain a dict
        return []