def __init__(self, restrictions):
    """
    Index the given restrictions by the asset they apply to.

    Parameters
    ----------
    restrictions : iterable
        Objects exposing ``asset`` and ``effective_date`` attributes.
    """
    grouped = groupby(lambda restriction: restriction.asset, restrictions)

    # Maps each asset to its restrictions, ordered by ascending
    # effective_date.
    by_asset = {}
    for asset, asset_restrictions in iteritems(grouped):
        by_asset[asset] = sorted(
            asset_restrictions,
            key=lambda restriction: restriction.effective_date,
        )
    self._restrictions_by_asset = by_asset
# Example source code using Python's groupby()
def split_next_and_previous_event_columns(self, requested_columns):
    """
    Partition the requested columns into those that load the next known
    value and those that load the previous known value.

    Parameters
    ----------
    requested_columns : iterable[BoundColumn]

    Returns
    -------
    next_cols, previous_cols : iterable[BoundColumn], iterable[BoundColumn]
        The members of ``requested_columns`` found in
        ``self.next_value_columns`` and in ``self.previous_value_columns``
        respectively. A side with no members is returned as an empty
        tuple.

    Raises
    ------
    ValueError
        If a requested column is in neither ``self.next_value_columns``
        nor ``self.previous_value_columns``.
    """
    def classify(column):
        # Membership in next_value_columns is checked first, so it wins
        # if a column were listed on both sides.
        if column in self.next_value_columns:
            return 'next'
        if column in self.previous_value_columns:
            return 'previous'
        raise ValueError(
            "{c} not found in next_value_columns "
            "or previous_value_columns".format(c=column)
        )

    partitioned = groupby(classify, requested_columns)
    next_cols = partitioned.get('next', ())
    previous_cols = partitioned.get('previous', ())
    return next_cols, previous_cols
def get_zeroth_quarter_idx(self, stacked_last_per_qtr):
    """
    Keep only releases whose event date is on or after the simulation
    date, then pick the upcoming release for each (simulation date, sid)
    pair.

    Parameters
    ----------
    stacked_last_per_qtr : pd.DataFrame
        A DataFrame with index of calendar dates, sid, and normalized
        quarters with each row being the latest estimate for the row's
        index values, sorted by event date.

    Returns
    -------
    next_releases_per_date_index : pd.MultiIndex
        An index of calendar dates, sid, and normalized quarters, for only
        the rows that have a next event.
    """
    event_dates = stacked_last_per_qtr[EVENT_DATE_FIELD_NAME]
    simulation_dates = stacked_last_per_qtr.index.get_level_values(
        SIMULATION_DATES
    )
    upcoming_releases = stacked_last_per_qtr.loc[
        event_dates >= simulation_dates
    ]
    per_date_and_sid = upcoming_releases.groupby(
        level=[SIMULATION_DATES, SID_FIELD_NAME],
        as_index=False,
    )
    # Rows are sorted by event date, so the first row of each group is the
    # nearest upcoming release.
    next_releases_per_date = per_date_and_sid.nth(0)
    return next_releases_per_date.index
def get_zeroth_quarter_idx(self, stacked_last_per_qtr):
    """
    Keep only releases whose event date is on or before the simulation
    date, then pick the most recent release for each (simulation date,
    sid) pair.

    Parameters
    ----------
    stacked_last_per_qtr : pd.DataFrame
        A DataFrame with index of calendar dates, sid, and normalized
        quarters with each row being the latest estimate for the row's
        index values, sorted by event date.

    Returns
    -------
    previous_releases_per_date_index : pd.MultiIndex
        An index of calendar dates, sid, and normalized quarters, for only
        the rows that have a previous event.
    """
    event_dates = stacked_last_per_qtr[EVENT_DATE_FIELD_NAME]
    simulation_dates = stacked_last_per_qtr.index.get_level_values(
        SIMULATION_DATES
    )
    past_releases = stacked_last_per_qtr.loc[
        event_dates <= simulation_dates
    ]
    per_date_and_sid = past_releases.groupby(
        level=[SIMULATION_DATES, SID_FIELD_NAME],
        as_index=False,
    )
    # Rows are sorted by event date, so the last row of each group is the
    # most recent release.
    previous_releases_per_date = per_date_and_sid.nth(-1)
    return previous_releases_per_date.index
def _message_handlers(self):
    """
    Group ``self._handlers`` by their ``prio`` attribute and build one
    ``Handlers`` object per priority.

    Returns the result of mapping the grouped ``Map`` through a function
    that yields ``(priority, Handlers(priority, ...))`` pairs.
    """
    def build_entry(priority, handlers):
        # NOTE(review): presumably ``apzip(_.message)`` pairs every handler
        # with its message and ``map2(flip)`` swaps each pair so the
        # message comes first -- confirm against the List API.
        by_message = List.wrap(handlers).apzip(_.message).map2(flip)
        return priority, Handlers(priority, Map(by_message))

    grouped_by_priority = toolz.groupby(_.prio, self._handlers)
    return Map(grouped_by_priority).map(build_entry)
