def validate(self):
    """Check ``self._data`` against the voucher schema.

    The top-level mapping must provide ``issue_date``, ``supplier``,
    ``customer``, ``voucher_type``, ``currency``, ``voucher_number`` and a
    ``lines`` list with at least one entry. Keys not listed in a schema are
    allowed at every level. The validated result is discarded; any error
    raised by the schema call propagates to the caller.
    """
    # Supplier: 'ruc' must be a str of exactly 11 characters.
    supplier_schema = Schema(
        {
            Required('ruc'): All(str, Length(min=11, max=11)),
            Required('registration_name'): str,
            Optional('address'): dict,
            Optional('commercial_name'): str
        },
        extra=ALLOW_EXTRA,
    )
    # Customer: 'ruc' may be anywhere from 1 to 11 characters long.
    customer_schema = Schema(
        {Required('ruc'): All(str, Length(min=1, max=11))},
        extra=ALLOW_EXTRA,
    )
    voucher_schema = Schema(
        {
            Required('issue_date'): str,
            Required('supplier'): supplier_schema,
            Required('customer'): customer_schema,
            Required('voucher_type'): str,
            Required('currency'): str,
            Required('voucher_number'): str,
            Required('lines'): All(list, Length(min=1))
        },
        extra=ALLOW_EXTRA,
    )
    voucher_schema(self._data)
# Example source code using the Python class All()
def get_pagination_options(params, default):
    """Validate pagination parameters and return the cleaned options.

    :param params: mapping of raw request parameters.
    :param default: value used for ``sort`` when the caller gives none.
    :returns: the validated mapping, with the ``sort`` entry renamed to
        ``sorts``. Keys unknown to the schema are removed.

    A ``voluptuous.Invalid`` error is turned into ``abort(400, ...)``.
    """
    try:
        max_limit = pecan.request.conf.api.max_limit
        # "limit" defaults to the configured maximum and is clamped to it.
        limit_rule = voluptuous.All(
            voluptuous.Coerce(int),
            voluptuous.Range(min=1),
            voluptuous.Clamp(min=1, max=max_limit),
        )
        sort_rule = voluptuous.All(
            voluptuous.Coerce(arg_to_list),
            [six.text_type],
        )
        pagination_schema = voluptuous.Schema(
            {
                voluptuous.Required("limit", default=max_limit): limit_rule,
                "marker": six.text_type,
                voluptuous.Required("sort", default=default): sort_rule,
            },
            extra=voluptuous.REMOVE_EXTRA,
        )
        opts = pagination_schema(params)
    except voluptuous.Invalid as e:
        abort(400, {"cause": "Argument value error",
                    "reason": str(e)})
    # Callers expect the plural key.
    opts['sorts'] = opts.pop('sort')
    return opts
def get(self):
    """Return the posts of the subvoat named in the request arguments.

    The ``subvoat_name`` argument is parsed from the request and must be a
    ``str`` of at least ``self.config['min_length_subvoat_name']``
    characters. On a validation failure a dict with an ``error`` entry is
    returned; otherwise a dict whose ``result`` entry lists the posts.
    """
    parser = reqparse.RequestParser()
    parser.add_argument('subvoat_name')
    args = parser.parse_args()

    name_rule = All(str, Length(min=self.config['min_length_subvoat_name']))
    schema = Schema({Required('subvoat_name'): name_rule})

    try:
        schema({'subvoat_name': args.get('subvoat_name')})
    except MultipleInvalid as e:
        return {'error': '%s %s' % (e.msg, e.path)}

    posts = self.subvoat_utils.get_posts(args['subvoat_name'])
    return {'result': list(posts)}
