def extend(self, *args, **kwargs):
    """Add every item of a header-like source to this object.

    Works like ``MutableMapping.update`` except that each pair is stored
    through ``self.add`` rather than ``self.__setitem__``.

    At most one positional source is accepted: an ``HTTPHeaderDict``, a
    ``Mapping``, any object exposing ``keys()``, or an iterable that
    yields ``(key, value)`` pairs.  Keyword arguments are added last.

    Raises ``TypeError`` when more than one positional argument is given.
    """
    positional_count = len(args)
    if positional_count > 1:
        raise TypeError("extend() takes at most 1 positional "
                        "arguments ({0} given)".format(positional_count))
    source = args[0] if args else ()

    # Pick a lazy stream of pairs that matches the kind of source.
    if isinstance(source, HTTPHeaderDict):
        pairs = source.iteritems()
    elif isinstance(source, Mapping):
        pairs = ((name, source[name]) for name in source)
    elif hasattr(source, "keys"):
        pairs = ((name, source[name]) for name in source.keys())
    else:
        pairs = source

    for name, content in pairs:
        self.add(name, content)
    for name, content in kwargs.items():
        self.add(name, content)
# Example source code for Python's Mapping() class
def extend(self, *args, **kwargs):
    """Add every item of a header-like source to this object.

    Works like ``MutableMapping.update`` except that each pair is stored
    through ``self.add`` rather than ``self.__setitem__``.

    At most one positional source is accepted: an ``HTTPHeaderDict``, a
    ``Mapping``, any object exposing ``keys()``, or an iterable that
    yields ``(key, value)`` pairs.  Keyword arguments are added last.

    Raises ``TypeError`` when more than one positional argument is given.
    """
    positional_count = len(args)
    if positional_count > 1:
        raise TypeError("extend() takes at most 1 positional "
                        "arguments ({0} given)".format(positional_count))
    source = args[0] if args else ()

    # Pick a lazy stream of pairs that matches the kind of source.
    if isinstance(source, HTTPHeaderDict):
        pairs = source.iteritems()
    elif isinstance(source, Mapping):
        pairs = ((name, source[name]) for name in source)
    elif hasattr(source, "keys"):
        pairs = ((name, source[name]) for name in source.keys())
    else:
        pairs = source

    for name, content in pairs:
        self.add(name, content)
    for name, content in kwargs.items():
        self.add(name, content)
def to_key_val_list(value):
    """Take an object and test to see if it can be represented as a
    dictionary. If it can be, return a list of tuples, e.g.,

    ::

        >>> to_key_val_list([('key', 'val')])
        [('key', 'val')]
        >>> to_key_val_list({'key': 'val'})
        [('key', 'val')]
        >>> to_key_val_list('string')
        ValueError: cannot encode objects that are not 2-tuples.

    ``None`` is returned unchanged.  A mapping contributes its
    ``items()``; anything else is materialised with ``list()``.

    :raises ValueError: if *value* is a ``str``, ``bytes``, ``bool`` or
        ``int``.
    :rtype: list
    """
    # ``collections.Mapping`` was removed in Python 3.10; only fall back to
    # the old location on interpreters that lack ``collections.abc``.
    try:
        from collections.abc import Mapping
    except ImportError:
        from collections import Mapping

    if value is None:
        return None

    if isinstance(value, (str, bytes, bool, int)):
        raise ValueError('cannot encode objects that are not 2-tuples')

    if isinstance(value, Mapping):
        value = value.items()

    return list(value)
# From mitsuhiko/werkzeug (used with permission).
def _update_nested_dict(original_dict, new_dict):
    """Update the dictionary and its nested dictionary fields.

    Note: This was copy-pasted from:
        opengrok/xref/submodules/yelp_lib/yelp_lib/containers/dicts.py?r=92297a46#40
    The reason is that this revision requires yelp_lib>=11.0.0 but we
    can not use this version yelp-main yet (see YELPLIB-65 for details).
    It's simpler to just temporarily pull this in.

    Mapping values in ``new_dict`` are merged key by key into the matching
    entry of ``original_dict``; every other value overwrites the entry.
    ``original_dict`` is modified in place and nothing is returned.

    :param original_dict: Original dictionary
    :param new_dict: Dictionary with data to update
    """
    # Using our own stack to avoid recursion.
    stack = [(original_dict, new_dict)]
    while stack:
        target, updates = stack.pop()
        for key, value in updates.items():
            if isinstance(value, Mapping):
                # A missing entry, or one holding a non-mapping value,
                # cannot be merged into: start from an empty dict so the
                # nested update replaces it instead of failing later on
                # ``.setdefault`` of a scalar.
                if not isinstance(target.get(key), Mapping):
                    target[key] = {}
                stack.append((target[key], value))
            else:
                target[key] = value
