def _to_config(config_cls, default_get, environ, prefix):
vals = {}
for a in attr.fields(config_cls):
try:
ce = a.metadata[CNF_KEY]
except KeyError:
continue
if ce.sub_cls is None:
get = ce.callback or default_get
val = get(environ, a.metadata, prefix, a.name)
else:
val = _to_config(
ce.sub_cls, default_get, environ,
prefix + ((a.name if prefix else a.name),)
)
vals[a.name] = val
return config_cls(**vals)
python类fields()的实例源码
def _parseRawDoc(self, operation):
"""
Set doc fields of this operation by using the Documentation object
- If the Documentation object has a .yamlData property, we update values
of the operation properties from them. Unrecognized properties will
be added with the 'x-' prefix.
- Documentation.full is the description
- Documentation.first is the summary
"""
operation.summary = self.doco.first
operation.description = self.doco.full
if self.doco.yamlData:
fieldNames = [f.name for f in attr.fields(operation.__class__)]
for k in self.doco.yamlData:
if k in fieldNames:
setattr(operation, k, self.doco.yamlData[k])
else:
operation._extended['x-' + k] = self.doco.yamlData[k]
def test_structure_simple_from_dict_default(converter, cl_and_vals, data):
"""Test structuring non-nested attrs classes with default value."""
cl, vals = cl_and_vals
obj = cl(*vals)
attrs_with_defaults = [a for a in fields(cl)
if a.default is not NOTHING]
to_remove = data.draw(lists(elements=sampled_from(attrs_with_defaults),
unique=True))
for a in to_remove:
if isinstance(a.default, Factory):
setattr(obj, a.name, a.default.factory())
else:
setattr(obj, a.name, a.default)
dumped = asdict(obj)
for a in to_remove:
del dumped[a.name]
assert obj == converter.structure(dumped, cl)
def test_structure_union(converter, cl_and_vals_a, cl_and_vals_b):
"""Structuring of automatically-disambiguable unions works."""
# type: (Converter, Any, Any) -> None
cl_a, vals_a = cl_and_vals_a
cl_b, vals_b = cl_and_vals_b
a_field_names = {a.name for a in fields(cl_a)}
b_field_names = {a.name for a in fields(cl_b)}
assume(a_field_names)
assume(b_field_names)
common_names = a_field_names & b_field_names
if len(a_field_names) > len(common_names):
obj = cl_a(*vals_a)
dumped = asdict(obj)
res = converter.structure(dumped, Union[cl_a, cl_b])
assert isinstance(res, cl_a)
assert obj == res
def test_fallback(cl_and_vals):
"""The fallback case works."""
cl, vals = cl_and_vals
assume(attr.fields(cl)) # At least one field.
@attr.s
class A(object):
pass
fn = create_uniq_field_dis_func(A, cl)
assert fn({}) is A
assert fn(attr.asdict(cl(*vals))) is cl
attr_names = {a.name for a in attr.fields(cl)}
if 'xyz' not in attr_names:
fn({'xyz': 1}) is A # Uses the fallback.
def test_disambiguation(cl_and_vals_a, cl_and_vals_b):
"""Disambiguation should work when there are unique fields."""
cl_a, vals_a = cl_and_vals_a
cl_b, vals_b = cl_and_vals_b
req_a = {a.name for a in attr.fields(cl_a)}
req_b = {a.name for a in attr.fields(cl_b)}
assume(len(req_a))
assume(len(req_b))
assume((req_a - req_b) or (req_b - req_a))
fn = create_uniq_field_dis_func(cl_a, cl_b)
assert fn(attr.asdict(cl_a(*vals_a))) is cl_a
def test_optional_field_roundtrip(converter, cl_and_vals):
"""
Classes with optional fields can be unstructured and structured.
"""
cl, vals = cl_and_vals
@attr.s
class C(object):
a = attr.ib(type=Optional[cl])
inst = C(a=cl(*vals))
assert inst == converter.structure(converter.unstructure(inst), C)
inst = C(a=None)
unstructured = converter.unstructure(inst)
assert inst == converter.structure(unstructured, C)
def extract(cls, extractor, typ):
"""
take an attrs based class, and convert it
to jsonschema.
"""
schema = {
"title": typ.__name__,
"type": "object",
"properties": {},
"required": []
}
for attribute in attr.fields(typ):
details = cls._extract_attribute(extractor, attribute)
if details.is_required:
schema["required"].append(details.name)
schema["properties"][details.name] = details.schema
return schema
def get_spec_from_release_file(content):
"""Provide specification object describing the component of the distribution
"""
# RegExp to pull a single line "tag: value" pair from a deb822 file
re_deb822_single_line_tag = re.compile("""
^(?P<tag>[a-zA-Z][^:]*):[\ ]+ # Tag - begins at start of line
(?P<val>\S.*)$ # Value - after colon to the end of the line
""", flags=re.VERBOSE + re.MULTILINE)
# Split before PGP signature if present
content = content.split("-----BEGIN PGP SIGNATURE-----")[0]
# Parse the content for tags and values into a dictionary
release = {
match.group("tag"): match.group("val")
for match in re_deb822_single_line_tag.finditer(content)
}
# TODO: redo with conversions of components and architectures in into lists
# and date in machine-readable presentation
return DebianReleaseSpec(**{
a.name: release.get(a.name.title(), None)
for a in attr.fields(DebianReleaseSpec)
})
def describe(self):
info = []
for f in attr.fields(self.__class__):
value = getattr(self, f.name)
if value is not None:
info.append("{}={}".format(f.name, value))
return "Set/unset: {}".format(", ".join(info))
def test_edge_errors():
"""Edge input cases cause errors."""
@attr.s
class A(object):
pass
with pytest.raises(ValueError):
# Can't generate for only one class.
create_uniq_field_dis_func(A)
@attr.s
class B(object):
pass
with pytest.raises(ValueError):
# No fields on either class.
create_uniq_field_dis_func(A, B)
@attr.s
class C(object):
a = attr.ib()
@attr.s
class D(object):
a = attr.ib()
with pytest.raises(ValueError):
# No unique fields on either class.
create_uniq_field_dis_func(C, D)
def create_uniq_field_dis_func(*cls):
# type: (*Sequence[Type]) -> Callable
"""Given attr classes, generate a disambiguation function.
The function is based on unique fields."""
if len(cls) < 2:
raise ValueError('At least two classes required.')
cls_and_attrs = [(cl, set(at.name for at in fields(cl))) for cl in cls]
if len([attrs for _, attrs in cls_and_attrs if len(attrs) == 0]) > 1:
raise ValueError('At least two classes have no attributes.')
# TODO: Deal with a single class having no required attrs.
# For each class, attempt to generate a single unique required field.
uniq_attrs_dict = OrderedDict()
cls_and_attrs.sort(key=lambda c_a: -len(c_a[1]))
fallback = None # If none match, try this.
for i, (cl, cl_reqs) in enumerate(cls_and_attrs):
other_classes = cls_and_attrs[i+1:]
if other_classes:
other_reqs = reduce(or_, (c_a[1] for c_a in other_classes))
uniq = cl_reqs - other_reqs
if not uniq:
m = '{} has no usable unique attributes.'.format(cl)
raise ValueError(m)
uniq_attrs_dict[next(iter(uniq))] = cl
else:
fallback = cl
def dis_func(data):
# type: (Mapping) -> Union
if not isinstance(data, Mapping):
raise ValueError('Only input mappings are supported.')
for k, v in uniq_attrs_dict.items():
if k in data:
return v
return fallback
return dis_func
def __new__(cls, *args, **kwargs):
instance = super(Model, cls).__new__(cls)
for field in attr.fields(cls):
if field.metadata.get('related'):
target = field.metadata['related']['target']
setattr(
target,
cls.__name__.lower() + '_set',
RelatedManagerDescriptor(model=cls)
)
return cls.objects.add(instance)
def from_dir(cls, root: Path):
params = json.loads(root.joinpath('hps.json').read_text())
fields = {field.name for field in attr.fields(HyperParams)}
return cls(**{k: v for k, v in params.items() if k in fields})
def update(self, hps_string: str):
if hps_string:
values = dict(pair.split('=') for pair in hps_string.split(','))
for field in attr.fields(HyperParams):
v = values.pop(field.name, None)
if v is not None:
default = field.default
assert not isinstance(default, bool)
if isinstance(default, (int, float, str)):
v = type(default)(v)
elif isinstance(default, list):
v = [type(default[0])(x) for x in v.split('-')]
setattr(self, field.name, v)
if values:
raise ValueError('Unknown hyperparams: {}'.format(values))
def columns(cls):
"""
List the columns required to construct a suitable ``ps`` command.
"""
return [a.name for a in attr.fields(cls)]
def fields(self, obj):
raise NotImplementedError()
def match(self, actual):
if self.comparator(actual, self.expected):
return None
return _MappingLikeMismatch(
self.fields, self.get_field, actual, self.mismatch_string,
self.expected,
)
def fields(self, obj):
return obj.keys()
def fields(self, obj):
return list(field.name for field in attr.fields(type(obj)))
def fields(self, obj):
return obj._pclass_fields.keys()
def backend_help(resource_type=None):
"""
Helper function for displaying backend help listing for interface commands.
To use, add this to the interface argparse parameters:
backend=Parameter(
args=("-b", "--backend"),
nargs="+",
doc=backend_help()
)
"""
types = ResourceManager._discover_types() if not resource_type else [resource_type]
help_message = "One or more backend parameters in the form KEY=VALUE. Options are: "
help_args = []
for module_name in types:
class_name = ''.join([token.capitalize() for token in module_name.split('_')])
try:
module = import_module('niceman.resource.{}'.format(module_name))
except ImportError as exc:
raise ResourceError(
"Failed to import resource {}: {}. Known ones are: {}".format(
module_name,
exc_str(exc),
', '.join(ResourceManager._discover_types()))
)
cls = getattr(module, class_name)
if not issubclass(cls, Resource):
lgr.debug(
"Skipping %s.%s since not a Resource. Consider moving away",
module, class_name
)
continue
args = attr.fields(cls)
for arg in args:
if 'doc' in arg.metadata:
help_args.append('"{}" ({})'.format(arg.name, arg.metadata['doc']))
return help_message + ", ".join(help_args)