def page(self, start, end):
    """Validate a page range.

    :param start: first page; must be an int >= 0.
    :param end: last page; must be an int >= 0 and not below ``start``.
    :returns: a two-item list ``[status, detail]``. ``[True, None]`` on
        success, otherwise the failing status with its detail (either the
        result of ``self.try_schema`` or an explanatory message).
    """
    checks = (
        ('start', start, Schema({Required('start'): All(int, Range(min=0))})),
        ('end', end, Schema({Required('end'): All(int, Range(min=0))})),
    )
    for field, value, schema in checks:
        status, result = self.try_schema(field, value, schema)
        if not status:
            return [status, result]

    if end < start:
        return [False, 'ending page cannot be lower than starting page']

    # A single request may span at most 50 pages.
    if end - start > 50:
        return [False, 'you cannot request more than 50 pages']

    return [True, None]
def __init__(self, server):
    """Store the server handles, build the channel edit schemas, register.

    ``channel_edit_base`` holds the fields shared by every channel edit;
    the text and voice schemas extend it with their own fields.
    """
    self.server = server
    self.guild_man = server.guild_man

    shared_fields = {
        'name': All(str, Length(min=2, max=100)),
        'position': int,
        Optional('nsfw'): bool,
    }
    self.channel_edit_base = Schema(shared_fields, required=True)

    text_fields = {
        'topic': All(str, Length(min=0, max=1024))
    }
    self.textchan_editschema = self.channel_edit_base.extend(text_fields)

    voice_fields = {
        'bitrate': All(int, Range(min=8000, max=96000)),
        'user_limit': All(int, Range(min=0, max=99)),
    }
    self.voicechan_editschema = self.channel_edit_base.extend(voice_fields)

    self.register()
def enum(enumClass):
    """Create validator for specified enum.

    The validator first requires the value to be one of the keys of
    ``enumClass.__members__`` and then passes it to
    ``enumClass.__getitem__``.
    """
    is_member_name = vol.In(enumClass.__members__)
    to_member = enumClass.__getitem__
    return vol.All(is_member_name, to_member)
def state_variables(self):
    """Return the UpnpStateVariables held in ``self._state_variables``."""
    variables = self._state_variables
    return variables
def actions(self):
    """Return the UpnpActions held in ``self._actions``."""
    service_actions = self._actions
    return service_actions
def _state_variable_create_schema(self, type_info):
    """Build the voluptuous schema for a state variable's ``value`` key.

    :param type_info: mapping that must contain ``data_type_python`` (a
        callable type) and may contain ``allowed_values``,
        ``allowed_value_range`` (a mapping with optional ``min``/``max``)
        and ``default_value``.
    :returns: a ``vol.Schema`` with a single required ``value`` key whose
        value must pass the data type and every configured constraint.
    """
    data_type = type_info['data_type_python']

    # construct validators
    validators = [data_type]

    if 'allowed_values' in type_info:
        # coerce allowed values? assume always string for now
        validators.append(vol.In(type_info['allowed_values']))

    if 'allowed_value_range' in type_info:
        value_range = type_info['allowed_value_range']
        min_ = value_range.get('min')
        max_ = value_range.get('max')
        # Coerce only the bounds that are present. data_type(None) raises
        # for numeric types and yields the string 'None' for str, so an
        # absent bound is left as None and handed to vol.Range unchanged.
        if min_ is not None:
            min_ = data_type(min_)
        if max_ is not None:
            max_ = data_type(max_)
        validators.append(vol.Range(min=min_, max=max_))

    # construct key
    if 'default_value' in type_info:
        raw_default = type_info['default_value']
        if data_type == bool:
            # Only the literal '1' counts as true for a bool default.
            default_value = raw_default == '1'
        else:
            default_value = data_type(raw_default)
        # Hand the default to the marker's constructor instead of assigning
        # the attribute afterwards, so voluptuous stores it in whatever
        # form it expects internally.
        key = vol.Required('value', default=default_value)
    else:
        key = vol.Required('value')

    return vol.Schema({key: vol.All(*validators)})
def _add_validation(context):
    """Return ``context`` merged with a ``validation`` entry.

    The entry is an ``AttrDict`` exposing ``voluptuous.Any`` as ``any`` and
    ``voluptuous.All`` as ``all``.
    """
    validators = dbt.utils.AttrDict({
        'any': voluptuous.Any,
        'all': voluptuous.All,
    })
    addition = {'validation': validators}
    return dbt.utils.merge(context, addition)
