python类Optional()的实例源码

test_cricri.py 文件源码 项目:cricri 作者: Maillol 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def setUp(self):
        spy = unittest.mock.Mock()

        class MyClient(Client):
            attr_name = 'my_clients'

            @staticmethod
            def validator():
                return {
                    voluptuous.Required('foo'): int,
                    voluptuous.Optional('bar', default='def bar'): str
                }

            def __init__(self, **kwargs):
                spy(kwargs)

            def close(self):
                spy('close')

        self.spy = spy
        MetaServerTestState.bind_class_client(MyClient)
        self.addCleanup(lambda: MetaServerTestState.forget_client(MyClient))
cricri.py 文件源码 项目:cricri 作者: Maillol 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def _build_attributes_validator(mcs):
        """
        Returns validator to validate the sub-classes attributes.
        """

        valid_attributes = {
            Required("commands", 'required class attribute'): [
                {
                    Required("name"): str,
                    Required("cmd"): [str],
                    Optional("kill-signal", default=signal.SIGINT): int
                }
            ]
        }

        for attr_name, class_client in mcs._class_clients.items():
            client_validator = {
                Required("name"): str,
            }
            client_validator.update(class_client.validator())

            key = Optional(attr_name, 'required class attribute')
            valid_attributes[key] = [client_validator]

        return Schema(valid_attributes, extra=ALLOW_EXTRA)
__init__.py 文件源码 项目:hass_config 作者: azogue 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def _get_sensor_state(self, entity_id, remote_states=None):

        def _opt_float(x: str) -> Optional[float]:
            try:
                return float(x)
            except ValueError:
                return None

        value = None
        if remote_states is not None:
            value = [_opt_float(s.state) for s in
                     filter(lambda x: x.entity_id == entity_id,
                            remote_states)]
            if value:
                value = value[0]
        else:
            sensor = self.hass.states.get(entity_id)
            if sensor is not None:
                value = _opt_float(sensor.state)
        return value
__init__.py 文件源码 项目:homeassistant 作者: NAStools 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def run_information(point_in_time: Optional[datetime]=None):
    """Return information about current run.

    There is also the run that covers point_in_time.
    """
    _verify_instance()

    recorder_runs = get_model('RecorderRuns')
    if point_in_time is None or point_in_time > _INSTANCE.recording_start:
        return recorder_runs(
            end=None,
            start=_INSTANCE.recording_start,
            closed_incorrect=False)

    return query('RecorderRuns').filter(
        (recorder_runs.start < point_in_time) &
        (recorder_runs.end > point_in_time)).first()
config.py 文件源码 项目:scarlett_os 作者: bossjones 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def __init__(self, data):
        """
        Initialize with preparsed data object.

        :param ConfigParser data: config file accessor
        """
        self._data = data or {}

        # self.latitude = None  # type: Optional[float]
        # self.longitude = None  # type: Optional[float]
        # self.elevation = None  # type: Optional[int]
        # self.location_name = None  # type: Optional[str]
        # self.time_zone = None  # type: Optional[str]
        # self.units = METRIC_SYSTEM  # type: UnitSystem

        # # If True, pip install is skipped for requirements on startup
        # self.skip_pip = False  # type: bool

        # # List of loaded components
        # self.components = set()

        # # Remote.API object pointing at local API
        # self.api = None

        # # Directory that holds the configuration
        # self.config_dir = None

        self._scarlett_name = None
        self._coordinates = None
        self._longitude = None
        self._latitude = None
        self._pocketsphinx = None
        self._elevation = None
        self._unit_system = None
        self._time_zone = None
        self._owner_name = None
        self._keyword_list = None
        self._features_enabled = None
        # self._units = METRIC_SYSTEM  # type: UnitSystem
validation.py 文件源码 项目:aioautomatic 作者: armills 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def opt(key):
    """Create an optional key that returns a default of None."""
    return vol.Optional(key, default=None)
iptables_sqlite.py 文件源码 项目:monasca-analytics 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def validate_config(_config):
        iptables_sql_schema = voluptuous.Schema({
            "module": voluptuous.And(basestring, vu.NoSpaceCharacter()),
            voluptuous.Optional("db_name"): voluptuous.And(
                basestring, vu.NoSpaceCharacter()),
        }, required=True)
        return iptables_sql_schema(_config)
randoms.py 文件源码 项目:monasca-analytics 作者: openstack 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def validate_config(_config):
        source_schema = voluptuous.Schema({
            "module": voluptuous.And(basestring, vu.NoSpaceCharacter()),
            "params": {
                "host": voluptuous.And(basestring, vu.NoSpaceCharacter()),
                "port": int,
                "model": {
                    "name": voluptuous.And(basestring, vu.NoSpaceCharacter()),
                    "params": {
                        "origin_types": voluptuous.And([
                            {
                                "origin_type": voluptuous.And(
                                    basestring, vu.NoSpaceCharacter()),
                                "weight": voluptuous.And(
                                    voluptuous.Or(int, float),
                                    voluptuous.Range(
                                        min=0, min_included=False)),
                            }
                        ], vu.NotEmptyArray()),
                        voluptuous.Optional("key_causes"): dict
                    }
                },
                "alerts_per_burst": voluptuous.And(
                    int, voluptuous.Range(min=1)),
                "idle_time_between_bursts": voluptuous.And(
                    voluptuous.Or(int, float),
                    voluptuous.Range(min=0, min_included=False))
            }
        }, required=True)
        return source_schema(_config)
