def __init__(self):
    """Connect to Redis and build the service-registration schema.

    The Redis client targets ``REDIS_HOST``/``REDIS_PORT`` on database 0.
    The schema requires a service id, name, port and namespace (the
    namespace defaults to ``'default'``) plus a list of endpoint mappings.
    """
    self.redisinstance = redis.StrictRedis(
        host=REDIS_HOST, port=REDIS_PORT, db=0)

    # Optional per-endpoint rewrite rule; when present both parts are needed.
    filter_regex_schema = {
        Required(RegistryKeys.PATTERN): Any(str, unicode),
        Required(RegistryKeys.REPLACE): Any(str, unicode),
    }

    # Every endpoint must carry a URI and an acceptance regex.
    endpoint_schema = {
        Required(RegistryKeys.URI): Any(str, unicode),
        Required(RegistryKeys.ACCEPTANCE_REGEX): Any(str, unicode),
        RegistryKeys.FILTER_REGEX: filter_regex_schema,
    }

    self.validateschema = Schema({
        Required(RegistryKeys.SERVICE_ID): Any(str, unicode),
        Required(RegistryKeys.SERVICE_NAME): Any(str, unicode),
        Required(RegistryKeys.SERVICE_PORT): int,
        Required(RegistryKeys.NAMESPACE, default='default'):
            Any(str, unicode),
        Required(RegistryKeys.ENDPOINTS): [endpoint_schema],
    })
# Example source code for the Python class Any()
def __init__(self, *args, **kwargs):
    """Initialise the manager and build the resource-type schemas.

    Two voluptuous schemas are produced from the loaded extensions: one
    for creation (``self._schema``) and one for updates
    (``self._schema_for_update``). They differ only in whether each
    plugin's ``meta_schema`` is called with ``for_update=True``.
    """
    super(ResourceTypeSchemaManager, self).__init__(*args, **kwargs)

    def build_schema(**meta_schema_kwargs):
        # One candidate attribute schema per loaded extension.
        attribute_schemas = [
            ext.plugin.meta_schema(**meta_schema_kwargs)
            for ext in self.extensions
        ]
        return voluptuous.Schema({
            "name": six.text_type,
            voluptuous.Required("attributes", default={}): {
                six.text_type: voluptuous.Any(*attribute_schemas)
            }
        })

    self._schema = build_schema()
    self._schema_for_update = build_schema(for_update=True)
def load_tcconfig(self, config_file_path):
    """Load a tc configuration table from a JSON file and validate it.

    The parsed JSON is stored on the instance before validation, then
    checked against a nested schema (outer key -> traffic direction ->
    text key -> parameter mapping) and logged at debug level.

    :param config_file_path: Path of the JSON configuration file.
    """
    import json

    from voluptuous import ALLOW_EXTRA, Any, Required, Schema

    # Innermost level: parameter name -> text or numeric value.
    parameter_schema = {
        six.text_type: Any(six.text_type, int, float)
    }
    # Middle level: keys restricted to the known traffic directions.
    direction_schema = {
        Any(*TrafficDirection.LIST): {
            six.text_type: parameter_schema,
        }
    }
    validate = Schema({
        Required(six.text_type): direction_schema,
    }, extra=ALLOW_EXTRA)

    with open(config_file_path) as config_file:
        self.__config_table = json.load(config_file)

    validate(self.__config_table)

    pretty_config = json.dumps(self.__config_table, indent=4)
    self.__logger.debug("tc config file: {:s}".format(pretty_config))
def boolean(value: Any) -> bool:
    """Validate and coerce a value to a boolean.

    Strings are compared case-insensitively against fixed sets of true and
    false words; an unrecognised string raises ``vol.Invalid``. Any other
    type is passed through ``bool()``.
    """
    truthy_words = ('1', 'true', 'yes', 'on', 'enable')
    falsy_words = ('0', 'false', 'no', 'off', 'disable')

    if not isinstance(value, str):
        return bool(value)

    lowered = value.lower()
    if lowered in truthy_words:
        return True
    if lowered in falsy_words:
        return False
    # The message reports the lower-cased form of the input.
    raise vol.Invalid('invalid boolean value {}'.format(lowered))
