def setUp(self):
spy = unittest.mock.Mock()
class MyClient(Client):
attr_name = 'my_clients'
@staticmethod
def validator():
return {
voluptuous.Required('foo'): int,
voluptuous.Optional('bar', default='def bar'): str
}
def __init__(self, **kwargs):
spy(kwargs)
def close(self):
spy('close')
self.spy = spy
MetaServerTestState.bind_class_client(MyClient)
self.addCleanup(lambda: MetaServerTestState.forget_client(MyClient))
python类Optional()的实例源码
def _build_attributes_validator(mcs):
"""
Returns validator to validate the sub-classes attributes.
"""
valid_attributes = {
Required("commands", 'required class attribute'): [
{
Required("name"): str,
Required("cmd"): [str],
Optional("kill-signal", default=signal.SIGINT): int
}
]
}
for attr_name, class_client in mcs._class_clients.items():
client_validator = {
Required("name"): str,
}
client_validator.update(class_client.validator())
key = Optional(attr_name, 'required class attribute')
valid_attributes[key] = [client_validator]
return Schema(valid_attributes, extra=ALLOW_EXTRA)
def _get_sensor_state(self, entity_id, remote_states=None):
def _opt_float(x: str) -> Optional[float]:
try:
return float(x)
except ValueError:
return None
value = None
if remote_states is not None:
value = [_opt_float(s.state) for s in
filter(lambda x: x.entity_id == entity_id,
remote_states)]
if value:
value = value[0]
else:
sensor = self.hass.states.get(entity_id)
if sensor is not None:
value = _opt_float(sensor.state)
return value
def run_information(point_in_time: Optional[datetime]=None):
"""Return information about current run.
There is also the run that covers point_in_time.
"""
_verify_instance()
recorder_runs = get_model('RecorderRuns')
if point_in_time is None or point_in_time > _INSTANCE.recording_start:
return recorder_runs(
end=None,
start=_INSTANCE.recording_start,
closed_incorrect=False)
return query('RecorderRuns').filter(
(recorder_runs.start < point_in_time) &
(recorder_runs.end > point_in_time)).first()
def __init__(self, data):
"""
Initialize with preparsed data object.
:param ConfigParser data: config file accessor
"""
self._data = data or {}
# self.latitude = None # type: Optional[float]
# self.longitude = None # type: Optional[float]
# self.elevation = None # type: Optional[int]
# self.location_name = None # type: Optional[str]
# self.time_zone = None # type: Optional[str]
# self.units = METRIC_SYSTEM # type: UnitSystem
# # If True, pip install is skipped for requirements on startup
# self.skip_pip = False # type: bool
# # List of loaded components
# self.components = set()
# # Remote.API object pointing at local API
# self.api = None
# # Directory that holds the configuration
# self.config_dir = None
self._scarlett_name = None
self._coordinates = None
self._longitude = None
self._latitude = None
self._pocketsphinx = None
self._elevation = None
self._unit_system = None
self._time_zone = None
self._owner_name = None
self._keyword_list = None
self._features_enabled = None
# self._units = METRIC_SYSTEM # type: UnitSystem
def opt(key):
"""Create an optional key that returns a default of None."""
return vol.Optional(key, default=None)
def validate_config(_config):
iptables_sql_schema = voluptuous.Schema({
"module": voluptuous.And(basestring, vu.NoSpaceCharacter()),
voluptuous.Optional("db_name"): voluptuous.And(
basestring, vu.NoSpaceCharacter()),
}, required=True)
return iptables_sql_schema(_config)
def validate_config(_config):
source_schema = voluptuous.Schema({
"module": voluptuous.And(basestring, vu.NoSpaceCharacter()),
"params": {
"host": voluptuous.And(basestring, vu.NoSpaceCharacter()),
"port": int,
"model": {
"name": voluptuous.And(basestring, vu.NoSpaceCharacter()),
"params": {
"origin_types": voluptuous.And([
{
"origin_type": voluptuous.And(
basestring, vu.NoSpaceCharacter()),
"weight": voluptuous.And(
voluptuous.Or(int, float),
voluptuous.Range(
min=0, min_included=False)),
}
], vu.NotEmptyArray()),
voluptuous.Optional("key_causes"): dict
}
},
"alerts_per_burst": voluptuous.And(
int, voluptuous.Range(min=1)),
"idle_time_between_bursts": voluptuous.And(
voluptuous.Or(int, float),
voluptuous.Range(min=0, min_included=False))
}
}, required=True)
return source_schema(_config)
def validate_config(_config):
base_schema = voluptuous.Schema({
"module": voluptuous.And(
basestring, lambda i: not any(c.isspace() for c in i)),
voluptuous.Optional("db_name"): voluptuous.And(
basestring, lambda i: not any(c.isspace() for c in i)),
}, required=True)
return base_schema(_config)
def schema(self):
if self.required:
return {self.name: self.schema_ext}
else:
return {voluptuous.Optional(self.name): self.schema_ext}
def ResourceSchema(schema):
base_schema = {
voluptuous.Optional('started_at'): utils.to_datetime,
voluptuous.Optional('ended_at'): utils.to_datetime,
voluptuous.Optional('user_id'): voluptuous.Any(None, six.text_type),
voluptuous.Optional('project_id'): voluptuous.Any(None, six.text_type),
voluptuous.Optional('metrics'): MetricsSchema,
}
base_schema.update(schema)
return base_schema
def parse_option(option_string):
if 'OPTION_NO_VALUE' in option_string:
option = re.findall(r'\"(.*?)\"', option_string)[0]
# The options without values seem to still need a value
# when used with pilight-daemon, but this are not mandatory
# options
# E.G.: option 'on' is 'on': 1
return {vol.Optional(option): vol.Coerce(int)}
elif 'OPTION_HAS_VALUE' in option_string:
options = re.findall(r'\"(.*?)\"', option_string)
option = options[0]
regex = None
if len(options) > 1: # Option has specified value by regex
regex = options[1]
if 'JSON_NUMBER' in option_string:
return {vol.Required(option): vol.Coerce(int)}
elif 'JSON_STRING' in option_string:
return {vol.Required(option): vol.Coerce(str)}
else:
raise
elif 'OPTION_OPT_VALUE' in option_string:
options = re.findall(r'\"(.*?)\"', option_string)
option = options[0]
regex = None
if len(options) > 1: # Option has specified value by regex
regex = options[1]
if 'JSON_NUMBER' in option_string:
return {vol.Required(option): vol.Coerce(int)}
elif 'JSON_STRING' in option_string:
return {vol.Required(option): vol.Coerce(str)}
else:
raise
else:
print(option_string)
raise
raise
def get_dbt_rh_points(self):
"""Extract temperature - humidity points from sensors."""
def _mean(values: Union[List[float],
Tuple[float]]) -> Optional[float]:
if values:
try:
return sum(values) / len(values)
except TypeError:
_LOGGER.error('Bad values in mean: %s', values)
return None
results = yield from self.collect_states()
points = {}
for key, value in results.items():
if isinstance(value, dict):
temp_zone = humid_zone = counter = 0
for k_room, s_values in value.items():
temp = _mean([v[0] for v in s_values])
humid = _mean([v[1] for v in s_values])
if temp is not None and humid is not None:
points[k_room] = (int(100 * temp) / 100.,
int(100 * humid) / 100.)
temp_zone += temp
humid_zone += humid
counter += 1
if counter:
points[key] = (int(100 * temp_zone / counter) / 100.,
int(100 * humid_zone / counter) / 100.)
else:
temp = _mean([v[0] for v in value])
humid = _mean([v[1] for v in value])
if temp is not None and humid is not None:
points[key] = (int(100 * temp) / 100.,
int(100 * humid) / 100.)
return points
def log_error(e: Exception, retry_wait: Optional[float]=0,
rollback: Optional[bool]=True,
message: Optional[str]="Error during query: %s") -> None:
"""Log about SQLAlchemy errors in a sane manner."""
import sqlalchemy.exc
if not isinstance(e, sqlalchemy.exc.OperationalError):
_LOGGER.exception(str(e))
else:
_LOGGER.error(message, str(e))
if rollback:
Session.rollback()
if retry_wait:
_LOGGER.info("Retrying in %s seconds", retry_wait)
time.sleep(retry_wait)
def async_load_config(path: str, hass: HomeAssistantType,
consider_home: timedelta):
"""Load devices from YAML configuration file.
This method is a coroutine.
"""
dev_schema = vol.Schema({
vol.Required('name'): cv.string,
vol.Optional('track', default=False): cv.boolean,
vol.Optional('mac', default=None): vol.Any(None, vol.All(cv.string,
vol.Upper)),
vol.Optional(CONF_AWAY_HIDE, default=DEFAULT_AWAY_HIDE): cv.boolean,
vol.Optional('gravatar', default=None): vol.Any(None, cv.string),
vol.Optional('picture', default=None): vol.Any(None, cv.string),
vol.Optional(CONF_CONSIDER_HOME, default=consider_home): vol.All(
cv.time_period, cv.positive_timedelta),
vol.Optional('vendor', default=None): vol.Any(None, cv.string),
})
try:
result = []
try:
devices = yield from hass.loop.run_in_executor(
None, load_yaml_config_file, path)
except HomeAssistantError as err:
_LOGGER.error('Unable to load %s: %s', path, str(err))
return []
for dev_id, device in devices.items():
try:
device = dev_schema(device)
device['dev_id'] = cv.slugify(dev_id)
except vol.Invalid as exp:
async_log_exception(exp, dev_id, devices, hass)
else:
result.append(Device(hass, **device))
return result
except (HomeAssistantError, FileNotFoundError):
# When YAML file could not be loaded/did not contain a dict
return []
def _validate_schema(config):
"""Validate the configuration, with spark, up to the orchestration level
Checks that hte spark configuration is valid, as well as the modules
structure in the configuration up to the orchestration level.
Each module will be responsible to validate its own sub-configuration.
:type config: dict
:param config: configuration model for the whole system
:raises: SchemaError -- if the configuration, up to the
orchestration level, is not valid
"""
config_schema = voluptuous.Schema({
"spark_config": {
"appName": basestring,
"streaming": {
"batch_interval": voluptuous.And(int, voluptuous.Range(min=1))
}
},
"server": {
"port": int,
"debug": bool
},
"sources": {
voluptuous.Optional(basestring): {basestring: object}
},
"ingestors": {
voluptuous.Optional(basestring): {basestring: object}
},
"smls": {
voluptuous.Optional(basestring): {basestring: object}
},
"voters": {
voluptuous.Optional(basestring): {basestring: object}
},
"sinks": {
voluptuous.Optional(basestring): {basestring: object}
},
"ldps": {
voluptuous.Optional(basestring): {basestring: object}
},
"connections": {
voluptuous.Optional(basestring): [basestring]
},
"feedback": {
voluptuous.Optional(basestring): [basestring]
}
}, required=True)
return config_schema(config)
def MetricSchema(definition):
creator = pecan.request.auth_helper.get_current_user(
pecan.request)
# First basic validation
schema = voluptuous.Schema({
"archive_policy_name": six.text_type,
"resource_id": functools.partial(ResourceID, creator=creator),
"name": six.text_type,
voluptuous.Optional("unit"):
voluptuous.All(six.text_type, voluptuous.Length(max=31)),
})
definition = schema(definition)
archive_policy_name = definition.get('archive_policy_name')
name = definition.get('name')
if name and '/' in name:
abort(400, "'/' is not supported in metric name")
if archive_policy_name is None:
try:
ap = pecan.request.indexer.get_archive_policy_for_metric(name)
except indexer.NoArchivePolicyRuleMatch:
# NOTE(jd) Since this is a schema-like function, we
# should/could raise ValueError, but if we do so, voluptuous
# just returns a "invalid value" with no useful message – so we
# prefer to use abort() to make sure the user has the right
# error message
abort(400, "No archive policy name specified "
"and no archive policy rule found matching "
"the metric name %s" % name)
else:
definition['archive_policy_name'] = ap.name
resource_id = definition.get('resource_id')
if resource_id is None:
original_resource_id = None
else:
if name is None:
abort(400,
{"cause": "Attribute value error",
"detail": "name",
"reason": "Name cannot be null "
"if resource_id is not null"})
original_resource_id, resource_id = resource_id
enforce("create metric", {
"creator": creator,
"archive_policy_name": archive_policy_name,
"resource_id": resource_id,
"original_resource_id": original_resource_id,
"name": name,
"unit": definition.get('unit'),
})
return definition