test_base_sqlite.py 文件源码 项目:monasca-analytics 作者: openstack 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def validate_config(_config):
        base_schema = voluptuous.Schema({
            "module": voluptuous.And(
                basestring, lambda i: not any(c.isspace() for c in i)),
            voluptuous.Optional("db_name"): voluptuous.And(
                basestring, lambda i: not any(c.isspace() for c in i)),
        }, required=True)
        return base_schema(_config)
resource_type.py 文件源码 项目:gnocchi 作者: gnocchixyz 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def schema(self):
        if self.required:
            return {self.name: self.schema_ext}
        else:
            return {voluptuous.Optional(self.name): self.schema_ext}
api.py 文件源码 项目:gnocchi 作者: gnocchixyz 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def ResourceSchema(schema):
    base_schema = {
        voluptuous.Optional('started_at'): utils.to_datetime,
        voluptuous.Optional('ended_at'): utils.to_datetime,
        voluptuous.Optional('user_id'): voluptuous.Any(None, six.text_type),
        voluptuous.Optional('project_id'): voluptuous.Any(None, six.text_type),
        voluptuous.Optional('metrics'): MetricsSchema,
    }
    base_schema.update(schema)
    return base_schema
create_validators.py 文件源码 项目:pilight 作者: DavidLP 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def parse_option(option_string):
    if 'OPTION_NO_VALUE' in option_string:
        option = re.findall(r'\"(.*?)\"', option_string)[0]
        # The options without values seem to still need a value
        # when used with pilight-daemon, but this are not mandatory
        # options
        # E.G.: option 'on' is 'on': 1
        return {vol.Optional(option): vol.Coerce(int)}
    elif 'OPTION_HAS_VALUE' in option_string:
        options = re.findall(r'\"(.*?)\"', option_string)
        option = options[0]
        regex = None
        if len(options) > 1:  # Option has specified value by regex
            regex = options[1]
        if 'JSON_NUMBER' in option_string:
            return {vol.Required(option): vol.Coerce(int)}
        elif 'JSON_STRING' in option_string:
            return {vol.Required(option): vol.Coerce(str)}
        else:
            raise
    elif 'OPTION_OPT_VALUE' in option_string:
        options = re.findall(r'\"(.*?)\"', option_string)
        option = options[0]
        regex = None
        if len(options) > 1:  # Option has specified value by regex
            regex = options[1]
        if 'JSON_NUMBER' in option_string:
            return {vol.Required(option): vol.Coerce(int)}
        elif 'JSON_STRING' in option_string:
            return {vol.Required(option): vol.Coerce(str)}
        else:
            raise
    else:
        print(option_string)
        raise

    raise
__init__.py 文件源码 项目:hass_config 作者: azogue 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def get_dbt_rh_points(self):
        """Extract temperature - humidity points from sensors."""
        def _mean(values: Union[List[float],
                                Tuple[float]]) -> Optional[float]:
            if values:
                try:
                    return sum(values) / len(values)
                except TypeError:
                    _LOGGER.error('Bad values in mean: %s', values)
            return None

        results = yield from self.collect_states()
        points = {}
        for key, value in results.items():
            if isinstance(value, dict):
                temp_zone = humid_zone = counter = 0
                for k_room, s_values in value.items():
                    temp = _mean([v[0] for v in s_values])
                    humid = _mean([v[1] for v in s_values])
                    if temp is not None and humid is not None:
                        points[k_room] = (int(100 * temp) / 100.,
                                          int(100 * humid) / 100.)
                        temp_zone += temp
                        humid_zone += humid
                        counter += 1
                if counter:
                    points[key] = (int(100 * temp_zone / counter) / 100.,
                                   int(100 * humid_zone / counter) / 100.)
            else:
                temp = _mean([v[0] for v in value])
                humid = _mean([v[1] for v in value])
                if temp is not None and humid is not None:
                    points[key] = (int(100 * temp) / 100.,
                                   int(100 * humid) / 100.)
        return points
__init__.py 文件源码 项目:homeassistant 作者: NAStools 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def log_error(e: Exception, retry_wait: Optional[float]=0,
              rollback: Optional[bool]=True,
              message: Optional[str]="Error during query: %s") -> None:
    """Log about SQLAlchemy errors in a sane manner."""
    import sqlalchemy.exc
    if not isinstance(e, sqlalchemy.exc.OperationalError):
        _LOGGER.exception(str(e))
    else:
        _LOGGER.error(message, str(e))
    if rollback:
        Session.rollback()
    if retry_wait:
        _LOGGER.info("Retrying in %s seconds", retry_wait)
        time.sleep(retry_wait)
