def populate(self, values):
    """Overlay ``values`` onto the settings already stored on this object.

    Keys of ``values`` that are not present in ``self._values`` are skipped,
    so settings that have been removed are dropped automatically. When the
    stored entry is a ``Config`` and the incoming value is a ``Mapping``,
    the nested ``Config`` is populated recursively; in every other case the
    stored entry is replaced by the incoming value.
    """
    for name, incoming in values.items():
        if name in self._values:
            current = self._values[name]
            if not (isinstance(current, Config) and isinstance(incoming, Mapping)):
                self._values[name] = incoming
            else:
                current.populate(incoming)
# Example source code for the Python class Mapping()
def _dump(obj, indent):
    """Serialise ``obj`` to text, recursing into mappings and lists.

    ``indent`` is the whitespace prefix of the enclosing level; each nested
    level appends one more space to it. Mapping items are written in sorted
    order with single-quoted keys (keys are concatenated as strings).
    Supported leaves are booleans, numbers (int, float, Decimal), ``None``,
    strings, ``bytearray`` (base64-encoded), ``bytes`` (decoded) and
    ``Datetime`` values.

    Raises ``PionException`` for any other type.
    """
    if isinstance(obj, Mapping):
        inner = indent + ' '
        entries = [
            '\n' + inner + "'" + key + "': " + _dump(value, inner)
            for key, value in sorted(obj.items())
        ]
        return '{' + ','.join(entries) + '}'

    # bool has to be tested before the numeric types: bool is a subclass
    # of int and would otherwise be written as 'True' / 'False'.
    if isinstance(obj, bool):
        if obj:
            return 'true'
        return 'false'

    if isinstance(obj, list):
        inner = indent + ' '
        rendered = ['\n' + inner + _dump(element, inner) for element in obj]
        return '[' + ','.join(rendered) + ']'

    if isinstance(obj, (int, float, Decimal)):
        return str(obj)

    if obj is None:
        return 'null'

    if isinstance(obj, str):
        # Strings containing a line break use the long quote form.
        if '\n' in obj or '\r' in obj:
            quote = LONG_QUOTE
        else:
            quote = SHORT_QUOTE
        return quote + obj + quote

    if isinstance(obj, bytearray):
        return '{{ ' + b64encode(obj).decode() + ' }}'

    if isinstance(obj, bytes):
        return "{{ '" + obj.decode() + "' }}"

    if isinstance(obj, Datetime):
        # Naive and UTC timestamps share one format; other zones use TZ_FORMAT.
        naive_or_utc = obj.tzinfo is None or obj.tzinfo == Timezone.utc
        return obj.strftime(UTC_FORMAT if naive_or_utc else TZ_FORMAT)

    raise PionException("Type " + str(type(obj)) + " not recognised.")
def iter_pairs(iterable):
    """
    Return an iterator over the (key, value) pairs of ``iterable``.

    A ``Mapping`` contributes its ``items()``; any other argument is
    assumed to already yield (key, value) pairs and is iterated as is.
    This is similar to how Python's ``dict()`` constructor treats its
    argument.
    """
    pairs = iterable.items() if isinstance(iterable, Mapping) else iterable
    return iter(pairs)
def __new__(cls, arg):
    """Create an instance only for mappings; convert or pass through the rest.

    * a ``Mapping`` produces a new instance of ``cls``;
    * a ``MutableSequence`` produces a plain list in which every item has
      been passed through ``cls``;
    * anything else is returned unchanged.
    """
    if isinstance(arg, abc.Mapping):
        return super().__new__(cls)
    if isinstance(arg, abc.MutableSequence):
        return list(map(cls, arg))
    return arg
def make_mapping(v, key_fn=identity):
    """Return a mapping from an object, using a function to generate keys if needed.

    ``None`` gives an empty dict and mappings are returned as is. A
    non-string iterable is split into its elements, each keyed by
    ``key_fn`` called with the element's position and the element itself.
    Anything else is wrapped in a singleton dict keyed by ``key_fn`` called
    with ``None`` and the value. ``key_fn`` is always invoked through
    ``ignoring_extra_args``.
    """
    if v is None:
        return {}
    if isinstance(v, Mapping):
        return v
    if non_string_iterable(v):
        result = {}
        for position, element in enumerate(v):
            result[ignoring_extra_args(key_fn)(position, element)] = element
        return result
    return {ignoring_extra_args(key_fn)(None, v): v}
