def _(node, dask, scope):
    def retrieve(term):
        # Reuse the dask key already computed for ``term``; otherwise build it
        # recursively and cache the new key in ``scope``.
        try:
            return scope[term]
        except KeyError:
            scope[term] = ret = _ltree_to_dask(term, dask, scope)
            return ret
    name = '%s-%s' % (node.func, uuid4())
    # Emit a dask task tuple: apply(func, args, kwargs), with the kwargs dict
    # rebuilt at graph-execution time via the (dict, [[k, v], ...]) idiom.
    dask[name] = (
apply,
retrieve(node.func),
list(map(retrieve, node.args)),
(dict, list(map(list, valmap(retrieve, node.kwargs).items()))),
)
scope[node] = name
return name
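# Everything on this page revolves around toolz.valmap (also available from
# cytoolz, and imported as `t` in some snippets below): it applies a function
# to every value of a dict while keeping the keys. Minimal demonstration:
from toolz import valmap

prices = {'apple': 1.0, 'pear': 1.5}
assert valmap(lambda p: p * 2, prices) == {'apple': 2.0, 'pear': 3.0}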
def _expect_bounded(make_bounded_check, __funcname, **named):
def valid_bounds(t):
return (
isinstance(t, tuple)
and len(t) == 2
and t != (None, None)
)
for name, bounds in iteritems(named):
if not valid_bounds(bounds):
raise TypeError(
"expect_bounded() expected a tuple of bounds for"
" argument '{name}', but got {bounds} instead.".format(
name=name,
bounds=bounds,
)
)
return preprocess(**valmap(make_bounded_check, named))
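# The `preprocess(**valmap(factory, named))` line above is the pattern this
# whole page repeats: a dict of per-argument specs becomes a dict of
# per-argument check functions. A sketch without the zipline machinery
# (make_range_check is illustrative, not the real make_bounded_check):
from toolz import valmap

def make_range_check(bounds):
    lo, hi = bounds
    def check(value):
        if not ((lo is None or lo <= value) and (hi is None or value <= hi)):
            raise ValueError('%r not in bounds %r' % (value, bounds))
        return value
    return check

checks = valmap(make_range_check, {'x': (0, 10), 'y': (None, 5)})
assert checks['x'](3) == 3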
def expect_element(*_pos, **named):
"""
Preprocessing decorator that verifies inputs are elements of some
expected collection.
Usage
-----
>>> @expect_element(x=('a', 'b'))
... def foo(x):
... return x.upper()
...
>>> foo('a')
'A'
>>> foo('b')
'B'
>>> foo('c')
Traceback (most recent call last):
...
ValueError: foo() expected a value in ('a', 'b') for argument 'x', but got 'c' instead. # noqa
Notes
-----
This uses the `in` operator (__contains__) to make the containment check.
This allows us to use any custom container as long as the object supports
the container protocol.
"""
if _pos:
raise TypeError("expect_element() only takes keyword arguments.")
def _expect_element(collection):
template = (
"%(funcname)s() expected a value in {collection} "
"for argument '%(argname)s', but got %(actual)s instead."
).format(collection=collection)
return make_check(
ValueError,
template,
complement(op.contains(collection)),
repr,
)
return preprocess(**valmap(_expect_element, named))
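# `op.contains(collection)` above only makes sense as a partial application,
# which suggests `op` is toolz.curried.operator rather than the stdlib
# operator module (the stdlib version needs both arguments at once):
import toolz.curried.operator as op
from toolz import complement

not_in = complement(op.contains(('a', 'b')))
assert not_in('c') is True
assert not_in('a') is False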
def expect_dimensions(**dimensions):
"""
Preprocessing decorator that verifies inputs are numpy arrays with a
specific dimensionality.
Usage
-----
>>> from numpy import array
>>> @expect_dimensions(x=1, y=2)
... def foo(x, y):
... return x[0] + y[0, 0]
...
>>> foo(array([1, 1]), array([[1, 1], [2, 2]]))
2
    >>> foo(array([1, 1]), array([1, 1]))
    Traceback (most recent call last):
    ...
    ValueError: foo() expected a 2-D array for argument 'y', but got a 1-D array instead. # noqa
"""
def _expect_dimension(expected_ndim):
def _check(func, argname, argvalue):
funcname = _qualified_name(func)
actual_ndim = argvalue.ndim
if actual_ndim != expected_ndim:
if actual_ndim == 0:
actual_repr = 'scalar'
else:
actual_repr = "%d-D array" % actual_ndim
raise ValueError(
"{func}() expected a {expected:d}-D array"
" for argument {argname!r}, but got a {actual}"
" instead.".format(
func=funcname,
expected=expected_ndim,
argname=argname,
actual=actual_repr,
)
)
return argvalue
return _check
return preprocess(**valmap(_expect_dimension, dimensions))
def __init__(self, constructors, name, *args, **kwargs):
args = tuple(map(self._unwrap_name, args))
kwargs = valmap(self._unwrap_name, kwargs)
already_bound = {}
for n, arg in enumerate(args):
if arg in already_bound:
raise TypeError(
'argument %r at position %d is already bound to the'
' positional argument at index %d' % (
arg,
n,
already_bound[arg],
),
)
already_bound[arg] = n
for k, arg in kwargs.items():
if arg in already_bound:
loc = already_bound[arg]
raise TypeError(
'argument %r at keyword %s is already bound to the %s' % (
arg,
k,
('positional argument at index %d' % loc)
if isinstance(loc, int) else
('keyword argument %r' % loc),
),
                )
            already_bound[arg] = k
super().__init__(constructors, name, *args, **kwargs)
del constructors[name]
self._constructors = constructors
def normalize(df, index=True):
if index:
df = df.reset_index()
for col in df.select_dtypes([bool]):
df[col] = df[col].astype('uint8')
dtypes = valmap(PD2CH.get, OrderedDict(df.dtypes))
if None in dtypes.values():
raise ValueError('Unknown type mapping in dtypes: {}'.format(dtypes))
return dtypes, df
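# PD2CH is this project's pandas-dtype -> ClickHouse-type lookup table;
# valmap(PD2CH.get, ...) translates every column dtype in one pass, and
# dict.get leaves None behind for unmapped dtypes, which the guard rejects.
# Sketch with a hypothetical (string-keyed) subset of such a table:
from collections import OrderedDict
from toolz import valmap

PD2CH = {'int64': 'Int64', 'uint8': 'UInt8'}  # hypothetical subset
dtypes = OrderedDict([('id', 'int64'), ('flag', 'uint8'), ('blob', 'object')])
assert valmap(PD2CH.get, dtypes) == {'id': 'Int64', 'flag': 'UInt8', 'blob': None}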
def proxy(self):
if self.composite:
value = lazy_dict(t.valmap(lambda a: lambda: a.proxy(), self.value))
else:
value = self.value
return artifact_proxy(value, self)
def _inputs_json(inputs):
expanded = t.valmap(_transform, inputs['kargs'])
expanded['__varargs'] = list(t.map(_transform, inputs['varargs']))
return expanded
def __repr__(self):
return "lazy_dict({})".format(
t.merge(t.valmap(lambda _: "...", self.thunks), self.realized))
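# The repr hides unevaluated thunks: valmap swaps every pending value for an
# ellipsis placeholder before merging in the realized entries. Stand-alone:
import toolz as t

thunks = {'a': lambda: 42}   # stands in for a deferred, expensive load
realized = {'b': 2}
assert t.merge(t.valmap(lambda _: '...', thunks), realized) == {'a': '...', 'b': 2}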
def lazy_proxy_dict(artifacts_or_ids, group_artifacts_of_same_name=False):
"""
Takes a list of artifacts or artifact ids and returns a dictionary whose
keys are the names of the artifacts. The values will be lazily loaded into
proxies as requested.
Parameters
----------
artifacts_or_ids : collection of artifacts or artifact ids (strings)
group_artifacts_of_same_name: bool (default: False)
        If set to True, artifacts sharing a name are grouped together in one
        list. If set to False, an exception is raised when multiple artifacts
        share the same name.
"""
if isinstance(artifacts_or_ids, dict):
artifacts = t.valmap(coerce_to_artifact, artifacts_or_ids)
lambdas = {name: (lambda a: lambda: a.proxy())(a)
for name, a in artifacts.items()}
return lazy_dict(lambdas)
# else we have a collection
artifacts = coerce_to_artifacts(artifacts_or_ids)
by_name = t.groupby(lambda a: a.name, artifacts)
singles = t.valfilter(lambda l: len(l) == 1, by_name)
multi = t.valfilter(lambda l: len(l) > 1, by_name)
lambdas = {name: (lambda a: lambda: a.proxy())(a[0]) for name, a in singles.items()}
if group_artifacts_of_same_name and len(multi) > 0:
lambdas = t.merge(lambdas,
{name:
(lambda artifacts: (lambda: [a.proxy() for a in artifacts]))(artifacts)
for name, artifacts in multi.items()})
if not group_artifacts_of_same_name and len(multi) > 0:
raise ValueError("""Only artifacts with distinct names can be used in a lazy_proxy_dict.
Offending names: {}
Use the option `group_artifacts_of_same_name=True` if you want a list of proxies to be returned under the respective key.
""".format({n: len(a) for n, a in multi.items()}))
return lazy_dict(lambdas)
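# The grouping above is plain toolz: groupby buckets artifacts by name, and
# two valfilter passes split unique names from duplicates. The dict plumbing
# in isolation:
import toolz as t

names = ['model', 'data', 'model']
by_name = t.groupby(lambda n: n, names)
assert t.valfilter(lambda l: len(l) == 1, by_name) == {'data': ['data']}
assert t.valfilter(lambda l: len(l) > 1, by_name) == {'model': ['model', 'model']}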
def merge_prototypes(config):
return t.valmap(full_config(config), config)
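# For valmap(full_config(config), config) to work, full_config must be
# curried (the remaining argument -- the section -- is supplied per value).
# A sketch under that assumption; this full_config is illustrative only:
import toolz as t

@t.curry
def full_config(config, section):
    # hypothetical prototype expansion: overlay a section on its prototype
    proto = config.get(section.get('prototype'), {})
    return t.merge(proto, section)

config = {'base': {'retries': 3},
          'svc': {'prototype': 'base', 'port': 80}}
merged = t.valmap(full_config(config), config)
assert merged['svc'] == {'retries': 3, 'prototype': 'base', 'port': 80}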
def expect_types(*_pos, **named):
"""
Preprocessing decorator that verifies inputs have expected types.
Usage
-----
>>> @expect_types(x=int, y=str)
... def foo(x, y):
... return x, y
...
>>> foo(2, '3')
(2, '3')
>>> foo(2.0, '3')
Traceback (most recent call last):
...
    TypeError: foo() expected a value of type int for argument 'x', but got float instead. # noqa
"""
if _pos:
raise TypeError("expect_types() only takes keyword arguments.")
for name, type_ in iteritems(named):
if not isinstance(type_, (type, tuple)):
raise TypeError(
"expect_types() expected a type or tuple of types for "
"argument '{name}', but got {type_} instead.".format(
name=name, type_=type_,
)
)
def _expect_type(type_):
# Slightly different messages for type and tuple of types.
_template = (
"%(funcname)s() expected a value of type {type_or_types} "
"for argument '%(argname)s', but got %(actual)s instead."
)
if isinstance(type_, tuple):
template = _template.format(
type_or_types=' or '.join(map(_qualified_name, type_))
)
else:
template = _template.format(type_or_types=_qualified_name(type_))
return make_check(
TypeError,
template,
lambda v: not isinstance(v, type_),
compose(_qualified_name, type),
)
return preprocess(**valmap(_expect_type, named))
def test_deltas_only_one_delta_in_universe(self, asset_info):
expr = bz.data(self.df, name='expr', dshape=self.dshape)
deltas = pd.DataFrame({
'sid': [65, 66],
'asof_date': [self.dates[1], self.dates[0]],
'timestamp': [self.dates[2], self.dates[1]],
'value': [10, 11],
})
deltas = bz.data(deltas, name='deltas', dshape=self.dshape)
expected_views = keymap(pd.Timestamp, {
'2014-01-02': np.array([[0.0, 11.0, 2.0],
[1.0, 2.0, 3.0]]),
'2014-01-03': np.array([[10.0, 2.0, 3.0],
[2.0, 3.0, 4.0]]),
'2014-01-04': np.array([[2.0, 3.0, 4.0],
[2.0, 3.0, 4.0]]),
})
nassets = len(asset_info)
if nassets == 4:
expected_views = valmap(
lambda view: np.c_[view, [np.nan, np.nan]],
expected_views,
)
with tmp_asset_finder(equities=asset_info) as finder:
expected_output = pd.DataFrame(
columns=[
'value',
],
data=np.array([11, 10, 4]).repeat(len(asset_info.index)),
index=pd.MultiIndex.from_product((
sorted(expected_views.keys()),
finder.retrieve_all(asset_info.index),
)),
)
dates = self.dates
dates = dates.insert(len(dates), dates[-1] + timedelta(days=1))
self._run_pipeline(
expr,
deltas,
expected_views,
expected_output,
finder,
calendar=dates,
start=dates[1],
end=dates[-1],
window_length=2,
compute_fn=np.nanmax,
)
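# With a four-asset universe every expected view gains a NaN column; valmap
# applies the same np.c_ padding across all dates at once. In isolation:
import numpy as np
from toolz import valmap

views = {'2014-01-02': np.array([[1.0, 2.0], [3.0, 4.0]])}
padded = valmap(lambda view: np.c_[view, [np.nan, np.nan]], views)
assert padded['2014-01-02'].shape == (2, 3)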
def expect_kinds(**named):
"""
Preprocessing decorator that verifies inputs have expected dtype kinds.
Examples
--------
>>> from numpy import int64, int32, float32
>>> @expect_kinds(x='i')
... def foo(x):
... return x
...
>>> foo(int64(2))
2
>>> foo(int32(2))
2
>>> foo(float32(2)) # doctest: +NORMALIZE_WHITESPACE +ELLIPSIS
Traceback (most recent call last):
...
TypeError: ...foo() expected a numpy object of kind 'i' for argument 'x',
but got 'f' instead.
"""
for name, kind in iteritems(named):
if not isinstance(kind, (str, tuple)):
            raise TypeError(
                "expect_kinds() expected a string or tuple of strings"
                " for argument {name!r}, but got {kind} instead.".format(
                    name=name, kind=kind,
)
)
@preprocess(kinds=call(lambda x: x if isinstance(x, tuple) else (x,)))
def _expect_kind(kinds):
"""
        Factory for kind-checking functions that work with the @preprocess
        decorator.
"""
def error_message(func, argname, value):
# If the bad value has a dtype, but it's wrong, show the dtype
# kind. Otherwise just show the value.
try:
value_to_show = value.dtype.kind
except AttributeError:
value_to_show = value
return (
"{funcname}() expected a numpy object of kind {kinds} "
"for argument {argname!r}, but got {value!r} instead."
).format(
funcname=_qualified_name(func),
kinds=' or '.join(map(repr, kinds)),
argname=argname,
value=value_to_show,
)
def _actual_preprocessor(func, argname, argvalue):
if getattrs(argvalue, ('dtype', 'kind'), object()) not in kinds:
raise TypeError(error_message(func, argname, argvalue))
return argvalue
return _actual_preprocessor
return preprocess(**valmap(_expect_kind, named))
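# Numpy encodes a dtype's family as a one-character `kind` code ('i' signed
# integer, 'u' unsigned, 'f' float, 'b' bool, ...), which is what the check
# compares against:
import numpy as np

assert np.int64(2).dtype.kind == 'i'
assert np.float32(2.0).dtype.kind == 'f'
assert np.array([True]).dtype.kind == 'b'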
def expect_types(__funcname=_qualified_name, **named):
"""
Preprocessing decorator that verifies inputs have expected types.
Examples
--------
>>> @expect_types(x=int, y=str)
... def foo(x, y):
... return x, y
...
>>> foo(2, '3')
(2, '3')
>>> foo(2.0, '3') # doctest: +NORMALIZE_WHITESPACE +ELLIPSIS
Traceback (most recent call last):
...
TypeError: ...foo() expected a value of type int for argument 'x',
but got float instead.
Notes
-----
A special argument, __funcname, can be provided as a string to override the
function name shown in error messages. This is most often used on __init__
or __new__ methods to make errors refer to the class name instead of the
function name.
"""
for name, type_ in iteritems(named):
if not isinstance(type_, (type, tuple)):
raise TypeError(
"expect_types() expected a type or tuple of types for "
"argument '{name}', but got {type_} instead.".format(
name=name, type_=type_,
)
)
def _expect_type(type_):
# Slightly different messages for type and tuple of types.
_template = (
"%(funcname)s() expected a value of type {type_or_types} "
"for argument '%(argname)s', but got %(actual)s instead."
)
if isinstance(type_, tuple):
template = _template.format(
type_or_types=' or '.join(map(_qualified_name, type_))
)
else:
template = _template.format(type_or_types=_qualified_name(type_))
return make_check(
exc_type=TypeError,
template=template,
pred=lambda v: not isinstance(v, type_),
actual=compose(_qualified_name, type),
funcname=__funcname,
)
return preprocess(**valmap(_expect_type, named))
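# As the Notes say, __funcname is mostly useful on constructors so that
# errors name the class. A minimal sketch (the Point class is illustrative):
class Point:
    @expect_types(__funcname='Point', x=int)
    def __init__(self, x):
        self.x = x

Point(1)       # ok
# Point(1.0)   # TypeError: Point() expected a value of type int for
#              # argument 'x', but got float instead.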
def expect_element(__funcname=_qualified_name, **named):
"""
Preprocessing decorator that verifies inputs are elements of some
expected collection.
Examples
--------
>>> @expect_element(x=('a', 'b'))
... def foo(x):
... return x.upper()
...
>>> foo('a')
'A'
>>> foo('b')
'B'
>>> foo('c') # doctest: +NORMALIZE_WHITESPACE +ELLIPSIS
Traceback (most recent call last):
...
ValueError: ...foo() expected a value in ('a', 'b') for argument 'x',
but got 'c' instead.
Notes
-----
A special argument, __funcname, can be provided as a string to override the
function name shown in error messages. This is most often used on __init__
or __new__ methods to make errors refer to the class name instead of the
function name.
This uses the `in` operator (__contains__) to make the containment check.
This allows us to use any custom container as long as the object supports
the container protocol.
"""
def _expect_element(collection):
if isinstance(collection, (set, frozenset)):
# Special case the error message for set and frozen set to make it
# less verbose.
collection_for_error_message = tuple(sorted(collection))
else:
collection_for_error_message = collection
template = (
"%(funcname)s() expected a value in {collection} "
"for argument '%(argname)s', but got %(actual)s instead."
).format(collection=collection_for_error_message)
return make_check(
ValueError,
template,
complement(op.contains(collection)),
repr,
funcname=__funcname,
)
return preprocess(**valmap(_expect_element, named))
def expect_dimensions(__funcname=_qualified_name, **dimensions):
"""
Preprocessing decorator that verifies inputs are numpy arrays with a
specific dimensionality.
Examples
--------
>>> from numpy import array
>>> @expect_dimensions(x=1, y=2)
... def foo(x, y):
... return x[0] + y[0, 0]
...
>>> foo(array([1, 1]), array([[1, 1], [2, 2]]))
2
>>> foo(array([1, 1]), array([1, 1])) # doctest: +NORMALIZE_WHITESPACE
... # doctest: +ELLIPSIS
Traceback (most recent call last):
...
ValueError: ...foo() expected a 2-D array for argument 'y',
but got a 1-D array instead.
"""
if isinstance(__funcname, str):
def get_funcname(_):
return __funcname
else:
get_funcname = __funcname
def _expect_dimension(expected_ndim):
def _check(func, argname, argvalue):
actual_ndim = argvalue.ndim
if actual_ndim != expected_ndim:
if actual_ndim == 0:
actual_repr = 'scalar'
else:
actual_repr = "%d-D array" % actual_ndim
raise ValueError(
"{func}() expected a {expected:d}-D array"
" for argument {argname!r}, but got a {actual}"
" instead.".format(
func=get_funcname(func),
expected=expected_ndim,
argname=argname,
actual=actual_repr,
)
)
return argvalue
return _check
return preprocess(**valmap(_expect_dimension, dimensions))
def test_deltas_only_one_delta_in_universe(self, asset_info):
expr = bz.data(self.df, name='expr', dshape=self.dshape)
deltas = pd.DataFrame({
'sid': [65, 66],
'asof_date': [self.dates[1], self.dates[0]],
'timestamp': [self.dates[2], self.dates[1]],
'value': [10, 11],
})
deltas = bz.data(deltas, name='deltas', dshape=self.dshape)
expected_views = keymap(pd.Timestamp, {
'2014-01-02': np.array([[0.0, 11.0, 2.0],
[1.0, 2.0, 3.0]]),
'2014-01-03': np.array([[10.0, 2.0, 3.0],
[2.0, 3.0, 4.0]]),
'2014-01-04': np.array([[2.0, 3.0, 4.0],
[2.0, 3.0, 4.0]]),
})
nassets = len(asset_info)
if nassets == 4:
expected_views = valmap(
lambda view: np.c_[view, [np.nan, np.nan]],
expected_views,
)
with tmp_asset_finder(equities=asset_info) as finder:
expected_output = pd.DataFrame(
columns=[
'value',
],
data=np.array([11, 10, 4]).repeat(len(asset_info.index)),
index=pd.MultiIndex.from_product((
sorted(expected_views.keys()),
finder.retrieve_all(asset_info.index),
)),
)
dates = self.dates
dates = dates.insert(len(dates), dates[-1] + timedelta(days=1))
self._run_pipeline(
expr,
deltas,
None,
expected_views,
expected_output,
finder,
calendar=dates,
start=dates[1],
end=dates[-1],
window_length=2,
compute_fn=np.nanmax,
)