# `concatv` and `pluck` come from toolz; `find`, `find_many_or_none` and
# `make_json_ast_node` are helpers defined elsewhere in this project.
from toolz import concatv, pluck

def visit_enumeration(self, node, children):
def iter_enumerations():
integers_or_symbols = concatv(
find(children, type='integer'),
find(children, type='symbol'),
)
values = list(pluck('value', integers_or_symbols))
if values:
yield make_json_ast_node(
type='enumeration_values',
values=values,
)
intervals = find_many_or_none(children, type='interval')
if intervals is not None:
yield from intervals
assert isinstance(children, list), children
return list(iter_enumerations())
Python concatv() usage examples
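All of the snippets on this page use the same primitive: toolz.concatv(*seqs) returns a lazy iterator that chains its argument sequences left to right, without copying them. A minimal, self-contained sketch:

from toolz import concatv

# concatv yields from each input in turn and never materializes them.
chained = concatv([1, 2], (3,), 'ab')
print(list(chained))  # [1, 2, 3, 'a', 'b']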
Source file: m_source_file_to_json_ast.py
Project: calculette-impots-m-language-parser
Author: openfisca
def test_id_macro_dataset(self):
"""
input (self.macro_df)
asof_date timestamp value
0 2014-01-01 2014-01-01 0
3 2014-01-02 2014-01-02 1
6 2014-01-03 2014-01-03 2
output (expected):
value
2014-01-01 Equity(65 [A]) 0
Equity(66 [B]) 0
Equity(67 [C]) 0
2014-01-02 Equity(65 [A]) 1
Equity(66 [B]) 1
Equity(67 [C]) 1
2014-01-03 Equity(65 [A]) 2
Equity(66 [B]) 2
Equity(67 [C]) 2
"""
asset_info = asset_infos[0][0]
nassets = len(asset_info)
with tmp_asset_finder() as finder:
expected = pd.DataFrame(
list(concatv([0] * nassets, [1] * nassets, [2] * nassets)),
index=pd.MultiIndex.from_product((
self.macro_df.timestamp,
finder.retrieve_all(asset_info.index),
)),
columns=('value',),
)
self._test_id(
self.macro_df,
self.macro_dshape,
expected,
finder,
('value',),
)
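The concatv call above only flattens per-date blocks of repeated values so they line up with the date-by-asset MultiIndex. The same pattern as a standalone sketch, with made-up dates and placeholder asset labels:

import pandas as pd
from toolz import concatv

dates = pd.to_datetime(['2014-01-01', '2014-01-02', '2014-01-03'])
assets = ['A', 'B', 'C']  # placeholders standing in for Equity objects
expected = pd.DataFrame(
    list(concatv([0] * len(assets), [1] * len(assets), [2] * len(assets))),
    index=pd.MultiIndex.from_product((dates, assets)),
    columns=('value',),
)
print(expected)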
def __new__(mcls, name, bases, dict_):
self = super().__new__(mcls, name, bases, dict_)
    if bases and bases[0] is ADT:
self._typevars = dict_._typevars
self._constructors = tuple(dict_._constructors.values())
constructors = set(self._constructors)
for constructor in constructors:
types = concatv(
constructor._args,
constructor._kwargs.values(),
)
for t in types:
if isinstance(t, RecursiveType) and t._name != name:
raise TypeError(
'recursive type name must be the same as the type'
' name, %r != %r' % (
t._name,
name,
),
)
if t in constructors:
raise TypeError(
'constructor %r has arguments that are other'
' constructors' % constructor,
)
if not self._typevars:
return adt(self, ())
return self
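In this metaclass, concatv simply walks every declared field type, positional then keyword, in one pass. Reduced to a standalone sketch with invented field types:

from toolz import concatv

arg_types = (int, str)           # hypothetical positional constructor fields
kwarg_types = {'weight': float}  # hypothetical keyword constructor fields
for t in concatv(arg_types, kwarg_types.values()):
    print(t)  # <class 'int'>, <class 'str'>, <class 'float'>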
Source file: python_source_visitors.py
Project: calculette-impots-python
Author: openfisca
def visit_infix_expression(node, operators={}):
    def interleave(*iterables):
        # Alternate operands and operators; UnboundLocalError is only an
        # arbitrary sentinel marking exhausted iterables, so it is compared
        # by identity.
        for values in itertools.zip_longest(*iterables, fillvalue=UnboundLocalError):
            for index, value in enumerate(values):
                if value is not UnboundLocalError:
                    yield index, value
tokens = [
visit_node(operand_or_operator)
if index == 0
else operators.get(operand_or_operator, operand_or_operator)
for index, operand_or_operator in interleave(node['operands'], node['operators'])
]
# Transform product expressions into a lazy "and" expression in order to prevent a division by 0:
if node['type'] == 'product_expression':
tokens = concatv(
interpose(
el='and',
seq=map(visit_node, node['operands']),
),
['and'],
tokens,
)
return '({})'.format(' '.join(map(str, tokens)))
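Assuming the '/' operator maps to itself, the rewrite described in the comment turns a product expression such as a / b into the string '(a and b and a / b)', so a zero operand short-circuits the and chain before the division runs. A tiny illustration with made-up values:

a, b = 6, 0
print(a and b and a / b)  # prints 0; the division is never evaluated
a, b = 6, 3
print(a and b and a / b)  # prints 2.0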
# Main visitor
def generate_stencil_points(self):
return concatv(self._stencil_points, self._stencil_iter)
def update_usage_search_locations(self, platform: str):
    '''Update the places where usages are found.

    Call this whenever you load new modules or scripts.
    '''
if platform.lower().startswith('python'):
from . import jedi_dump
jedi_dump.JediCodeElementNode.usage_resolution_modules = (
frozenset((nn.module_context for nn in
tz.concatv(self.module_nodes[platform].values(),
self.script_nodes[platform].values())
if nn.code_element.path)))
def keyPressEvent(self, event):
    super().keyPressEvent(event)
if not event.isAccepted():
if event.key() == Qt.Qt.Key_Right:
_next = self.next_call_list()
if _next and _next.count() > 0:
if all(future.done() for future in
tz.concatv(self.populate_futures, self.add_next_futures)):
_next.setFocus()
self.map_widget.ensureWidgetVisible(_next, 0, 0)
if self.next_call_list().currentItem() is None:
self.next_call_list().setCurrentRow(0)
else:
wait_item = _next.item(0)
wait_item.poked += 1
if wait_item.poked == 3:
wait_item.setText('QUIT POKING ME')
event.accept()
if event.key() == Qt.Qt.Key_Left:
_prev = self.prev_call_list()
if _prev:
_prev.setFocus()
                self.map_widget.ensureWidgetVisible(_prev, 0, 0)
event.accept()
if event.key() == Qt.Qt.Key_Space:
pass
def append(self, other: 'LazyList[A]') -> 'LazyList[A]':
return self.copy(lambda s: concatv(s, self.strict, other.source, other.strict), lambda s: List())
def test_deltas_macro(self):
asset_info = asset_infos[0][0]
expr = bz.data(self.macro_df, name='expr', dshape=self.macro_dshape)
deltas = bz.data(
self.macro_df.iloc[:-1],
name='deltas',
dshape=self.macro_dshape,
)
deltas = bz.transform(
deltas,
value=deltas.value + 10,
timestamp=deltas.timestamp + timedelta(days=1),
)
nassets = len(asset_info)
expected_views = keymap(pd.Timestamp, {
'2014-01-02': repeat_last_axis(np.array([10.0, 1.0]), nassets),
'2014-01-03': repeat_last_axis(np.array([11.0, 2.0]), nassets),
})
with tmp_asset_finder(equities=asset_info) as finder:
expected_output = pd.DataFrame(
list(concatv([10] * nassets, [11] * nassets)),
index=pd.MultiIndex.from_product((
sorted(expected_views.keys()),
finder.retrieve_all(asset_info.index),
)),
columns=('value',),
)
dates = self.dates
self._run_pipeline(
expr,
deltas,
expected_views,
expected_output,
finder,
calendar=dates,
start=dates[1],
end=dates[-1],
window_length=2,
compute_fn=np.nanmax,
)
def test_novel_deltas_macro(self):
asset_info = asset_infos[0][0]
base_dates = pd.DatetimeIndex([
pd.Timestamp('2014-01-01'),
pd.Timestamp('2014-01-04')
])
baseline = pd.DataFrame({
'value': (0, 1),
'asof_date': base_dates,
'timestamp': base_dates,
})
expr = bz.data(baseline, name='expr', dshape=self.macro_dshape)
deltas = bz.data(baseline, name='deltas', dshape=self.macro_dshape)
deltas = bz.transform(
deltas,
value=deltas.value + 10,
timestamp=deltas.timestamp + timedelta(days=1),
)
nassets = len(asset_info)
expected_views = keymap(pd.Timestamp, {
'2014-01-03': repeat_last_axis(
np.array([10.0, 10.0, 10.0]),
nassets,
),
'2014-01-06': repeat_last_axis(
np.array([10.0, 10.0, 11.0]),
nassets,
),
})
cal = pd.DatetimeIndex([
pd.Timestamp('2014-01-01'),
pd.Timestamp('2014-01-02'),
pd.Timestamp('2014-01-03'),
# omitting the 4th and 5th to simulate a weekend
pd.Timestamp('2014-01-06'),
])
with tmp_asset_finder(equities=asset_info) as finder:
expected_output = pd.DataFrame(
list(concatv([10] * nassets, [11] * nassets)),
index=pd.MultiIndex.from_product((
sorted(expected_views.keys()),
finder.retrieve_all(asset_info.index),
)),
columns=('value',),
)
self._run_pipeline(
expr,
deltas,
expected_views,
expected_output,
finder,
calendar=cal,
start=cal[2],
end=cal[-1],
window_length=3,
compute_fn=op.itemgetter(-1),
)
def summary(feature_names, features, **labels):
"""Summarize the data we are about to train with.
Parameters
----------
feature_names : iterable[str]
The names of the features in the ``features`` array.
features : np.ndarray
The 3d feature array.
**labels
The named label arrays.
Returns
-------
summary : str
A summary of the features and labels.
"""
single_attribute_template = dedent(
"""\
{name}:
mean: {mean}
std: {std}
min: {min}
max: {max}""",
)
def format_attribute(name, values):
return ' ' + '\n '.join(
single_attribute_template.format(
name=name,
mean=values.mean(),
std=values.std(),
min=values.min(),
max=values.max(),
).splitlines(),
)
return '\n'.join(concatv(
(
'summary:',
' labels:',
),
(
format_attribute(name, value)
for name, value in sorted(labels.items(), key=first)
),
(
'features:',
),
(
format_attribute(name, features[..., ix])
for ix, name in enumerate(feature_names)
)
))
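Here concatv glues two fixed header tuples to two generated blocks before the final '\n'.join. The same idiom in isolation, with invented section names:

from toolz import concatv

report = '\n'.join(concatv(
    ('summary:', '  labels:'),
    ('    {}: ...'.format(name) for name in ('rank', 'score')),
    ('  features:',),
))
print(report)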
def load_extensions(default, extensions, strict, environ, reload=False):
"""Load all of the given extensions. This should be called by run_algo
or the cli.
Parameters
----------
default : bool
        Load the default extension (~/.catalyst/extension.py)?
    extensions : iterable[str]
The paths to the extensions to load. If the path ends in ``.py`` it is
treated as a script and executed. If it does not end in ``.py`` it is
treated as a module to be imported.
strict : bool
        Should a failure to load an extension raise? If this is False, the
        failure is only reported as a warning.
environ : mapping
The environment to use to find the default extension path.
reload : bool, optional
Reload any extensions that have already been loaded.
"""
if default:
default_extension_path = pth.default_extension(environ=environ)
pth.ensure_file(default_extension_path)
# put the default extension first so other extensions can depend on
# the order they are loaded
extensions = concatv([default_extension_path], extensions)
for ext in extensions:
if ext in _loaded_extensions and not reload:
continue
try:
            # load all of the catalyst extensions
if ext.endswith('.py'):
run_path(ext, run_name='<extension>')
else:
__import__(ext)
except Exception as e:
if strict:
# if `strict` we should raise the actual exception and fail
raise
# without `strict` we should just log the failure
warnings.warn(
'Failed to load extension: %r\n%s' % (ext, e),
stacklevel=2
)
else:
_loaded_extensions.add(ext)
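The concatv([default_extension_path], extensions) line lazily prepends the default path without copying or mutating the caller's iterable. In isolation, with made-up paths:

from toolz import concatv

user_extensions = ['my_ext.py', 'some.plugin.module']  # hypothetical
for ext in concatv(['~/.catalyst/extension.py'], user_extensions):
    print(ext)  # the default extension comes first, then the user's, in order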
def test_deltas_macro(self):
expr = bz.data(self.macro_df, name='expr', dshape=self.macro_dshape)
deltas = bz.data(
self.macro_df.iloc[:-1],
name='deltas',
dshape=self.macro_dshape,
)
deltas = bz.transform(
deltas,
value=deltas.value + 10,
timestamp=deltas.timestamp + timedelta(days=1),
)
nassets = len(simple_asset_info)
expected_views = keymap(pd.Timestamp, {
'2014-01-02': np.array([[10.0],
[1.0]]),
'2014-01-03': np.array([[11.0],
[2.0]]),
})
with tmp_asset_finder(equities=simple_asset_info) as finder:
expected_output = pd.DataFrame(
list(concatv([10] * nassets, [11] * nassets)),
index=pd.MultiIndex.from_product((
sorted(expected_views.keys()),
finder.retrieve_all(simple_asset_info.index),
)),
columns=('value',),
)
dates = self.dates
self._run_pipeline(
expr,
deltas,
None,
expected_views,
expected_output,
finder,
calendar=dates,
start=dates[1],
end=dates[-1],
window_length=2,
compute_fn=np.nanmax,
)
def _test_checkpoints_macro(self, checkpoints, ffilled_value=-1.0):
"""Simple checkpoints test that accepts a checkpoints dataframe and
the expected value for 2014-01-03 for macro datasets.
The underlying data has value -1.0 on 2014-01-01 and 1.0 on 2014-01-04.
Parameters
----------
checkpoints : pd.DataFrame
The checkpoints data.
ffilled_value : float, optional
        The value expected to be read on 2014-01-03. If not provided, it
        defaults to the value in the base data that is naturally
        forward-filled there.
"""
dates = pd.Timestamp('2014-01-01'), pd.Timestamp('2014-01-04')
baseline = pd.DataFrame({
'value': [-1.0, 1.0],
'asof_date': dates,
'timestamp': dates,
})
nassets = len(simple_asset_info)
expected_views = keymap(pd.Timestamp, {
'2014-01-03': np.array([[ffilled_value]]),
'2014-01-04': np.array([[1.0]]),
})
with tmp_asset_finder(equities=simple_asset_info) as finder:
expected_output = pd.DataFrame(
list(concatv([ffilled_value] * nassets, [1.0] * nassets)),
index=pd.MultiIndex.from_product((
sorted(expected_views.keys()),
finder.retrieve_all(simple_asset_info.index),
)),
columns=('value',),
)
self._run_pipeline(
bz.data(baseline, name='expr', dshape=self.macro_dshape),
None,
bz.data(
checkpoints,
name='expr_checkpoints',
dshape=self.macro_dshape,
),
expected_views,
expected_output,
finder,
calendar=pd.date_range('2014-01-01', '2014-01-04'),
start=pd.Timestamp('2014-01-03'),
end=dates[-1],
window_length=1,
compute_fn=op.itemgetter(-1),
)
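The forward-fill behavior the docstring describes can be reproduced with plain pandas: reindexing the two base rows onto the full calendar and ffilling makes 2014-01-03 read the 2014-01-01 value. A minimal sketch:

import pandas as pd

base = pd.Series([-1.0, 1.0],
                 index=pd.to_datetime(['2014-01-01', '2014-01-04']))
daily = base.reindex(pd.date_range('2014-01-01', '2014-01-04')).ffill()
print(daily[pd.Timestamp('2014-01-03')])  # -1.0, the naturally ffilled value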