def isfile(value: Any) -> str:
    """Validate that the value names an existing, readable file.

    Returns the path with ``~`` expanded. Raises ``vol.Invalid`` when the
    value is None, is not an existing file, or cannot be read.
    """
    if value is None:
        raise vol.Invalid('None is not file')

    expanded_path = os.path.expanduser(str(value))
    if os.path.isfile(expanded_path):
        if os.access(expanded_path, os.R_OK):
            return expanded_path
        raise vol.Invalid('file not readable')
    raise vol.Invalid('not a file')
def entity_id(value: Any) -> str:
    """Validate an entity ID and return it lower-cased.

    The value is first coerced with ``string`` and lower-cased; if
    ``valid_entity_id`` rejects the result, ``vol.Invalid`` is raised.
    """
    normalized = string(value).lower()
    if not valid_entity_id(normalized):
        raise vol.Invalid(
            'Entity ID {} is an invalid entity id'.format(normalized))
    return normalized
def string(value: Any) -> str:
    """Coerce the value to ``str``; None is rejected with ``vol.Invalid``."""
    if value is None:
        raise vol.Invalid('string value is None')
    return str(value)
def url(value: Any) -> str:
    """Validate a URL.

    Only the ``http`` and ``https`` schemes are accepted; anything else
    raises ``vol.Invalid``. Accepted values are then run through
    ``vol.Url()`` and that validator's result is returned.
    """
    candidate = str(value)
    if urlparse(candidate).scheme not in ['http', 'https']:
        raise vol.Invalid('invalid url')
    url_validator = vol.Schema(vol.Url())
    return url_validator(candidate)
def _add_validation(context):
    """Return ``context`` merged with a ``validation`` helper namespace.

    The namespace exposes voluptuous ``Any`` and ``All`` under the keys
    ``any`` and ``all``.
    """
    validation_helpers = dbt.utils.AttrDict({
        'any': voluptuous.Any,
        'all': voluptuous.All,
    })
    extra_context = {'validation': validation_helpers}
    return dbt.utils.merge(context, extra_context)
def MetricSchema(v):
    """Validate a "metric" operation.

    Accepted forms:
        ["metric", "metric-ref", "aggregation"]
    or
        ["metric", ["metric-ref", "aggregation"], ["metric-ref", "aggregation"]]

    Returns a list starting with u"metric" followed by the validated
    arguments. Raises ``voluptuous.Invalid`` for anything else.
    """
    if not isinstance(v, (list, tuple)):
        raise voluptuous.Invalid("Expected a tuple/list, got a %s" % type(v))
    if not v:
        raise voluptuous.Invalid("Operation must not be empty")
    if len(v) < 2:
        raise voluptuous.Invalid("Operation need at least one argument")
    if v[0] != u"metric":
        # NOTE(sileht): this error message does not look related to
        # "metric", but because this is the last schema validated by
        # voluptuous, there is a good chance (voluptuous.Any is not
        # predictable) that this message is printed even when it is another
        # operation that is invalid.
        raise voluptuous.Invalid("'%s' operation invalid" % v[0])

    # A single (metric-ref, aggregation) pair ...
    reference_pair = voluptuous.ExactSequence(
        [six.text_type, six.text_type])
    # ... or a non-empty list of such pairs.
    reference_pairs = voluptuous.All(
        voluptuous.Length(min=1),
        [voluptuous.ExactSequence([six.text_type, six.text_type])],
    )
    arguments_schema = voluptuous.Schema(
        voluptuous.Any(reference_pair, reference_pairs), required=True)
    return [u"metric"] + arguments_schema(v[1:])
def OperationsSchema(v):
    """Validate an operations expression.

    A text value is first parsed as a nested parenthesised expression with
    pyparsing; a parse failure is reported through ``api.abort`` with
    status 400. The resulting structure (or the non-text input as given)
    is then validated against any of the ``OperationsSchemaBase`` schemas.
    """
    if isinstance(v, six.text_type):
        try:
            expression = pyparsing.OneOrMore(pyparsing.nestedExpr())
            parsed = expression.parseString(v)
            v = parsed.asList()[0]
        except pyparsing.ParseException as e:
            api.abort(400, {"cause": "Invalid operations",
                            "reason": "Fail to parse the operations string",
                            "detail": six.text_type(e)})

    operations_validator = voluptuous.Schema(
        voluptuous.Any(*OperationsSchemaBase), required=True)
    return operations_validator(v)