def __init__(self, d=(), normalize=str.lower, base_factory=dict):
    """Set up the backing stores and load any initial items.

    :param d: initial content, either a mapping or an iterable of
        ``(key, value)`` pairs; any other value is ignored. The default is
        an immutable empty tuple rather than a shared mutable ``{}``.
    :param normalize: callable stored as ``self.normalize``
        (presumably applied to keys by ``__setitem__`` -- confirm there).
    :param base_factory: zero-argument callable that creates the backing
        container stored in ``self._d``.
    """
    self.normalize = normalize
    self._d = base_factory()
    self._k = {}
    if isinstance(d, abc.Mapping):
        pairs = d.items()
    elif isinstance(d, abc.Iterable):
        pairs = d
    else:
        # Non-iterable input has always been ignored silently; keep that.
        pairs = ()
    # Go through item assignment so each pair gets the class's own handling.
    for key, value in pairs:
        self[key] = value
def __repr__(self):
    """Return ``<Command [provides] name(args) state>`` for debugging.

    Sequence arguments are shown as comma-separated reprs and mapping
    arguments as ``key=value`` pairs. ``state`` is ``success=<value>`` once
    the command has finished and ``running`` before that.
    """
    args = self._args
    if isinstance(args, abc.Sequence):
        argstr = ', '.join(repr(arg) for arg in args)
    elif isinstance(args, abc.Mapping):
        argstr = ', '.join('%s=%r' % (k, v) for k, v in args.items())
    else:
        # Any other argument container (e.g. None) used to leave ``argstr``
        # unbound, so repr() itself raised UnboundLocalError.
        argstr = repr(args)
    provides = '/'.join(self.provides)
    if self.finished:
        state = ' success={}'.format(self.success)
    else:
        state = ' running'
    return '<Command [{}] {}({}){}>'.format(provides, self.name, argstr, state)
def getone(self, key, default=_marker):
    """Get first value matching the key.

    ``key`` is converted with ``self._title`` and compared against the
    identity stored with each item. When nothing matches, ``default`` is
    returned if one was supplied; otherwise ``KeyError`` is raised.
    """
    wanted = self._title(key)
    for entry in self._impl._items:
        ident, _stored_key, value = entry
        if ident == wanted:
            return value
    if default is _marker:
        raise KeyError('Key not found: %r' % key)
    return default
# Mapping interface #
def clear(self):
    """Remove all items from MultiDict.

    The item list of the underlying implementation object is emptied in
    place and ``incr_version()`` is then called on that object.
    """
    impl = self._impl
    impl._items.clear()
    impl.incr_version()
# Mapping interface #
def test_abc_inheritance():
    """Check which collection ABCs the multi-mapping classes derive from.

    ``MultiMapping`` must be a ``Mapping`` but not a ``MutableMapping``;
    ``MutableMultiMapping`` must be both.
    """
    expectations = (
        (MultiMapping, Mapping, True),
        (MultiMapping, MutableMapping, False),
        (MutableMultiMapping, Mapping, True),
        (MutableMultiMapping, MutableMapping, True),
    )
    for candidate, base, expected in expectations:
        assert issubclass(candidate, base) is expected
def test_Mapping(self):
    """Exercise the ``Mapping`` ABC.

    Checks that ``dict`` is registered as a ``Mapping``, that the ABC
    declares the expected abstract methods, and that a minimal concrete
    subclass passes the comparison checks.
    """
    for concrete in [dict]:
        instance = concrete()
        self.assertIsInstance(instance, Mapping)
        self.assertTrue(issubclass(concrete, Mapping))

    required = ('__contains__', '__iter__', '__len__', '__getitem__')
    self.validate_abstract_methods(Mapping, *required)

    class MyMapping(Mapping):
        # Smallest possible concrete subclass: it is always empty and
        # every item lookup raises IndexError.
        def __iter__(self):
            return iter(())

        def __getitem__(self, i):
            raise IndexError

        def __len__(self):
            return 0

    self.validate_comparison(MyMapping())