def schema_ext(self):
    """Return a validator for text bounded by this attribute's lengths.

    The value must be a ``six.text_type`` whose length lies between
    ``self.min_length`` and ``self.max_length``.
    """
    length_bounds = voluptuous.Length(min=self.min_length,
                                      max=self.max_length)
    return voluptuous.All(six.text_type, length_bounds)
def schema_ext(self):
    """Return a validator for real numbers bounded by this attribute.

    The value must be a ``numbers.Real`` lying between ``self.min`` and
    ``self.max``.
    """
    value_bounds = voluptuous.Range(min=self.min, max=self.max)
    return voluptuous.All(numbers.Real, value_bounds)
def MetricSchema(v):
    """metric keyword schema

    It could be:

    ["metric", "metric-ref", "aggregation"]

    or

    ["metric", ["metric-ref", "aggregation"], ["metric-ref", "aggregation"]]

    Raises ``voluptuous.Invalid`` when ``v`` is not a list/tuple, is empty,
    has no argument, or does not start with ``"metric"``.
    """
    if not isinstance(v, (list, tuple)):
        raise voluptuous.Invalid("Expected a tuple/list, got a %s" % type(v))
    if not v:
        raise voluptuous.Invalid("Operation must not be empty")
    if len(v) < 2:
        raise voluptuous.Invalid("Operation need at least one argument")
    if v[0] != u"metric":
        # NOTE(sileht): this error message doesn't looks related to "metric",
        # but because that the last schema validated by voluptuous, we have
        # good chance (voluptuous.Any is not predictable) to print this
        # message even if it's an other operation that invalid.
        raise voluptuous.Invalid("'%s' operation invalid" % v[0])

    # Either one [metric-ref, aggregation] pair ...
    single_pair = voluptuous.ExactSequence([six.text_type, six.text_type])
    # ... or a non-empty list of such pairs.
    pair_list = voluptuous.All(
        voluptuous.Length(min=1),
        [voluptuous.ExactSequence([six.text_type, six.text_type])],
    )
    arguments_schema = voluptuous.Schema(
        voluptuous.Any(single_pair, pair_list), required=True)
    return [u"metric"] + arguments_schema(v[1:])
def patch(self):
    """Update the definition of the archive policy ``self.archive_policy``.

    Aborts with 404 when the policy is not found, and with 400 when an item
    of the submitted definition is rejected with ``ValueError`` or the
    indexer reports an unsupported change. Returns the indexer's result.
    """
    ap = pecan.request.indexer.get_archive_policy(self.archive_policy)
    if not ap:
        not_found = indexer.NoSuchArchivePolicy(self.archive_policy)
        abort(404, six.text_type(not_found))
    enforce("update archive policy", ap)

    # The body must carry a non-empty "definition" list.
    definition_item = {
        "granularity": Timespan,
        "points": PositiveNotNullInt,
        "timespan": Timespan}
    body_schema = voluptuous.Schema({
        voluptuous.Required("definition"):
            voluptuous.All([definition_item], voluptuous.Length(min=1)),
    })
    body = deserialize_and_validate(body_schema)

    # Validate the data
    try:
        ap_items = []
        for item in body['definition']:
            ap_items.append(archive_policy.ArchivePolicyItem(**item))
    except ValueError as e:
        abort(400, six.text_type(e))

    try:
        return pecan.request.indexer.update_archive_policy(
            self.archive_policy, ap_items)
    except indexer.UnsupportedArchivePolicyChange as e:
        abort(400, six.text_type(e))
def subvoat_name(self, subvoat_name):
    """Validate a subvoat name and return the result of ``try_schema``.

    The name must be a ``str`` of at least
    ``self.config['min_length_subvoat_name']`` characters.
    """
    min_length = self.config['min_length_subvoat_name']
    rule = All(str, Length(min=min_length))
    schema = Schema({Required('subvoat_name'): rule})
    return self.try_schema('subvoat_name', subvoat_name, schema)