def ResourceSchema(schema):
    """Return the common resource fields extended with ``schema``.

    All common fields are optional. Entries in ``schema`` are applied last,
    so they override a common field with an equal key.
    """
    common_fields = {
        voluptuous.Optional('started_at'): utils.to_datetime,
        voluptuous.Optional('ended_at'): utils.to_datetime,
        voluptuous.Optional('user_id'): voluptuous.Any(None, six.text_type),
        voluptuous.Optional('project_id'): voluptuous.Any(None,
                                                          six.text_type),
        voluptuous.Optional('metrics'): MetricsSchema,
    }
    resource_schema = dict(common_fields)
    resource_schema.update(schema)
    return resource_schema
def __init__(self, server):
    """Store the server handles, build the request schemas and register.

    Three voluptuous schemas are created (guild edit, guild create and
    channel create); each one drops keys it does not know about
    (``REMOVE_EXTRA``). ``self.register()`` is called last.
    """
    self.server = server
    self.guild_man = server.guild_man

    # Every field of a guild edit is optional.
    guild_edit_fields = {
        Optional('name'): str,
        Optional('region'): str,
        Optional('verification_level'): int,
        Optional('default_message_notifications'): int,
        Optional('afk_channel_id'): str,
        Optional('afk_timeout'): int,
        Optional('icon'): str,
        Optional('owner_id'): str,
    }
    self.guild_edit_schema = Schema(
        guild_edit_fields, required=True, extra=REMOVE_EXTRA)

    guild_create_fields = {
        'name': str,
        'region': str,
        'icon': Any(None, str),
        'verification_level': int,
        'default_message_notifications': int,
    }
    self.guild_create_schema = Schema(
        guild_create_fields, extra=REMOVE_EXTRA)

    # Only the channel name is mandatory, and it must be 2-100 characters.
    channel_create_fields = {
        'name': All(str, Length(min=2, max=100)),
        Optional('type'): int,
        Optional('bitrate'): int,
        Optional('user_limit'): int,
        Optional('permission_overwrites'): list,
    }
    self.channel_create_schema = Schema(
        channel_create_fields, required=True, extra=REMOVE_EXTRA)

    self.register()
def boolean(value: Any) -> bool:
    """Validate and coerce a value to a boolean.

    Strings are matched case-insensitively against fixed true/false words
    and raise ``vol.Invalid`` when unrecognised; every other type is
    converted with ``bool()``.
    """
    if not isinstance(value, str):
        return bool(value)

    word = value.lower()
    if word in ('1', 'true', 'yes', 'on', 'enable'):
        result = True
    elif word in ('0', 'false', 'no', 'off', 'disable'):
        result = False
    else:
        # The message reports the lower-cased form of the input.
        raise vol.Invalid('invalid boolean value {}'.format(word))
    return result
def isfile(value: Any) -> str:
    """Validate that the value names an existing, readable file.

    Returns the user-expanded path. Raises ``vol.Invalid`` when the value
    is None, does not name an existing file, or is not readable.
    """
    if value is None:
        raise vol.Invalid('None is not file')

    candidate = os.path.expanduser(str(value))
    exists = os.path.isfile(candidate)
    if not exists:
        raise vol.Invalid('not a file')
    readable = os.access(candidate, os.R_OK)
    if not readable:
        raise vol.Invalid('file not readable')
    return candidate
def entity_id(value: Any) -> str:
    """Validate an entity ID and return its lower-cased form.

    Coerces the value via ``string``, lower-cases it and raises
    ``vol.Invalid`` if ``valid_entity_id`` does not accept the result.
    """
    lowered_id = string(value).lower()
    if not valid_entity_id(lowered_id):
        message = 'Entity ID {} is an invalid entity id'.format(lowered_id)
        raise vol.Invalid(message)
    return lowered_id
def string(value: Any) -> str:
    """Return ``str(value)``, raising ``vol.Invalid`` when value is None."""
    if value is None:
        raise vol.Invalid('string value is None')
    return str(value)