def atacseq_iface_without_resources():
    """
    Provide the ATAC-Seq pipeline interface data, without resources.

    The returned data describe the interface of a single pipeline. To be
    used in the form that a PipelineInterface expects, it has to be the
    value mapped to a key within a larger Mapping.

    :return Mapping: all of the pipeline interface configuration data for
        ATAC-Seq, minus the resources section
    """
    read_inputs = ("read1", "read2")
    arguments = {
        "--sample-name": "sample_name",
        "--genome": "genome",
        "--input": "read1",
        "--input2": "read2",
        "--single-or-paired": "read_type",
    }
    optional_arguments = {
        "--frip-ref-peaks": "FRIP_ref",
        "--prealignments": "prealignments",
        "--genome-size": "macs_genome_size",
    }
    # Each file section gets its own list so that changing one of them
    # never affects the others.
    return {
        "name": "ATACseq",
        "looper_args": True,
        "required_input_files": list(read_inputs),
        "all_input_files": list(read_inputs),
        "ngs_input_files": list(read_inputs),
        "arguments": arguments,
        "optional_arguments": optional_arguments,
    }
def atacseq_iface_with_resources(
        atacseq_iface_without_resources, resources):
    """
    Combine base ATAC-Seq interface data with a resources section.

    Both inputs are deep-copied, so neither argument is modified and the
    result shares no nested objects with them.

    :param dict atacseq_iface_without_resources: PipelineInterface config
        data, minus a resources section
    :param Mapping resources: resources section of PipelineInterface
        configuration data
    :return Mapping: pipeline interface data for ATAC-Seq pipeline, with all
        of the base sections plus resources section
    """
    combined = copy.deepcopy(atacseq_iface_without_resources)
    resources_copy = copy.deepcopy(resources)
    combined["resources"] = resources_copy
    return combined
def piface_config_bundles(request, resources):
    """
    Provide pipeline interface config bundles with resource data merged in.

    The bundles are read from the ``config_bundles`` fixture value of the
    requesting test. Each bundle is updated in place with the resource
    specification, and the original collection object is returned.

    :param pytest._pytest.fixtures.SubRequest request: hook into test case
        requesting this fixture, which is queried for a resources value with
        which to override the default if it's present.
    :param Mapping resources: pipeline interface resource specification
    :return Iterable[Mapping]: collection of bundles of pipeline interface
        configuration bundles
    :raise TypeError: if the config bundles are neither a mapping nor
        another iterable
    """
    bundles_source = request.getfixturevalue("config_bundles")
    if isinstance(bundles_source, Mapping):
        # Only the values are bundles; the keys are left untouched.
        bundles = bundles_source.values()
    elif isinstance(bundles_source, Iterable):
        bundles = bundles_source
    else:
        raise TypeError("Expected mapping or list collection of "
                        "PipelineInterface data: {} ({})".format(
                            bundles_source, type(bundles_source)))

    # A "resources" fixture requested by the test overrides the argument.
    if "resources" in request.fixturenames:
        specification = request.getfixturevalue("resources")
    else:
        specification = resources

    for bundle in bundles:
        bundle.update(specification)
    return bundles_source
def extend(self, iterable):
    """ Concatenate two lists by adding a list of extra items to the end
    of this list. Each item added must be capable of being unpacked into a
    key-value pair. A mapping contributes its ``items()``.

    The extension is all-or-nothing: if any item is not a pair, ValueError
    is raised and this list is left unchanged.

        >>> kvl = KeyValueList([('one', 'eins'), ('two', 'zwei')])
        >>> kvl.extend([('three', 'drei'), ('four', 'vier')])
        >>> for item in kvl:
        ...     print(item)
        ('one', 'eins')
        ('two', 'zwei')
        ('three', 'drei')
        ('four', 'vier')
        >>> kvl.extend(['five', 'six'])
        Traceback (most recent call last):
          File "<stdin>", line 1, in ?
        ValueError: KeyValueList items must be pairs
    """
    if isinstance(iterable, Mapping):
        list.extend(self, iterable.items())
        return
    try:
        # Unpack every item before touching the list. Feeding a lazy
        # generator to list.extend appended the leading valid pairs before
        # a later bad item raised, leaving the list half-extended (and it
        # never terminated when the list was extended with itself).
        pairs = [(k, v) for k, v in iterable]
    except ValueError:
        raise ValueError("KeyValueList items must be pairs")
    list.extend(self, pairs)