def __eq__(self, other):
    """Compare with *other*, ignoring the case of the keys.

    Returns ``False`` when *other* is neither a ``Mapping`` nor an object
    with a ``keys`` attribute.  Otherwise *other* is converted to this
    object's type if needed and the merged items of both sides are
    compared with lower-cased keys.
    """
    mapping_like = isinstance(other, Mapping) or hasattr(other, 'keys')
    if not mapping_like:
        return False

    own_type = type(self)
    if not isinstance(other, own_type):
        other = own_type(other)

    def folded(headers):
        # Lower-case every key of the merged view.
        return dict((name.lower(), content)
                    for name, content in headers.itermerged())

    return folded(self) == folded(other)
def extend(self, *args, **kwargs):
    """Add every item of a header-like source to this object.

    Works like ``MutableMapping.update`` except that each pair is stored
    through ``self.add`` rather than ``self.__setitem__``.

    At most one positional source is accepted: an ``HTTPHeaderDict``, a
    ``Mapping``, any object exposing ``keys()``, or an iterable that
    yields ``(key, value)`` pairs.  Keyword arguments are added last.

    Raises ``TypeError`` when more than one positional argument is given.
    """
    positional_count = len(args)
    if positional_count > 1:
        raise TypeError("extend() takes at most 1 positional "
                        "arguments ({0} given)".format(positional_count))
    source = args[0] if args else ()

    # Pick a lazy stream of pairs that matches the kind of source.
    if isinstance(source, HTTPHeaderDict):
        pairs = source.iteritems()
    elif isinstance(source, Mapping):
        pairs = ((name, source[name]) for name in source)
    elif hasattr(source, "keys"):
        pairs = ((name, source[name]) for name in source.keys())
    else:
        pairs = source

    for name, content in pairs:
        self.add(name, content)
    for name, content in kwargs.items():
        self.add(name, content)
def preprocess(cls, op, clauses):
    """Normalise the ``between`` clause into a parameter list.

    ``clauses["between"]`` may be a list, which is returned as-is, or a
    single-entry mapping ``{var: [prefix, suffix]}``, which becomes
    ``[var, {"literal": prefix}, {"literal": suffix}]``.  Any other shape
    is reported through ``Log.error`` (presumably raising - confirm
    against the Log implementation).

    :param op: unused here.
    :param clauses: mapping holding the ``between``, ``default`` and
        ``start`` keys.
    :return: ``(param, {"default": ..., "start": ...})``
    """
    param = clauses["between"]
    if isinstance(param, list):
        pass  # already in list form
    elif isinstance(param, Mapping):
        # Dict views are not subscriptable on Python 3, so materialise the
        # items before taking the first pair.
        var, vals = list(param.items())[0]
        if isinstance(vals, list) and len(vals) == 2:
            param = [var, {"literal": vals[0]}, {"literal": vals[1]}]
        else:
            Log.error("`between` parameters are expected to be in {var: [prefix, suffix]} form")
    else:
        Log.error("`between` parameters are expected to be in {var: [prefix, suffix]} form")

    return param, {
        "default": clauses["default"],
        "start": clauses["start"]
    }
def quote_value(value):
    """Return the quoted text form of *value*.

    Mappings and lists become ``"."``; ``Date`` and ``Duration`` values
    use their ``unix`` / ``seconds`` attribute; strings are wrapped in
    single quotes with embedded quotes doubled; null-like values become
    ``"NULL"``; booleans become ``"1"`` / ``"0"``; everything else is
    converted with ``text_type``.
    """
    if isinstance(value, (Mapping, list)):
        return "."
    if isinstance(value, Date):
        return text_type(value.unix)
    if isinstance(value, Duration):
        return text_type(value.seconds)
    if isinstance(value, basestring):
        doubled = value.replace("'", "''")
        return "'" + doubled + "'"
    # Equality (not identity) is used on purpose here: objects that merely
    # compare equal to None are rendered as NULL as well.
    if value == None:
        return "NULL"
    if value is True:
        return "1"
    if value is False:
        return "0"
    return text_type(value)