__init__.py 文件源码 项目:homeassistant 作者: NAStools 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def async_load_config(path: str, hass: HomeAssistantType,
                      consider_home: timedelta):
    """Load devices from YAML configuration file.

    This method is a coroutine.
    """
    dev_schema = vol.Schema({
        vol.Required('name'): cv.string,
        vol.Optional('track', default=False): cv.boolean,
        vol.Optional('mac', default=None): vol.Any(None, vol.All(cv.string,
                                                                 vol.Upper)),
        vol.Optional(CONF_AWAY_HIDE, default=DEFAULT_AWAY_HIDE): cv.boolean,
        vol.Optional('gravatar', default=None): vol.Any(None, cv.string),
        vol.Optional('picture', default=None): vol.Any(None, cv.string),
        vol.Optional(CONF_CONSIDER_HOME, default=consider_home): vol.All(
            cv.time_period, cv.positive_timedelta),
        vol.Optional('vendor', default=None): vol.Any(None, cv.string),
    })
    try:
        result = []
        try:
            devices = yield from hass.loop.run_in_executor(
                None, load_yaml_config_file, path)
        except HomeAssistantError as err:
            _LOGGER.error('Unable to load %s: %s', path, str(err))
            return []

        for dev_id, device in devices.items():
            try:
                device = dev_schema(device)
                device['dev_id'] = cv.slugify(dev_id)
            except vol.Invalid as exp:
                async_log_exception(exp, dev_id, devices, hass)
            else:
                result.append(Device(hass, **device))
        return result
    except (HomeAssistantError, FileNotFoundError):
        # When YAML file could not be loaded/did not contain a dict
        return []
validation.py 文件源码 项目:monasca-analytics 作者: openstack 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _validate_schema(config):
    """Validate the configuration, with spark, up to the orchestration level

    Checks that hte spark configuration is valid, as well as the modules
    structure in the configuration up to the orchestration level.
    Each module will be responsible to validate its own sub-configuration.

    :type config: dict
    :param config: configuration model for the whole system
    :raises: SchemaError -- if the configuration, up to the
             orchestration level, is not valid
    """
    config_schema = voluptuous.Schema({
        "spark_config": {
            "appName": basestring,
            "streaming": {
                "batch_interval": voluptuous.And(int, voluptuous.Range(min=1))
            }
        },
        "server": {
            "port": int,
            "debug": bool
        },
        "sources": {
            voluptuous.Optional(basestring): {basestring: object}
        },
        "ingestors": {
            voluptuous.Optional(basestring): {basestring: object}
        },
        "smls": {
            voluptuous.Optional(basestring): {basestring: object}
        },
        "voters": {
            voluptuous.Optional(basestring): {basestring: object}
        },
        "sinks": {
            voluptuous.Optional(basestring): {basestring: object}
        },
        "ldps": {
            voluptuous.Optional(basestring): {basestring: object}
        },
        "connections": {
            voluptuous.Optional(basestring): [basestring]
        },
        "feedback": {
            voluptuous.Optional(basestring): [basestring]
        }
    }, required=True)
    return config_schema(config)
api.py 文件源码 项目:gnocchi 作者: gnocchixyz 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def MetricSchema(definition):
        creator = pecan.request.auth_helper.get_current_user(
            pecan.request)

        # First basic validation
        schema = voluptuous.Schema({
            "archive_policy_name": six.text_type,
            "resource_id": functools.partial(ResourceID, creator=creator),
            "name": six.text_type,
            voluptuous.Optional("unit"):
            voluptuous.All(six.text_type, voluptuous.Length(max=31)),
        })
        definition = schema(definition)
        archive_policy_name = definition.get('archive_policy_name')

        name = definition.get('name')
        if name and '/' in name:
            abort(400, "'/' is not supported in metric name")
        if archive_policy_name is None:
            try:
                ap = pecan.request.indexer.get_archive_policy_for_metric(name)
            except indexer.NoArchivePolicyRuleMatch:
                # NOTE(jd) Since this is a schema-like function, we
                # should/could raise ValueError, but if we do so, voluptuous
                # just returns a "invalid value" with no useful message – so we
                # prefer to use abort() to make sure the user has the right
                # error message
                abort(400, "No archive policy name specified "
                      "and no archive policy rule found matching "
                      "the metric name %s" % name)
            else:
                definition['archive_policy_name'] = ap.name

        resource_id = definition.get('resource_id')
        if resource_id is None:
            original_resource_id = None
        else:
            if name is None:
                abort(400,
                      {"cause": "Attribute value error",
                       "detail": "name",
                       "reason": "Name cannot be null "
                       "if resource_id is not null"})
            original_resource_id, resource_id = resource_id

        enforce("create metric", {
            "creator": creator,
            "archive_policy_name": archive_policy_name,
            "resource_id": resource_id,
            "original_resource_id": original_resource_id,
            "name": name,
            "unit": definition.get('unit'),
        })

        return definition


问题


面经


文章

微信
公众号

扫码关注公众号