def uuid(self, uuid):
    """Validate a uuid and return the result of ``try_schema``.

    The value must be a ``str`` of exactly 36 characters.
    """
    rule = All(str, Length(min=36, max=36))
    schema = Schema({Required('uuid'): rule})
    return self.try_schema('uuid', uuid, schema)
def comment_body(self, comment_body):
    """Validate a comment body and return the result of ``try_schema``.

    The body must be a ``str`` whose length lies between the configured
    ``min_length_comment_body`` and ``max_length_comment_body``.
    """
    min_length = self.config['min_length_comment_body']
    max_length = self.config['max_length_comment_body']
    rule = All(str, Length(min=min_length, max=max_length))
    schema = Schema({Required('comment_body'): rule})
    return self.try_schema('comment_body', comment_body, schema)
def thread(self, subvoat_name, title, body):
    """Validate the parts of a new thread.

    Checks run in order: subvoat name, title, body. The first failure is
    returned as ``[status, result]``; ``[True, None]`` means all passed.
    Title and body must be ``str`` values within their configured
    ``min_length_thread_*`` / ``max_length_thread_*`` bounds.
    """
    # Both schemas are built up front, before any validation runs.
    title_rule = All(str, Length(min=self.config['min_length_thread_title'],
                                 max=self.config['max_length_thread_title']))
    title_schema = Schema({Required('title'): title_rule})
    body_rule = All(str, Length(min=self.config['min_length_thread_body'],
                                max=self.config['max_length_thread_body']))
    body_schema = Schema({Required('body'): body_rule})

    # validate the subvoat_name first
    sn_status, sn_result = self.subvoat_name(subvoat_name)
    if not sn_status:
        return [sn_status, sn_result]

    # then the thread title, and finally the thread body
    remaining = (
        ('title', title, title_schema),
        ('body', body, body_schema),
    )
    for field, value, schema in remaining:
        status, result = self.try_schema(field, value, schema)
        if not status:
            return [status, result]

    # return True if everything is ok
    return [True, None]
def username(self, username):
    """Validate a username and return the result of ``try_schema``.

    The name must be a ``str`` whose length lies between the configured
    ``min_length_username`` and ``max_length_username``.
    """
    min_length = self.config['min_length_username']
    max_length = self.config['max_length_username']
    rule = All(str, Length(min=min_length, max=max_length))
    schema = Schema({Required('username'): rule})
    return self.try_schema('username', username, schema)
def password(self, password):
    """Validate a password and return the result of ``try_schema``.

    The password must be a ``str`` whose length lies between the configured
    ``min_length_password`` and ``max_length_password``.
    """
    min_length = self.config['min_length_password']
    max_length = self.config['max_length_password']
    rule = All(str, Length(min=min_length, max=max_length))
    schema = Schema({Required('password'): rule})
    return self.try_schema('password', password, schema)
def api_token(self, api_token):
    """Validate an API token and return the result of ``try_schema``.

    Only the shape is checked: the token must be a ``str`` of exactly 36
    characters.
    """
    # FIX: make this actually check the token type
    rule = All(str, Length(min=36, max=36))
    schema = Schema({Required('api_token'): rule})
    return self.try_schema('api_token', api_token, schema)
def get(self, request):
    """Handle a GET request.

    Reads ``code`` and ``state`` from ``request.GET`` and passes them to
    ``self.lyric.authorization_code``.

    :param request: incoming request; its ``GET`` mapping is inspected.
    :returns: the result of ``self.json_message``: a failure message with
        ``HTTP_BAD_REQUEST`` when either parameter is missing, otherwise a
        confirmation message.
    """
    data = request.GET

    if 'code' not in data or 'state' not in data:
        return self.json_message('Authentication failed, not the right '
                                 'variables, try again.', HTTP_BAD_REQUEST)

    self.lyric.authorization_code(code=data['code'], state=data['state'])

    # Adjacent string literals are joined with nothing in between, so each
    # fragment carries its own separating space.
    return self.json_message('OK! All good. Got the response! You can close '
                             'this window now, and click << Continue >> '
                             'in the configurator.')
