def with_query(self, *args, **kwargs):
"""Return a new URL with query part replaced.
Accepts any Mapping (e.g. dict, multidict.MultiDict instances)
    or str; the argument is auto-encoded if needed.
    It can also take an arbitrary number of keyword arguments.
    Clears the query if None is passed.
"""
# N.B. doesn't cleanup query/fragment
new_query = self._get_str_query(*args, **kwargs)
return URL(
self._val._replace(path=self._val.path, query=new_query),
encoded=True)
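# Illustrative usage sketch for with_query() above, assuming the method belongs
# to a yarl-style URL class; the yarl import below is an assumption about the
# surrounding package rather than something shown in this snippet.
from yarl import URL

url = URL("http://example.com/path?old=1")
assert str(url.with_query({"a": "1"})) == "http://example.com/path?a=1"  # query replaced
assert str(url.with_query(None)) == "http://example.com/path"           # query cleared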
def diffs(*mappings, missing=MISSING):
"""Yield keys and values which differ between the two mappings.
A 'mapping' is any object which implements keys() and __getitem__().
"""
assert mappings
assert all(isinstance(mapping, Mapping) for mapping in mappings)
# Defer to __eq__(), even if it contradicts the algorithm below
if all_eq(mappings):
return
keys = chain.from_iterable(mapping.keys() for mapping in mappings)
for key in unique(keys):
vals = tuple(values(mappings, key))
if not all_eq(vals):
yield key, vals
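# Minimal sketch of calling diffs() on two plain dicts. The helpers all_eq,
# unique and values() live in the same module and are not shown here, so the
# exact yield order is an assumption; only keys whose values disagree appear.
first = {"a": 1, "b": 2}
second = {"a": 1, "b": 3}
for key, vals in diffs(first, second):
    print(key, vals)   # expected: b (2, 3)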
def __getitem__(self, key: str) -> Any:
node = self.mapping
leafs = key.split(".")
for i, leaf in enumerate(leafs):
if not isinstance(node, c_abc.Mapping):
raise KeyError(f"Element {'.'.join(leafs[:i])!r} is not a mapping")
if not leaf:
raise KeyError(f"Empty sub-key after {'.'.join(leafs[:i])!r}")
if leaf not in node:
break
node = node[leaf]
else:
return node
raise KeyError(f"Cannot find '{key}'")
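# Illustrative lookup using the dotted-key __getitem__ above. Constructing the
# Configuration class (defined further down) directly from a plain dict is an
# assumption about how it is normally instantiated.
cfg = Configuration({"irc": {"nick": "mybot"}})
assert cfg["irc.nick"] == "mybot"          # walks mapping["irc"]["nick"]
assert cfg["irc"] == {"nick": "mybot"}     # a single segment returns the sub-mapping
# cfg["irc.missing"] raises KeyError("Cannot find 'irc.missing'")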
def __eq__(self, other):
if not isinstance(other, abc.Mapping):
return NotImplemented
if isinstance(other, _Base):
lft = self._impl._items
rht = other._impl._items
if len(lft) != len(rht):
return False
        for (i1, k1, v1), (i2, k2, v2) in zip(lft, rht):
if i1 != i2 or v1 != v2:
return False
return True
if len(self._impl._items) != len(other):
return False
for k, v in self.items():
nv = other.get(k, _marker)
if v != nv:
return False
return True
def basic_instance_data(request, instance_raw_data):
"""
    Transform the raw data for a basic model instance to comply with its constructor.
:param pytest._pytest.fixtures.SubRequest request: test case requesting
the basic instance data
:param Mapping instance_raw_data: the raw data needed to create a
model instance
:return object: basic instance data in a form accepted by its constructor
"""
# Cleanup is free with _write_config, using request's temp folder.
transformation_by_class = {
"AttributeDict": lambda data: data,
"PipelineInterface": lambda data:
_write_config(data, request, "pipeline_interface.yaml"),
"ProtocolInterface": lambda data:
_write_config(data, request, "pipeline_interface.yaml"),
"ProtocolMapper": lambda data: data,
"Sample": lambda data: pd.Series(data)}
which_class = request.getfixturevalue("class_name")
return transformation_by_class[which_class](instance_raw_data)
def _write_config(data, request, filename):
"""
Write configuration data to file.
    :param Sequence | Mapping data: data to write to file; must be YAML-serializable
:param pytest._pytest.fixtures.SubRequest request: test case that
requested a fixture from which this function was called
:param str filename: name for the file to write
:return str: full path to the file written
"""
    # We get cleanup for free by writing the file in the request's temp folder.
dirpath = request.getfixturevalue("tmpdir").strpath
filepath = os.path.join(dirpath, filename)
with open(filepath, 'w') as conf_file:
yaml.safe_dump(data, conf_file)
return filepath
def get_part(self, doc, part):
""" Returns the next step in the correct type """
if isinstance(doc, Mapping):
return part
elif isinstance(doc, Sequence):
if part == '-':
return part
if not RE_ARRAY_INDEX.match(str(part)):
raise JsonPointerException("'%s' is not a valid list index" % (part, ))
return int(part)
elif hasattr(doc, '__getitem__'):
# Allow indexing via ducktyping if the target has defined __getitem__
return part
else:
raise JsonPointerException("Document '%s' does not support indexing, "
"must be dict/list or support __getitem__" % type(doc))
def test_against_direct_model(data):
keys = list(data.keys())
if not isinstance(data[keys[0]], Mapping):
return
if 'weights' in data[keys[0]]:
return
y = []
x = []
data_copy = OrderedDict()
for i in range(min(3, len(data))):
data_copy[keys[i]] = data[keys[i]]
y.append(data[keys[i]]['dependent'])
x.append(data[keys[i]]['exog'])
direct = simple_sur(y, x)
mod = SUR(data_copy)
res = mod.fit(method='ols')
assert_allclose(res.params.values[:, None], direct.beta0)
res = mod.fit(method='gls')
assert_allclose(res.params.values[:, None], direct.beta1)
def cast_json(json_dict):
"""Convert an arbitrary JSON source into MongoDB
compatible format."""
DOT = '_'
DOLLAR = '\uff04'
if isinstance(json_dict, str):
return json_dict.replace('.', DOT).replace('$', DOLLAR)
if six.PY2 and isinstance(json_dict, unicode): # noqa
return json_dict.replace('.', DOT).replace('$', DOLLAR)
if isinstance(json_dict, Mapping):
return {cast_json(key): cast_json(value) for
key, value in json_dict.items()}
elif isinstance(json_dict, Iterable):
return [cast_json(o) for o in json_dict]
else:
return json_dict
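# Example of the sanitisation above, assuming the snippet's own imports (six,
# Mapping, Iterable) are in place: '.' becomes '_' and '$' becomes the
# fullwidth dollar sign U+FF04 in both keys and string values, which keeps the
# document storable under MongoDB's field-name rules.
doc = {"user.name": "a.b", "$set": {"x": 1}}
assert cast_json(doc) == {"user_name": "a_b", "\uff04set": {"x": 1}}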
def asjson(obj, seen=None):
if isinstance(obj, collections.Mapping) or isiter(obj):
# prevent traversal of recursive structures
if seen is None:
seen = set()
elif id(obj) in seen:
return '__RECURSIVE__'
seen.add(id(obj))
if hasattr(obj, '__json__') and type(obj) is not type:
return obj.__json__()
elif isinstance(obj, collections.Mapping):
result = collections.OrderedDict()
for k, v in obj.items():
try:
result[asjson(k, seen)] = asjson(v, seen)
except TypeError:
debug('Unhashable key?', type(k), str(k))
raise
return result
elif isiter(obj):
return [asjson(e, seen) for e in obj]
else:
return obj
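# Behavioural sketch for asjson(), assuming the snippet's own imports and a
# Python version where collections.Mapping still resolves (pre-3.10), and that
# the isiter() helper excludes strings as in typical implementations.
data = collections.OrderedDict(a=1, b=[2, 3])
data["self"] = data                      # introduce a cycle
out = asjson(data)
assert out["b"] == [2, 3]                # iterables come back as lists
assert out["self"] == "__RECURSIVE__"    # the cycle is cut off by the seen set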
def get_part(self, doc, part):
"""Returns the next step in the correct type"""
if isinstance(doc, Mapping):
return part
elif isinstance(doc, Sequence):
if part == '-':
return part
if not self._RE_ARRAY_INDEX.match(str(part)):
raise JsonPointerException("'%s' is not a valid sequence index" % part)
return int(part)
elif hasattr(doc, '__getitem__'):
# Allow indexing via ducktyping
# if the target has defined __getitem__
return part
else:
raise JsonPointerException("Document '%s' does not support indexing, "
"must be mapping/sequence or support __getitem__" % type(doc))
def run_test(work_type: str, job_sets: Sequence, trials: int,
             pool_class: type, worker_count: int) -> Sequence[Mapping]:
pool = pool_class(worker_count)
if work_type == 'compute':
test_func = pool.run_compute_test
elif work_type == 'network':
test_func = pool.run_network_test
else:
raise Exception("Invalid work type: {}".format(work_type))
results = map(
lambda jobs: test_func(jobs, trials, show_progress=True),
tqdm(job_sets, desc=pool_class.__name__),
)
summarized_results = list(map(summarize_test, results))
pool.destroy_pool()
return summarized_results
def frozen(struct):
"""Return an immutable, hashable version of the given data structure.
Iterators (including generators) are hashable but mutable, so they
are evaluated and returned as tuples---if they are infinite, this
function will not exit.
"""
if isinstance(struct, Mapping):
return frozenset((k, frozen(v)) for k, v in struct.items())
if isinstance(struct, Set):
return frozenset(frozen(item) for item in struct)
if isinstance(struct, Iterable): # Includes iterators and generators
return tuple(frozen(item) for item in struct)
hash(struct) # Raise TypeError for unhashable objects
return struct
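# Small example of the recursive freeze above: dict items become a frozenset of
# (key, frozen(value)) pairs and lists become tuples, so the result is hashable.
# Note that a plain string value would also hit the Iterable branch as written.
snapshot = frozen({"tags": [1, 2]})
assert snapshot == frozenset({("tags", (1, 2))})
hash(snapshot)   # no TypeError: every element is immutable now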
def hashified(struct, use_none=False):
"""Return a hashable version of the given data structure.
If use_none is True, returns None instead of raising TypeError for
unhashable types: this will serve as a bad but sometimes passable
hash.
See also functools._make_key, which might be a better choice.
"""
try:
hash(struct)
except TypeError:
pass
else:
# Return the original object if it's already hashable
return struct
if isinstance(struct, Mapping):
return frozenset((k, hashified(v)) for k, v in struct.items())
if isinstance(struct, Set):
return frozenset(hashified(item) for item in struct)
if isinstance(struct, Iterable):
return tuple(hashified(item) for item in struct)
if use_none:
return None
raise TypeError('unhashable type: {.__name__!r}'.format(type(struct)))
def __call__(self, obj):
"""Transforms the JSON object `obj`."""
if isinstance(obj, str):
return obj
elif isinstance(obj, Sequence):
return self.act_on_list(obj)
elif isinstance(obj, Mapping):
return self.act_on_dict(obj)
else:
return obj
def build(cls, obj):
if isinstance(obj, abc.Mapping):
return cls(obj)
elif isinstance(obj, abc.MutableSequence):
return [cls.build(item) for item in obj]
else: # <8>
return obj
def build(cls, obj): # <5>
if isinstance(obj, abc.Mapping): # <6>
return cls(obj)
elif isinstance(obj, abc.MutableSequence): # <7>
return [cls.build(item) for item in obj]
else: # <8>
return obj
# END EXPLORE0
def __new__(cls, arg): # <1>
if isinstance(arg, abc.Mapping):
return super().__new__(cls) # <2>
elif isinstance(arg, abc.MutableSequence): # <3>
return [cls(item) for item in arg]
else:
return arg
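# Hedged sketch of the attribute-access wrapper that the build()/__new__ hooks
# above belong to (a FrozenJSON-style read-only facade). The class name and the
# __getattr__ body below are assumptions modelled on the common recipe, not
# copied from the source.
from collections import abc


class FrozenJSONSketch:
    def __init__(self, mapping):
        self._data = dict(mapping)

    def __getattr__(self, name):
        # Fall back to the wrapped dict's own attributes (keys, items, ...),
        # otherwise wrap the looked-up value so dotted access keeps working.
        if hasattr(self._data, name):
            return getattr(self._data, name)
        return FrozenJSONSketch.build(self._data[name])

    @classmethod
    def build(cls, obj):
        if isinstance(obj, abc.Mapping):
            return cls(obj)
        elif isinstance(obj, abc.MutableSequence):
            return [cls.build(item) for item in obj]
        return obj


record = FrozenJSONSketch({"speaker": {"name": "Ada"}, "talks": [{"title": "X"}]})
assert record.speaker.name == "Ada"
assert record.talks[0].title == "X"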
def _is_list(obj):
return (isinstance(obj, Sized) and isinstance(obj, Iterable) and
not isinstance(obj, (Set, Mapping)))
def _is_dict(obj):
return isinstance(obj, Mapping)
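# Quick behavioural check of the two predicates above (Sized, Iterable, Set and
# Mapping are assumed to come from collections.abc in the surrounding module).
# Note that strings also satisfy the Sized/Iterable test, so _is_list("abc") is
# True as written.
assert _is_list([1, 2]) and _is_list((1, 2))
assert not _is_list({1, 2}) and not _is_list({"a": 1})   # sets and mappings are excluded
assert _is_dict({"a": 1}) and not _is_dict([1, 2])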
def __init__(self, mapping: Mapping = None) -> None:
if mapping is None:
mapping = {}
if not isinstance(mapping, c_abc.Mapping):
raise ValueError("Must be a mapping")
self.mapping = mapping
def __init__(self, mapping: Mapping, *fallback_configs: Configuration) -> None:
super().__init__(mapping)
# for fb_c in fallback_configs:
# if not isinstance(fb_c, Configuration):
# raise ValueError(f"{fb_c!r} is not an instance of {Configuration}")
self.fallback_configs = fallback_configs
def __init__(self, name: str, mapping: Mapping, *fallback_configs: Configuration) -> None:
super().__init__(mapping, *fallback_configs)
self.name = name
self._require_keys({'nick', 'user', 'realname'})
self._fix_channels(mapping)
self.servers = self._parse_servers(mapping)
def _fix_channels(mapping: Mapping) -> None:
# TODO move to channels core plugin
    # Take a snapshot of the items: the loop below deletes and inserts keys in
    # mapping['channels'], which is not safe against the live dict view.
    for channel, channel_conf in list(mapping.get('channels', {}).items()):
if channel_conf is None:
mapping['channels'][channel] = channel_conf = {}
# replace channel names 'foobar' with '#foobar'
if not channel.startswith(tuple('#&+!')):
del mapping['channels'][channel]
mapping['channels'][f'#{channel}'] = channel_conf
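# Illustrative effect of _fix_channels(): bare channel names gain a '#' prefix
# and a None per-channel config is replaced with an empty dict. Calling it as a
# plain function here is an assumption; in the source it is a helper on the
# configuration class.
conf = {"channels": {"foobar": None, "#ops": {"key": "s3cret"}}}
_fix_channels(conf)
assert conf["channels"] == {"#foobar": {}, "#ops": {"key": "s3cret"}}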
def __init__(self, mapping: Mapping) -> None:
super().__init__(mapping)
self.networks = list(self._parse_networks(mapping))
def _parse_networks(self, root: Mapping) -> List[NetworkConfiguration]:
networks = root.get('networks', None)
if networks is None:
raise ConfigurationError("No networks found")
return [NetworkConfiguration(name, mapping, self)
for name, mapping in networks.items()]
def __setitem__(self, key, value):
if isinstance(value, Mapping):
value = Config(**value)
self._values[key] = value