def __init__(self, fmt=None, datefmt=None, style="%"):
    """Set up a default format plus optional per-level formats.

    :param fmt: a format string used for every level, or a mapping of
        level -> format string in which the ``"*"`` key supplies the
        default.  ``None`` selects ``DEFAULT_FORMATS``.
    :param datefmt: passed through to the parent initialiser.
    :param style: only ``"%"`` is accepted.
    :raises ValueError: if *style* is not ``"%"``.
    :raises TypeError: if *fmt* is neither a string nor a mapping.
    """
    # ``collections.Mapping`` was removed in Python 3.10; only fall back to
    # the old location on interpreters that lack ``collections.abc``.
    try:
        from collections.abc import Mapping
    except ImportError:
        from collections import Mapping

    if style != '%':
        raise ValueError(
            "only '%' percent style is supported in both python 2 and 3")
    if fmt is None:
        fmt = DEFAULT_FORMATS
    if isinstance(fmt, basestring):
        default_format = fmt
        custom_formats = {}
    elif isinstance(fmt, Mapping):
        custom_formats = dict(fmt)
        default_format = custom_formats.pop("*", None)
    else:
        raise TypeError('fmt must be a str or a dict of str: %r' % fmt)
    super(LevelFormatter, self).__init__(default_format, datefmt)
    # Keep whatever the parent stored as the effective default format.
    self.default_format = self._fmt
    self.custom_formats = {}
    for level, level_format in custom_formats.items():
        # Normalise the level key through logging's own checker.
        level = logging._checkLevel(level)
        self.custom_formats[level] = level_format
def _resolve_refs(uri, spec):
    """Resolve JSON references in a given dictionary.

    OpenAPI spec may contain JSON references to its nodes or external
    sources, so any attempt to rely that there's some expected attribute
    in the spec may fail. So we need to resolve JSON references before
    we use it (i.e. replace with referenced object). For details see:

        https://tools.ietf.org/html/draft-pbryan-zyp-json-ref-02

    The input spec is modified in-place despite being returned from
    the function.  Tuples are the exception: they cannot be modified in
    place, so a tuple node is replaced by a new tuple of resolved items.
    """
    # ``collections.Mapping`` was removed in Python 3.10; only fall back to
    # the old location on interpreters that lack ``collections.abc``.
    try:
        from collections.abc import Mapping
    except ImportError:
        from collections import Mapping

    resolver = jsonschema.RefResolver(uri, spec)

    def _do_resolve(node):
        if isinstance(node, Mapping):
            if '$ref' in node:
                # A reference node is replaced wholesale by its target.
                with resolver.resolving(node['$ref']) as resolved:
                    return resolved
            for k, v in node.items():
                node[k] = _do_resolve(v)
        elif isinstance(node, list):
            for i, item in enumerate(node):
                node[i] = _do_resolve(item)
        elif isinstance(node, tuple):
            # Item assignment on a tuple raises TypeError, so build a new one.
            return tuple(_do_resolve(item) for item in node)
        return node

    return _do_resolve(spec)
def _validate_tag_sets(tag_sets):
    """Validate tag sets for a MongoReplicaSetClient.

    ``None`` is accepted and returned as-is.  Otherwise *tag_sets* must be
    a non-empty list whose members are all ``Mapping`` instances; the same
    list is returned.

    Raises ``TypeError`` for a non-list or a non-mapping member, and
    ``ValueError`` for an empty list.
    """
    if tag_sets is None:
        return tag_sets

    if not isinstance(tag_sets, list):
        raise TypeError((
            "Tag sets %r invalid, must be a list") % (tag_sets,))
    if not len(tag_sets):
        raise ValueError((
            "Tag sets %r invalid, must be None or contain at least one set of"
            " tags") % (tag_sets,))

    # Find the first member that is not a mapping, if there is one.
    nothing_wrong = object()
    offender = next(
        (entry for entry in tag_sets if not isinstance(entry, Mapping)),
        nothing_wrong)
    if offender is not nothing_wrong:
        raise TypeError(
            "Tag set %r invalid, must be an instance of dict, "
            "bson.son.SON or other type that inherits from "
            "collection.Mapping" % (offender,))

    return tag_sets