def __init__(self, server):
    """Store the server handles, build the guild schemas, then register.

    Three schemas are created: one for editing a guild (every field
    optional), one for creating a guild, and one for creating a channel.
    All three drop keys they do not know.
    """
    self.server = server
    self.guild_man = server.guild_man

    guild_edit_fields = {
        Optional('name'): str,
        Optional('region'): str,
        Optional('verification_level'): int,
        Optional('default_message_notifications'): int,
        Optional('afk_channel_id'): str,
        Optional('afk_timeout'): int,
        Optional('icon'): str,
        Optional('owner_id'): str,
    }
    self.guild_edit_schema = Schema(
        guild_edit_fields, required=True, extra=REMOVE_EXTRA)

    guild_create_fields = {
        'name': str,
        'region': str,
        'icon': Any(None, str),
        'verification_level': int,
        'default_message_notifications': int,
    }
    self.guild_create_schema = Schema(
        guild_create_fields, extra=REMOVE_EXTRA)

    channel_create_fields = {
        'name': All(str, Length(min=2, max=100)),
        Optional('type'): int,
        Optional('bitrate'): int,
        Optional('user_limit'): int,
        Optional('permission_overwrites'): list,
    }
    self.channel_create_schema = Schema(
        channel_create_fields, required=True, extra=REMOVE_EXTRA)

    self.register()
def __init__(self, server):
    """Store the server handles, build the invite schema, then register.

    Every field of the invite creation schema has a default, and unknown
    keys are removed. Registration is done against ``server.app``.
    """
    self.server = server
    self.guild_man = server.guild_man

    max_age_rule = All(int, Range(min=10, max=86400))
    max_uses_rule = All(int, Range(min=0, max=50))
    invite_fields = {
        Required('max_age', default=86400): max_age_rule,
        Required('max_uses', default=0): max_uses_rule,
        Required('temporary', default=False): bool,
        Required('unique', default=True): bool,
    }
    self.invite_create_schema = Schema(invite_fields, extra=REMOVE_EXTRA)

    self.register(server.app)
def enum(enumClass):
    """Create validator for specified enum.

    Values must be a key of ``enumClass.__members__``; an accepted value is
    then passed to ``enumClass.__getitem__``.
    """
    steps = (
        vol.In(enumClass.__members__),
        enumClass.__getitem__,
    )
    return vol.All(*steps)
def async_load_config(path: str, hass: HomeAssistantType,
                      consider_home: timedelta):
    """Load devices from YAML configuration file.

    This method is a coroutine.

    Each entry of the loaded mapping is validated against a per-device
    schema. Entries that fail validation are reported through
    ``async_log_exception`` and skipped; the rest are turned into
    ``Device`` objects.

    :param path: path handed to ``load_yaml_config_file``.
    :param hass: used for its event loop and passed on to each ``Device``.
    :param consider_home: default for the ``CONF_CONSIDER_HOME`` field.
    :returns: list of ``Device`` objects, or an empty list when the file
        cannot be loaded.
    """
    # Schema applied to every device entry; optional fields get defaults.
    dev_schema = vol.Schema({
        vol.Required('name'): cv.string,
        vol.Optional('track', default=False): cv.boolean,
        vol.Optional('mac', default=None): vol.Any(None, vol.All(cv.string,
                                                                 vol.Upper)),
        vol.Optional(CONF_AWAY_HIDE, default=DEFAULT_AWAY_HIDE): cv.boolean,
        vol.Optional('gravatar', default=None): vol.Any(None, cv.string),
        vol.Optional('picture', default=None): vol.Any(None, cv.string),
        vol.Optional(CONF_CONSIDER_HOME, default=consider_home): vol.All(
            cv.time_period, cv.positive_timedelta),
        vol.Optional('vendor', default=None): vol.Any(None, cv.string),
    })
    try:
        result = []
        try:
            # The YAML load is run through the loop's executor.
            devices = yield from hass.loop.run_in_executor(
                None, load_yaml_config_file, path)
        except HomeAssistantError as err:
            _LOGGER.error('Unable to load %s: %s', path, str(err))
            return []

        for dev_id, device in devices.items():
            try:
                device = dev_schema(device)
                device['dev_id'] = cv.slugify(dev_id)
            except vol.Invalid as exp:
                # Invalid entries are logged and left out of the result.
                async_log_exception(exp, dev_id, devices, hass)
            else:
                result.append(Device(hass, **device))
        return result
    except (HomeAssistantError, FileNotFoundError):
        # When YAML file could not be loaded/did not contain a dict
        return []