def extend(self, iterable):
    """ Concatenate two lists by adding a list of extra items to the end
    of this list. Each item added must be capable of being unpacked into a
    key-value pair. A mapping contributes its ``items()``.

    The extension is all-or-nothing: if any item is not a pair, ValueError
    is raised and this list is left unchanged.

        >>> kvl = KeyValueList([('one', 'eins'), ('two', 'zwei')])
        >>> kvl.extend([('three', 'drei'), ('four', 'vier')])
        >>> for item in kvl:
        ...     print(item)
        ('one', 'eins')
        ('two', 'zwei')
        ('three', 'drei')
        ('four', 'vier')
        >>> kvl.extend(['five', 'six'])
        Traceback (most recent call last):
          File "<stdin>", line 1, in ?
        ValueError: KeyValueList items must be pairs
    """
    if isinstance(iterable, Mapping):
        list.extend(self, iterable.items())
        return
    try:
        # Unpack every item before touching the list. Feeding a lazy
        # generator to list.extend appended the leading valid pairs before
        # a later bad item raised, leaving the list half-extended (and it
        # never terminated when the list was extended with itself).
        pairs = [(k, v) for k, v in iterable]
    except ValueError:
        raise ValueError("KeyValueList items must be pairs")
    list.extend(self, pairs)
def format_tags(obj, defaults=None):
    """Merge ``obj`` and ``defaults`` into a sorted list of unique tags.

    Each source may be a mapping (every item becomes ``'key:value'``), a
    list of tags, or a single tag string. Sources of any other type,
    including ``None``, contribute nothing.
    """
    tags = set()
    for source in (obj, defaults):
        if isinstance(source, Mapping):
            tags |= {'%s:%s' % (key, value) for key, value in source.items()}
        elif isinstance(source, list):
            tags |= set(source)
        elif isinstance(source, str):
            tags.add(source)
    return sorted(tags)
def format_tags(obj, defaults=None):
    """Return the sorted, de-duplicated tags taken from ``obj`` and ``defaults``.

    A mapping source yields one ``'key:value'`` tag per item, a list source
    yields its elements, and a string source yields itself. Any other kind
    of source (``None`` included) is skipped.
    """
    collected = set()
    for source in (obj, defaults):
        if isinstance(source, Mapping):
            collected.update(
                '%s:%s' % (name, setting) for name, setting in source.items())
            continue
        if isinstance(source, list):
            collected.update(source)
            continue
        if isinstance(source, str):
            collected.add(source)
    return sorted(collected)
def test_Mapping(self):
    """Exercise the ``Mapping`` ABC.

    Checks that ``dict`` is registered as a ``Mapping``, that the ABC
    declares the expected abstract methods, and that a minimal concrete
    subclass passes the comparison checks.
    """
    for concrete in [dict]:
        instance = concrete()
        self.assertIsInstance(instance, Mapping)
        self.assertTrue(issubclass(concrete, Mapping))

    required = ('__contains__', '__iter__', '__len__', '__getitem__')
    self.validate_abstract_methods(Mapping, *required)

    class MyMapping(Mapping):
        # Smallest possible concrete subclass: it is always empty and
        # every item lookup raises IndexError.
        def __iter__(self):
            return iter(())

        def __getitem__(self, i):
            raise IndexError

        def __len__(self):
            return 0

    self.validate_comparison(MyMapping())
def _merge_envs(self, merge_envs, re_env):
    """Build a single ``Env`` by layering the given environments in order.

    Each entry of ``merge_envs`` is either the string ``'replay'`` (use
    ``re_env``), the string ``'native'`` (use ``builtins.__xonsh_env__``),
    or a mapping that is merged directly. Later entries override earlier
    ones.

    Raises ``TypeError`` for an entry of any other kind.
    """
    merged = {}
    for source in merge_envs:
        if source == 'replay':
            layer = re_env
        elif source == 'native':
            layer = builtins.__xonsh_env__
        elif isinstance(source, Mapping):
            layer = source
        else:
            raise TypeError('Type of env not understood: {0!r}'.format(source))
        merged.update(layer)
    return Env(**merged)