def _index_document(index_list):
    """Helper to generate an index specifying document.

    Takes a list of (key, direction) pairs and returns a ``SON`` mapping
    each key to its direction / specifier, in the given order.

    Raises ``TypeError`` if *index_list* is a mapping, is not a list or
    tuple, or contains a pair whose key is not a string or whose value is
    not a string, int or mapping.  Raises ``ValueError`` if it is empty.
    """
    # ``collections.Mapping`` was removed in Python 3.10; only fall back to
    # the old location on interpreters that lack ``collections.abc``.
    try:
        from collections.abc import Mapping
    except ImportError:
        from collections import Mapping

    if isinstance(index_list, Mapping):
        raise TypeError("passing a dict to sort/create_index/hint is not "
                        "allowed - use a list of tuples instead. did you "
                        "mean %r?" % list(iteritems(index_list)))
    elif not isinstance(index_list, (list, tuple)):
        raise TypeError("must use a list of (key, direction) pairs, "
                        "not: " + repr(index_list))
    if not index_list:
        raise ValueError("key_or_list must not be the empty list")

    index = SON()
    for (key, value) in index_list:
        if not isinstance(key, string_type):
            raise TypeError("first item in each key pair must be a string")
        if not isinstance(value, (string_type, int, Mapping)):
            raise TypeError("second item in each key pair must be 1, -1, "
                            "'2d', 'geoHaystack', or another valid MongoDB "
                            "index specifier.")
        index[key] = value
    return index
def _fields_list_to_dict(fields, option_name):
    """Takes a sequence of field names and returns a matching dictionary.

    ["a", "b"] becomes {"a": 1, "b": 1}

    and

    ["a.b.c", "d", "a.c"] becomes {"a.b.c": 1, "d": 1, "a.c": 1}

    A mapping is returned unchanged.

    :param fields: a mapping, or a sequence of string field names.
    :param option_name: name used in the error messages.
    :raises TypeError: if *fields* is neither a mapping nor a sequence,
        or if the sequence holds a non-string member.
    """
    # The ABC aliases in ``collections`` were removed in Python 3.10; only
    # fall back to them on interpreters that lack ``collections.abc``.
    try:
        from collections.abc import Mapping, Sequence
    except ImportError:
        from collections import Mapping, Sequence

    if isinstance(fields, Mapping):
        return fields

    if isinstance(fields, Sequence):
        if not all(isinstance(field, string_type) for field in fields):
            raise TypeError("%s must be a list of key names, each an "
                            "instance of %s" % (option_name,
                                                string_type.__name__))
        return dict.fromkeys(fields, 1)

    raise TypeError("%s must be a mapping or "
                    "list of key names" % (option_name,))
def remove(self, spec_or_id=None, multi=True, **kwargs):
    """Remove a document(s) from this collection.

    **DEPRECATED** - Use :meth:`delete_one` or :meth:`delete_many` instead.

    :param spec_or_id: a query mapping, or any other value, which is
        treated as the ``_id`` to match.  ``None`` means an empty query.
    :param multi: forwarded unchanged to ``self._delete``.
    :param kwargs: when given, used to build a ``WriteConcern``.
    :return: whatever ``self._delete`` returns.

    .. versionchanged:: 3.0
       Removed the `safe` parameter. Pass ``w=0`` for unacknowledged write
       operations.
    """
    # ``collections.Mapping`` was removed in Python 3.10; only fall back to
    # the old location on interpreters that lack ``collections.abc``.
    try:
        from collections.abc import Mapping
    except ImportError:
        from collections import Mapping

    warnings.warn("remove is deprecated. Use delete_one or delete_many "
                  "instead.", DeprecationWarning, stacklevel=2)
    if spec_or_id is None:
        spec_or_id = {}
    if not isinstance(spec_or_id, Mapping):
        # A bare value is shorthand for matching on _id.
        spec_or_id = {"_id": spec_or_id}
    write_concern = None
    if kwargs:
        write_concern = WriteConcern(**kwargs)
    with self._socket_for_writes() as sock_info:
        return self._delete(sock_info, spec_or_id, multi, write_concern)