def MetricSchema(definition):
    """Validate a metric definition and check the right to create it.

    The definition goes through a basic schema, a missing archive policy
    name is resolved from the archive policy rules, and the "create metric"
    policy is enforced. Invalid input is reported with ``abort(400, ...)``.

    :param definition: raw metric definition mapping.
    :returns: the validated definition, with ``archive_policy_name`` filled
        in from the matching rule when it was not given.
    """
    creator = pecan.request.auth_helper.get_current_user(pecan.request)

    # First basic validation
    unit_rule = voluptuous.All(six.text_type, voluptuous.Length(max=31))
    basic_schema = voluptuous.Schema({
        "archive_policy_name": six.text_type,
        "resource_id": functools.partial(ResourceID, creator=creator),
        "name": six.text_type,
        voluptuous.Optional("unit"): unit_rule,
    })
    definition = basic_schema(definition)

    archive_policy_name = definition.get('archive_policy_name')
    name = definition.get('name')

    if name and '/' in name:
        abort(400, "'/' is not supported in metric name")

    if archive_policy_name is None:
        try:
            ap = pecan.request.indexer.get_archive_policy_for_metric(name)
        except indexer.NoArchivePolicyRuleMatch:
            # NOTE(jd) Since this is a schema-like function, we
            # should/could raise ValueError, but if we do so, voluptuous
            # just returns a "invalid value" with no useful message - so we
            # prefer to use abort() to make sure the user has the right
            # error message
            abort(400, "No archive policy name specified "
                       "and no archive policy rule found matching "
                       "the metric name %s" % name)
        else:
            definition['archive_policy_name'] = ap.name

    resource_id = definition.get('resource_id')
    original_resource_id = None
    if resource_id is not None:
        if name is None:
            abort(400,
                  {"cause": "Attribute value error",
                   "detail": "name",
                   "reason": "Name cannot be null "
                             "if resource_id is not null"})
        # The validated resource_id is unpacked as an (original, id) pair.
        original_resource_id, resource_id = resource_id

    # NOTE(review): the local archive_policy_name is not refreshed after the
    # rule lookup above, so the policy check receives None in that case -
    # confirm this is intended.
    target = {
        "creator": creator,
        "archive_policy_name": archive_policy_name,
        "resource_id": resource_id,
        "original_resource_id": original_resource_id,
        "name": name,
        "unit": definition.get('unit'),
    }
    enforce("create metric", target)

    return definition
