def _pipeline_output(self, pipeline, chunks):
"""
Internal implementation of `pipeline_output`.
"""
today = normalize_date(self.get_datetime())
try:
data = self._pipeline_cache.unwrap(today)
except Expired:
data, valid_until = self._run_pipeline(
pipeline, today, next(chunks),
)
self._pipeline_cache = CachedObject(data, valid_until)
# Now that we have a cached result, try to return the data for today.
try:
return data.loc[today]
except KeyError:
# This happens if no assets passed the pipeline screen on a given
# day.
return pd.DataFrame(index=[], columns=data.columns)
评论列表
文章目录