def __new__(cls, code, scope=None, **kwargs):
    """Create the string instance and attach its scope mapping.

    :param code: the string content; must be a ``string_type`` instance.
    :param scope: optional mapping merged into the scope.
    :param kwargs: extra names merged into the scope after *scope*.
    :raises TypeError: if *code* is not a string or *scope* is not a
        mapping.
    """
    # ``collections.Mapping`` was removed in Python 3.10; only fall back to
    # the old location on interpreters that lack ``collections.abc``.
    try:
        from collections.abc import Mapping
    except ImportError:
        from collections import Mapping

    if not isinstance(code, string_type):
        raise TypeError("code must be an "
                        "instance of %s" % (string_type.__name__))

    self = str.__new__(cls, code)

    # Reuse the scope carried by *code* when it has one; otherwise start
    # with an empty scope.
    try:
        self.__scope = code.scope
    except AttributeError:
        self.__scope = {}

    if scope is not None:
        if not isinstance(scope, Mapping):
            raise TypeError("scope must be an instance of dict")
        self.__scope.update(scope)

    self.__scope.update(kwargs)

    return self
def to_dict(self):
    """Convert a SON document to a normal Python dictionary instance.

    This is trickier than just *dict(...)* because it needs to be
    recursive: mappings nested inside lists or other mappings are
    converted to plain ``dict`` objects as well.  Lists are rebuilt;
    every other value is kept as-is.
    """
    # ``collections.Mapping`` was removed in Python 3.10; only fall back to
    # the old location on interpreters that lack ``collections.abc``.
    try:
        from collections.abc import Mapping
    except ImportError:
        from collections import Mapping

    def transform_value(value):
        if isinstance(value, list):
            return [transform_value(v) for v in value]
        elif isinstance(value, Mapping):
            return {k: transform_value(v) for k, v in value.items()}
        else:
            return value

    return transform_value(dict(self))
def _make_option_strings(cls, options_map):
    """Render *options_map* as a sorted list of ``name = value`` strings.

    Options named in ``cls.option_maps`` whose value is a mapping are
    rendered as ``name = {'k': 'v', ...}``.  Every remaining option with a
    non-``None`` value is rendered through ``protect_value``; a falsy
    ``comment`` value is replaced by the empty string first.
    """
    remaining = dict(options_map.items())
    rendered = []

    for map_option in cls.option_maps:
        nested = remaining.get(map_option)
        if not isinstance(nested, Mapping):
            continue
        # Map-valued options are emitted here and dropped from the rest.
        del remaining[map_option]
        body = ', '.join("'%s': '%s'" % (k, v) for k, v in nested.items())
        rendered.append("%s = {%s}" % (map_option, body))

    for name, value in remaining.items():
        if value is None:
            continue
        if name == "comment":
            value = value or ""
        rendered.append("%s = %s" % (name, protect_value(value)))

    return sorted(rendered)
def _walk(node, config, path, parameters):
    """Recursively rebuild *node*, passing each child through
    ``apply_extrinsics``.

    A reference node whose target is a key of *parameters* is first
    replaced by that parameter.  Lists and mappings are rebuilt element by
    element, with *path* extended by the index or key; ints and unicode
    strings are returned unchanged.

    Raises ``UnexpectedNodeType`` for any other kind of node.
    """
    if is_ref(node):
        referenced = node[node.keys()[0]]
        if referenced in parameters:
            node = parameters[referenced]

    if isinstance(node, list):
        walked_items = []
        for i, value in enumerate(node):
            walked = _walk(value, config, path + [i], parameters)
            walked_items.append(
                apply_extrinsics(walked, config, path + [i], parameters))
        return walked_items

    if isinstance(node, collections.Mapping):
        walked_map = {}
        for key, value in node.iteritems():
            walked = _walk(value, config, path + [key], parameters)
            walked_map[key] = apply_extrinsics(
                walked, config, path + [key], parameters)
        return walked_map

    if isinstance(node, (int, unicode)):
        return node

    raise UnexpectedNodeType(path, node)