async def create_notifications(self, notifications):
    """
    Insert the given notifications, first creating any users they mention.

    This must be a coroutine function: the body uses ``await`` and
    ``async with``, which are syntax errors inside a plain ``def``.

    Parameters
    ----------
    notifications : collection of dict
        Each dict must contain ``'to_username'`` and may contain
        ``'from_username'``. The whole dict is passed as column values to
        ``notifications_table.insert()``. The collection is iterated
        twice, so it must not be a one-shot iterator.

    Returns
    -------
    bool
        ``all()`` over the result of ``self.create_users`` and of every
        executed insert.

    Raises
    ------
    KeyError
        If a notification has no ``'to_username'`` key.
    """
    results = []

    # Create non-existent users before creating notifications. Falsy
    # names (e.g. a missing 'from_username') are dropped.
    usernames = {
        username
        for notification in notifications
        for username in (notification['to_username'],
                         notification.get('from_username'))
        if username
    }
    results.append(await self.create_users(usernames))

    # NOTE: rows are inserted one at a time. Grouping notifications by
    # their key set would allow multi-row inserts.
    wwwpoll_columns = {c.name for c in wwwpoll_table.c._all_columns}
    async with self.async_engine.acquire() as conn:
        for notification in notifications:
            results.append(await conn.execute(
                notifications_table.insert().values(**notification)))
            # Only the keys that are wwwpoll columns go into that table.
            wwwpoll_values = toolz.keyfilter(
                lambda key: key in wwwpoll_columns, notification)
            results.append(await conn.execute(
                wwwpoll_table.insert().values(**wwwpoll_values)))
    return all(results)
# notification retrieval methods
# pylint: disable=too-many-arguments,too-many-locals
def lazy_proxy_dict(artifacts_or_ids, group_artifacts_of_same_name=False):
    """
    Build a lazy dictionary of artifact proxies keyed by name.

    Each value is only turned into a proxy (via ``artifact.proxy()``) when
    it is requested from the returned ``lazy_dict``.

    Parameters
    ----------
    artifacts_or_ids : dict or collection of artifacts or artifact ids
        When a dict is given, its keys are used as the names and its
        values are coerced to artifacts. Otherwise the collection is
        coerced to artifacts and keyed by each artifact's ``name``.
    group_artifacts_of_same_name: bool (default: False)
        Only applies to the collection form. If True, artifacts sharing a
        name are exposed as one list of proxies under that name. If False,
        a shared name raises an exception.

    Raises
    ------
    ValueError
        If several artifacts share a name and
        ``group_artifacts_of_same_name`` is false.
    """
    def single_proxy(artifact):
        # Bind the artifact now; create its proxy only when called.
        return lambda: artifact.proxy()

    def proxy_list(group):
        # Bind the group now; create the proxies only when called.
        return lambda: [a.proxy() for a in group]

    if isinstance(artifacts_or_ids, dict):
        coerced = t.valmap(coerce_to_artifact, artifacts_or_ids)
        return lazy_dict(
            {name: single_proxy(a) for name, a in coerced.items()}
        )

    # Otherwise we have a collection.
    by_name = t.groupby(lambda a: a.name, coerce_to_artifacts(artifacts_or_ids))

    thunks = {}
    duplicated = {}
    for name, group in by_name.items():
        if len(group) == 1:
            thunks[name] = single_proxy(group[0])
        elif len(group) > 1:
            duplicated[name] = group

    if duplicated:
        if not group_artifacts_of_same_name:
            raise ValueError("""Only artifacts with distinct names can be used in a lazy_proxy_dict.
Offending names: {}
Use the option `group_artifacts_of_same_name=True` if you want a list of proxies to be returned under the respective key.
""".format({n: len(a) for n, a in duplicated.items()}))
        for name, group in duplicated.items():
            thunks[name] = proxy_list(group)

    return lazy_dict(thunks)
def get_adjustments(
        self,
        zero_qtr_data,
        requested_qtr_data,
        last_per_qtr,
        dates,
        assets,
        columns,
        **kwargs):
    """
    Collect the adjustments for the given estimates data over the given
    dates by delegating to ``get_adjustments_for_sid`` once per sid.

    Parameters
    ----------
    zero_qtr_data : pd.DataFrame
        The 'time zero' data for each calendar date per sid. Note that
        this frame is sorted by index IN PLACE.
    requested_qtr_data : pd.DataFrame
        The requested quarter data for each calendar date per sid.
    last_per_qtr : pd.DataFrame
        A DataFrame with a column MultiIndex of [self.estimates.columns,
        normalized_quarters, sid] that allows easily getting the timeline
        of estimates for a particular sid for a particular quarter.
    dates : pd.DatetimeIndex
        The calendar dates for which estimates data is requested.
    assets : pd.Int64Index
        An index of all the assets from the raw data.
    columns : list of BoundColumn
        The columns for which adjustments need to be calculated.
    kwargs :
        Additional keyword arguments forwarded to
        `get_adjustments_for_sid` for use in computing adjustments for
        each sid.

    Returns
    -------
    col_to_all_adjustments : dict[int -> AdjustedArray]
        A dictionary of all adjustments that should be applied. It starts
        empty here and is handed to `get_adjustments_for_sid`, which is
        expected to fill it in.
    """
    zero_qtr_data.sort_index(inplace=True)

    # Take the LAST record of each (sid, quarter) group so that the most
    # up-to-date event date is used in case the event date changed.
    by_sid_and_quarter = zero_qtr_data.groupby(
        level=[SID_FIELD_NAME, NORMALIZED_QUARTERS]
    )
    quarter_shifts = by_sid_and_quarter.nth(-1)

    # Position of each sid within ``assets``.
    sid_to_idx = {sid: position for position, sid in enumerate(assets)}

    col_to_all_adjustments = {}
    shifts_per_sid = quarter_shifts.groupby(level=SID_FIELD_NAME)
    # The return value of ``apply`` is discarded; results are gathered
    # through the ``col_to_all_adjustments`` dict passed along.
    shifts_per_sid.apply(
        self.get_adjustments_for_sid,
        dates,
        requested_qtr_data,
        last_per_qtr,
        sid_to_idx,
        columns,
        col_to_all_adjustments,
        **kwargs
    )
    return col_to_all_adjustments