def url(value: Any) -> str:
    """Validate a URL.

    The value is stringified and must use the ``http`` or ``https`` scheme,
    otherwise ``vol.Invalid`` is raised. Accepted values are returned as
    validated by ``vol.Url()``.
    """
    text = str(value)
    scheme = urlparse(text).scheme
    if scheme not in ['http', 'https']:
        raise vol.Invalid('invalid url')
    return vol.Schema(vol.Url())(text)
def async_setup_scanner_platform(hass: HomeAssistantType, config: ConfigType,
                                 scanner: Any, async_see_device: Callable):
    """Helper method to connect scanner-based platform to device tracker.

    Schedules a periodic scan via ``async_track_utc_time_change`` and queues
    one immediate scan. Each device reported by the scanner is forwarded to
    ``async_see_device``.

    This method is a coroutine.
    """
    interval = config.get(CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL)

    # MACs already reported once; the host name is only looked up the first
    # time a MAC is seen.
    known_macs = set()  # type: Any

    def device_tracker_scan(now: dt_util.dt.datetime):
        """Called when interval matches."""
        for mac in scanner.scan_devices():
            host_name = None
            if mac not in known_macs:
                host_name = scanner.get_device_name(mac)
                known_macs.add(mac)
            hass.add_job(async_see_device(mac=mac, host_name=host_name))

    # Seconds of the minute at which the scan callback is scheduled.
    scan_seconds = range(0, 60, interval)
    async_track_utc_time_change(
        hass, device_tracker_scan, second=scan_seconds)

    # Queue an initial scan right away instead of waiting for the first tick.
    hass.async_add_job(device_tracker_scan, None)
def _ResourceSearchSchema():
    """Build the voluptuous schema that validates a resource search query.

    A query is a mapping with at most one entry whose key is an operator:

    * comparison operators map to a single ``{attribute: value}`` pair;
    * ``like`` maps to a single non-id ``{attribute: value}`` pair;
    * ``in`` maps to a single ``{attribute: [values...]}`` pair with a
      non-empty list;
    * ``and`` / ``or`` map to a non-empty list of sub-queries;
    * ``not`` maps to one sub-query.

    ``id`` values are validated by ``ResourceUUID`` bound to the current
    user as creator.

    The Unicode operator aliases are written as ``\\u`` escapes so the
    source stays pure ASCII. They had been mangled to ``u"?"``, which made
    a literal "?" an accepted operator and rejected the real aliases.
    """
    user = pecan.request.auth_helper.get_current_user(
        pecan.request)
    _ResourceUUID = functools.partial(ResourceUUID, creator=user)

    return voluptuous.Schema(
        voluptuous.All(
            voluptuous.Length(min=0, max=1),
            {
                voluptuous.Any(
                    u"=", u"==", u"eq",
                    u"<", u"lt",
                    u">", u"gt",
                    u"<=", u"\u2264", u"le",  # U+2264 LESS-THAN OR EQUAL TO
                    u">=", u"\u2265", u"ge",  # U+2265 GREATER-THAN OR EQUAL
                    u"!=", u"\u2260", u"ne",  # U+2260 NOT EQUAL TO
                ): voluptuous.All(
                    voluptuous.Length(min=1, max=1),
                    {"id": _ResourceUUID,
                     NotIDKey: ResourceSearchSchemaAttributeValue},
                ),
                u"like": voluptuous.All(
                    voluptuous.Length(min=1, max=1),
                    {NotIDKey: ResourceSearchSchemaAttributeValue},
                ),
                u"in": voluptuous.All(
                    voluptuous.Length(min=1, max=1),
                    {"id": voluptuous.All(
                        [_ResourceUUID],
                        voluptuous.Length(min=1)),
                     NotIDKey: voluptuous.All(
                         [ResourceSearchSchemaAttributeValue],
                         voluptuous.Length(min=1))}
                ),
                voluptuous.Any(
                    u"and", u"\u2227",  # U+2227 LOGICAL AND
                    u"or", u"\u2228",   # U+2228 LOGICAL OR
                ): voluptuous.All(
                    [ResourceSearchSchema], voluptuous.Length(min=1)
                ),
                u"not": ResourceSearchSchema,
            }
        )
    )