def extend(self, *args, **kwargs):
    """Add every item of a header-like source to this object.

    Works like ``MutableMapping.update`` except that each pair is stored
    through ``self.add`` rather than ``self.__setitem__``.

    At most one positional source is accepted: an ``HTTPHeaderDict``, a
    ``Mapping``, any object exposing ``keys()``, or an iterable that
    yields ``(key, value)`` pairs.  Keyword arguments are added last.

    Raises ``TypeError`` when more than one positional argument is given.
    """
    positional_count = len(args)
    if positional_count > 1:
        raise TypeError("extend() takes at most 1 positional "
                        "arguments ({0} given)".format(positional_count))
    source = args[0] if args else ()

    # Pick a lazy stream of pairs that matches the kind of source.
    if isinstance(source, HTTPHeaderDict):
        pairs = source.iteritems()
    elif isinstance(source, Mapping):
        pairs = ((name, source[name]) for name in source)
    elif hasattr(source, "keys"):
        pairs = ((name, source[name]) for name in source.keys())
    else:
        pairs = source

    for name, content in pairs:
        self.add(name, content)
    for name, content in kwargs.items():
        self.add(name, content)
def extend(self, *args, **kwargs):
    """Add every item of a header-like source to this object.

    Works like ``MutableMapping.update`` except that each pair is stored
    through ``self.add`` rather than ``self.__setitem__``.

    At most one positional source is accepted: an ``HTTPHeaderDict``, a
    ``Mapping``, any object exposing ``keys()``, or an iterable that
    yields ``(key, value)`` pairs.  Keyword arguments are added last.

    Raises ``TypeError`` when more than one positional argument is given.
    """
    positional_count = len(args)
    if positional_count > 1:
        raise TypeError("extend() takes at most 1 positional "
                        "arguments ({0} given)".format(positional_count))
    source = args[0] if args else ()

    # Pick a lazy stream of pairs that matches the kind of source.
    if isinstance(source, HTTPHeaderDict):
        pairs = source.iteritems()
    elif isinstance(source, Mapping):
        pairs = ((name, source[name]) for name in source)
    elif hasattr(source, "keys"):
        pairs = ((name, source[name]) for name in source.keys())
    else:
        pairs = source

    for name, content in pairs:
        self.add(name, content)
    for name, content in kwargs.items():
        self.add(name, content)
def extend(self, *args, **kwargs):
    """Add every item of a header-like source to this object.

    Works like ``MutableMapping.update`` except that each pair is stored
    through ``self.add`` rather than ``self.__setitem__``.

    At most one positional source is accepted: an ``HTTPHeaderDict``, a
    ``Mapping``, any object exposing ``keys()``, or an iterable that
    yields ``(key, value)`` pairs.  Keyword arguments are added last.

    Raises ``TypeError`` when more than one positional argument is given.
    """
    positional_count = len(args)
    if positional_count > 1:
        raise TypeError("extend() takes at most 1 positional "
                        "arguments ({0} given)".format(positional_count))
    source = args[0] if args else ()

    # Pick a lazy stream of pairs that matches the kind of source.
    if isinstance(source, HTTPHeaderDict):
        pairs = source.iteritems()
    elif isinstance(source, Mapping):
        pairs = ((name, source[name]) for name in source)
    elif hasattr(source, "keys"):
        pairs = ((name, source[name]) for name in source.keys())
    else:
        pairs = source

    for name, content in pairs:
        self.add(name, content)
    for name, content in kwargs.items():
        self.add(name, content)
def objwalk(obj, path=(), memo=None):
    """Yield ``(path, leaf)`` pairs for every leaf reachable from *obj*.

    Mappings are descended by key, and non-string sequences and sets by
    enumeration index; *path* is the tuple of keys/indices leading to the
    leaf.  A container that is already being walked (tracked by ``id`` in
    *memo*) is skipped, so self-referencing structures terminate.
    """
    if memo is None:
        memo = set()

    if isinstance(obj, Mapping):
        walker = iteritems
    elif isinstance(obj, (Sequence, Set)) and not isinstance(obj, string_types):
        walker = enumerate
    else:
        # Not a container: this is a leaf.
        yield path, obj
        return

    marker = id(obj)
    if marker in memo:
        return
    memo.add(marker)
    for step, child in walker(obj):
        for found in objwalk(child, path + (step,), memo):
            yield found
    memo.remove(marker)