def _ResourceSearchSchema():
    """Build the voluptuous schema for a resource search query.

    A query is a mapping with at most one entry whose key is an operator:
    a comparison operator, ``like``, ``in``, a logical ``and``/``or`` over
    a non-empty list of sub-queries, or ``not`` over a single sub-query.
    Comparison, ``like`` and ``in`` each take a one-entry mapping of
    attribute to value(s). ``id`` values are validated with
    ``ResourceUUID`` bound to the current user.

    :returns: a ``voluptuous.Schema`` instance.
    """
    user = pecan.request.auth_helper.get_current_user(
        pecan.request)
    _ResourceUUID = functools.partial(ResourceUUID, creator=user)

    # The symbolic aliases are written as escapes so this source stays
    # pure ASCII. They replace "?" placeholders left by a lossy encoding
    # conversion, which made several operators share one meaningless key.
    comparison_operators = voluptuous.Any(
        u"=", u"==", u"eq",
        u"<", u"lt",
        u">", u"gt",
        u"<=", u"\u2264", u"le",  # LESS-THAN OR EQUAL TO
        u">=", u"\u2265", u"ge",  # GREATER-THAN OR EQUAL TO
        u"!=", u"\u2260", u"ne",  # NOT EQUAL TO
    )
    logical_operators = voluptuous.Any(
        u"and", u"\u2227",  # LOGICAL AND
        u"or", u"\u2228",  # LOGICAL OR
    )

    return voluptuous.Schema(
        voluptuous.All(
            voluptuous.Length(min=0, max=1),
            {
                comparison_operators: voluptuous.All(
                    voluptuous.Length(min=1, max=1),
                    {"id": _ResourceUUID,
                     NotIDKey: ResourceSearchSchemaAttributeValue},
                ),
                u"like": voluptuous.All(
                    voluptuous.Length(min=1, max=1),
                    {NotIDKey: ResourceSearchSchemaAttributeValue},
                ),
                u"in": voluptuous.All(
                    voluptuous.Length(min=1, max=1),
                    {"id": voluptuous.All(
                        [_ResourceUUID],
                        voluptuous.Length(min=1)),
                     NotIDKey: voluptuous.All(
                         [ResourceSearchSchemaAttributeValue],
                         voluptuous.Length(min=1))}
                ),
                logical_operators: voluptuous.All(
                    [ResourceSearchSchema], voluptuous.Length(min=1)
                ),
                u"not": ResourceSearchSchema,
            }
        )
    )
def async_run(self, variables: Optional[Sequence]=None) -> None:
    """Run script.

    This method is a coroutine.

    Execution starts at ``self._cur`` (or at the beginning when it is -1)
    and walks ``self.sequence``. A delay action schedules a callback that
    calls ``async_run`` again and returns early with ``self._cur`` pointing
    at the next action. A condition that does not hold ends the run. When
    the loop ends, ``self._cur`` is reset to -1.

    :param variables: passed on to template rendering, condition checks and
        service calls.
    """
    if self._cur == -1:
        self._log('Running script')
        self._cur = 0

    # Unregister callback if we were in a delay but turn on is called
    # again. In that case we just continue execution.
    self._async_remove_listener()

    # Resume from the stored position rather than from the first action.
    for cur, action in islice(enumerate(self.sequence), self._cur,
                              None):

        if CONF_DELAY in action:
            # Call ourselves in the future to continue work
            @asyncio.coroutine
            def script_delay(now):
                """Called after delay is done."""
                self._async_unsub_delay_listener = None
                self.hass.async_add_job(self.async_run(variables))

            delay = action[CONF_DELAY]

            # A templated delay is rendered and validated before use.
            if isinstance(delay, template.Template):
                delay = vol.All(
                    cv.time_period,
                    cv.positive_timedelta)(
                        delay.async_render(variables))

            self._async_unsub_delay_listener = \
                async_track_point_in_utc_time(
                    self.hass, script_delay,
                    date_util.utcnow() + delay)
            # Remember where to continue once the delay has elapsed.
            self._cur = cur + 1
            if self._change_listener:
                self.hass.async_add_job(self._change_listener)
            return

        elif CONF_CONDITION in action:
            # A condition that does not hold stops the remaining actions.
            if not self._async_check_condition(action, variables):
                break

        elif CONF_EVENT in action:
            self._async_fire_event(action)

        else:
            yield from self._async_call_service(action, variables)

    # Loop ended (exhausted or stopped by a condition): reset the state.
    self._cur = -1
    self.last_action = None
    if self._change_listener:
        self.hass.async_add_job(self._change_listener)