# optional test code from here on
def _update(d, u):
    """Merge *u* into the nested dict *d* in place and return *d*.

    When both the incoming value and the current value for a key are
    mappings they are merged recursively; otherwise the incoming value
    replaces the current one.

    >>> _update({'a': 0, 'b': 2}, {'a': 1})
    {'a': 1, 'b': 2}
    >>> _update({'a': {'b': 1}}, {'a': 0})
    {'a': 0}
    >>> d = _update({'a': {'b': 1}}, {'a': {'c': 2}})
    >>> d == {'a': {'b': 1, 'c': 2}}
    True
    """
    for key, incoming in six.iteritems(u):
        current = d.get(key, {})
        mergeable = isinstance(incoming, Mapping) and isinstance(current, Mapping)
        if not mergeable:
            d[key] = u[key]
            continue
        _update(current, incoming)
        d[key] = current
    return d
def extend(self, *args, **kwargs):
    """Add every item of a header-like source to this object.

    Works like ``MutableMapping.update`` except that each pair is stored
    through ``self.add`` rather than ``self.__setitem__``.

    At most one positional source is accepted: an ``HTTPHeaderDict``, a
    ``Mapping``, any object exposing ``keys()``, or an iterable that
    yields ``(key, value)`` pairs.  Keyword arguments are added last.

    Raises ``TypeError`` when more than one positional argument is given.
    """
    positional_count = len(args)
    if positional_count > 1:
        raise TypeError("extend() takes at most 1 positional "
                        "arguments ({0} given)".format(positional_count))
    source = args[0] if args else ()

    # Pick a lazy stream of pairs that matches the kind of source.
    if isinstance(source, HTTPHeaderDict):
        pairs = source.iteritems()
    elif isinstance(source, Mapping):
        pairs = ((name, source[name]) for name in source)
    elif hasattr(source, "keys"):
        pairs = ((name, source[name]) for name in source.keys())
    else:
        pairs = source

    for name, content in pairs:
        self.add(name, content)
    for name, content in kwargs.items():
        self.add(name, content)
def extend(self, *args, **kwargs):
    """Add every item of a header-like source to this object.

    Works like ``MutableMapping.update`` except that each pair is stored
    through ``self.add`` rather than ``self.__setitem__``.

    At most one positional source is accepted: an ``HTTPHeaderDict``, a
    ``Mapping``, any object exposing ``keys()``, or an iterable that
    yields ``(key, value)`` pairs.  Keyword arguments are added last.

    Raises ``TypeError`` when more than one positional argument is given.
    """
    positional_count = len(args)
    if positional_count > 1:
        raise TypeError("extend() takes at most 1 positional "
                        "arguments ({0} given)".format(positional_count))
    source = args[0] if args else ()

    # Pick a lazy stream of pairs that matches the kind of source.
    if isinstance(source, HTTPHeaderDict):
        pairs = source.iteritems()
    elif isinstance(source, Mapping):
        pairs = ((name, source[name]) for name in source)
    elif hasattr(source, "keys"):
        pairs = ((name, source[name]) for name in source.keys())
    else:
        pairs = source

    for name, content in pairs:
        self.add(name, content)
    for name, content in kwargs.items():
        self.add(name, content)
def extend(self, *args, **kwargs):
    """Add every item of a header-like source to this object.

    Works like ``MutableMapping.update`` except that each pair is stored
    through ``self.add`` rather than ``self.__setitem__``.

    At most one positional source is accepted: an ``HTTPHeaderDict``, a
    ``Mapping``, any object exposing ``keys()``, or an iterable that
    yields ``(key, value)`` pairs.  Keyword arguments are added last.

    Raises ``TypeError`` when more than one positional argument is given.
    """
    positional_count = len(args)
    if positional_count > 1:
        raise TypeError("extend() takes at most 1 positional "
                        "arguments ({0} given)".format(positional_count))
    source = args[0] if args else ()

    # Pick a lazy stream of pairs that matches the kind of source.
    if isinstance(source, HTTPHeaderDict):
        pairs = source.iteritems()
    elif isinstance(source, Mapping):
        pairs = ((name, source[name]) for name in source)
    elif hasattr(source, "keys"):
        pairs = ((name, source[name]) for name in source.keys())
    else:
        pairs = source

    for name, content in pairs:
        self.add(name, content)
    for name, content in kwargs.items():
        self.add(name, content)
def extend(self, *args, **kwargs):
    """Add every item of a header-like source to this object.

    Works like ``MutableMapping.update`` except that each pair is stored
    through ``self.add`` rather than ``self.__setitem__``.

    At most one positional source is accepted: an ``HTTPHeaderDict``, a
    ``Mapping``, any object exposing ``keys()``, or an iterable that
    yields ``(key, value)`` pairs.  Keyword arguments are added last.

    Raises ``TypeError`` when more than one positional argument is given.
    """
    positional_count = len(args)
    if positional_count > 1:
        raise TypeError("extend() takes at most 1 positional "
                        "arguments ({0} given)".format(positional_count))
    source = args[0] if args else ()

    # Pick a lazy stream of pairs that matches the kind of source.
    if isinstance(source, HTTPHeaderDict):
        pairs = source.iteritems()
    elif isinstance(source, Mapping):
        pairs = ((name, source[name]) for name in source)
    elif hasattr(source, "keys"):
        pairs = ((name, source[name]) for name in source.keys())
    else:
        pairs = source

    for name, content in pairs:
        self.add(name, content)
    for name, content in kwargs.items():
        self.add(name, content)
def extend(self, *args, **kwargs):
    """Add every item of a header-like source to this object.

    Works like ``MutableMapping.update`` except that each pair is stored
    through ``self.add`` rather than ``self.__setitem__``.

    At most one positional source is accepted: an ``HTTPHeaderDict``, a
    ``Mapping``, any object exposing ``keys()``, or an iterable that
    yields ``(key, value)`` pairs.  Keyword arguments are added last.

    Raises ``TypeError`` when more than one positional argument is given.
    """
    positional_count = len(args)
    if positional_count > 1:
        raise TypeError("extend() takes at most 1 positional "
                        "arguments ({0} given)".format(positional_count))
    source = args[0] if args else ()

    # Pick a lazy stream of pairs that matches the kind of source.
    if isinstance(source, HTTPHeaderDict):
        pairs = source.iteritems()
    elif isinstance(source, Mapping):
        pairs = ((name, source[name]) for name in source)
    elif hasattr(source, "keys"):
        pairs = ((name, source[name]) for name in source.keys())
    else:
        pairs = source

    for name, content in pairs:
        self.add(name, content)
    for name, content in kwargs.items():
        self.add(name, content)
def extend(self, *args, **kwargs):
    """Add every item of a header-like source to this object.

    Works like ``MutableMapping.update`` except that each pair is stored
    through ``self.add`` rather than ``self.__setitem__``.

    At most one positional source is accepted: an ``HTTPHeaderDict``, a
    ``Mapping``, any object exposing ``keys()``, or an iterable that
    yields ``(key, value)`` pairs.  Keyword arguments are added last.

    Raises ``TypeError`` when more than one positional argument is given.
    """
    positional_count = len(args)
    if positional_count > 1:
        raise TypeError("extend() takes at most 1 positional "
                        "arguments ({0} given)".format(positional_count))
    source = args[0] if args else ()

    # Pick a lazy stream of pairs that matches the kind of source.
    if isinstance(source, HTTPHeaderDict):
        pairs = source.iteritems()
    elif isinstance(source, Mapping):
        pairs = ((name, source[name]) for name in source)
    elif hasattr(source, "keys"):
        pairs = ((name, source[name]) for name in source.keys())
    else:
        pairs = source

    for name, content in pairs:
        self.add(name, content)
    for name, content in kwargs.items():
        self.add(name, content)
def get_raw_data(self, datum):
    """Returns the raw data for this column, before any filters or
    formatting are applied to it. This is useful when doing calculations
    on data in the table.

    ``self.transform`` decides how the value is obtained: a callable is
    called with *datum*; otherwise it is used as a key when *datum* is a
    mapping containing it, and as an attribute name in every other case.
    A missing attribute yields ``None`` and is logged at debug level.
    """
    # ``collections.Mapping`` was removed in Python 3.10; only fall back to
    # the old location on interpreters that lack ``collections.abc``.
    try:
        from collections.abc import Mapping
    except ImportError:
        from collections import Mapping

    # Callable transformations
    if callable(self.transform):
        data = self.transform(datum)
    # Dict lookups
    elif isinstance(datum, Mapping) and self.transform in datum:
        data = datum.get(self.transform)
    else:
        # Basic object lookups
        data = getattr(datum, self.transform, None)
        if data is None:
            msg = _("The attribute %(attr)s doesn't exist on "
                    "%(obj)s.") % {'attr': self.transform, 'obj': datum}
            msg = termcolors.colorize(msg, **PALETTE['ERROR'])
            LOG.debug(msg)
    return data