def zipline_splits_and_dividends(symbol_map):
    """
    Build per-asset split and dividend frames from the raw corporate actions.

    Parameters
    ----------
    symbol_map : mapping of sid -> code
        Any object exposing ``items()`` that yields ``(sid, code)`` pairs
        (e.g. a dict or a pandas Series).

    Returns
    -------
    splits : list of pd.DataFrame
        One frame per code found in the raw splits, tagged with a ``sid``
        column and indexed by ``effective_date``.
    dividends : list of pd.DataFrame
        One frame per code found in the raw dividends, tagged with a ``sid``
        column, indexed by ``ex_date``, with ``record_date``,
        ``declared_date`` and ``pay_date`` set to ``pd.NaT``.
    """
    raw_splits, raw_dividends = load_splits_and_dividends()
    splits = []
    dividends = []
    # ``items()`` instead of the Python 2-only ``iteritems()``: it works for
    # dicts on Python 2 and 3 and for pandas Series on every pandas version.
    for sid, code in symbol_map.items():
        if code in raw_splits:
            split = pd.DataFrame(data=raw_splits[code])
            split['sid'] = sid
            effective_dates = pd.DatetimeIndex(split['effective_date'])
            split.index = effective_dates
            split['effective_date'] = effective_dates
            splits.append(split)
        if code in raw_dividends:
            dividend = pd.DataFrame(data=raw_dividends[code])
            dividend['sid'] = sid
            # These dates are not taken from the raw data; fill with NaT.
            for column in ('record_date', 'declared_date', 'pay_date'):
                dividend[column] = pd.NaT
            ex_dates = pd.DatetimeIndex(dividend['ex_date'])
            dividend.index = ex_dates
            dividend['ex_date'] = ex_dates
            dividends.append(dividend)
    return splits, dividends
# Example source code using Python's NaT (python类NaT()的实例源码)
def _display_dimensions(self, dimensions, operations):
    """
    Collect the display metadata for every requested dimension.

    :param dimensions:
        Requested dimensions; each entry is reduced to its key with
        ``utils.slice_first``.
    :param operations:
        Not used by this method.
    :return:
        An ``OrderedDict`` mapping each dimension key to a dict holding its
        ``label`` and, when the dimension provides them, ``display_options``
        and ``display_field``.
    """
    requested_keys = list(map(utils.slice_first, dimensions))

    result = OrderedDict()
    for dim_key in requested_keys:
        dim = self.slicer.dimensions[dim_key]
        entry = {'label': dim.label}

        if hasattr(dim, 'display_options'):
            options = dict((option.key, option.label)
                           for option in dim.display_options)
            # Null markers are always rendered as an empty label.
            options[pd.NaT] = ''
            options[np.nan] = ''
            entry['display_options'] = options

        if getattr(dim, 'display_field', None):
            entry['display_field'] = '%s_display' % dim.key

        result[dim_key] = entry

    return result
def test_categorical_dimension(self):
    # A categorical dimension exposes its options plus empty labels for the
    # null markers (np.nan and pd.NaT).
    actual_schema = self.test_slicer.manager.display_schema(
        metrics=['foo'],
        dimensions=['locale'],
    )

    locale_options = {
        'us': 'United States',
        'de': 'Germany',
        np.nan: '',
        pd.NaT: '',
    }
    expected_schema = {
        'metrics': {'foo': {'label': 'foo', 'axis': 0}},
        'dimensions': {
            'locale': {'label': 'Locale', 'display_options': locale_options},
        },
        'references': {},
    }

    self.assertDictEqual(expected_schema, actual_schema)
def test_multiple_metrics_and_dimensions(self):
    # Two metrics and four dimensions of different kinds in one schema.
    requested_dimensions = [
        ('date', DatetimeDimension.month),
        ('clicks', 50, 100),
        'locale',
        'account',
    ]
    actual_schema = self.test_slicer.manager.display_schema(
        metrics=['foo', 'bar'],
        dimensions=requested_dimensions,
    )

    expected_metrics = {
        'foo': {'label': 'foo', 'axis': 0},
        'bar': {'label': 'FizBuz', 'axis': 1},
    }
    expected_dimensions = {
        'date': {'label': 'date'},
        'clicks': {'label': 'My Clicks'},
        'locale': {
            'label': 'Locale',
            'display_options': {
                'us': 'United States',
                'de': 'Germany',
                np.nan: '',
                pd.NaT: '',
            },
        },
        'account': {'label': 'Account', 'display_field': 'account_display'},
    }

    self.assertDictEqual(
        {
            'metrics': expected_metrics,
            'dimensions': expected_dimensions,
            'references': {},
        },
        actual_schema,
    )
def _make_time(timearr):
    """Parse an array of characters into a :class:`datetime.datetime`.

    The characters are joined into one string and parsed with the format
    ``%Y-%m-%d_%H:%M:%S``.

    Args:

        timearr (:class:`numpy.ndarray`): An array of characters.

    Returns:

        :class:`datetime.datetime`: The parsed datetime, or
        ``numpy.datetime64("NaT")`` when a :class:`ValueError` is raised
        while building or parsing the string.

    """
    try:
        time_str = "".join(npbytes_to_str(timearr))
        parsed = dt.datetime.strptime(time_str, "%Y-%m-%d_%H:%M:%S")
    except ValueError:
        return np.datetime64("NaT")
    return parsed
def test_date_breaks():
    # cpython datetimes
    dates = [datetime(y, 1, 1) for y in (2010, 2026, 2015)]
    limits = (min(dates), max(dates))

    five_yearly = date_breaks('5 Years')
    npt.assert_array_equal(
        [stamp.year for stamp in five_yearly(limits)],
        [2010, 2015, 2020, 2025, 2030])

    ten_yearly = date_breaks('10 Years')
    npt.assert_array_equal(
        [stamp.year for stamp in ten_yearly(limits)],
        [2010, 2020, 2030])

    # numpy datetime64 limits are expected to raise AttributeError
    np_dates = [np.datetime64(step * 10, 'D') for step in range(1, 10)]
    ten_yearly = date_breaks('10 Years')
    limits = (min(np_dates), max(np_dates))
    with pytest.raises(AttributeError):
        ten_yearly(limits)

    # a NaT limit yields no breaks
    limits = (np.datetime64('NaT'), datetime(2017, 1, 1))
    ten_yearly = date_breaks('10 Years')
    assert len(ten_yearly(limits)) == 0
def automatic_events(self, timestamp):
    """
    Update the current time of the Blotter, triggering all scheduled events
    between previous clock time and new clock time such as interest
    charges, margin charges, PnL calculations and PnL sweeps. See
    create_events() for more information on the type of events.

    Parameters
    ----------
    timestamp: pandas.Timestamp
        Time to update clock to and trigger internal events up until

    Returns
    -------
    None
        Events are created with ``create_events`` and passed to
        ``dispatch_events``; nothing is returned.
    """
    current_time = self._holdings.timestamp
    # first event so there is nothing automatic that needs to be done
    if current_time is pd.NaT:
        return

    actions = self._get_actions(current_time, timestamp, self._actions)
    # ``items()`` rather than ``iteritems()``: the latter does not exist on
    # Python 3 dicts and was removed from pandas Series in pandas 2.0.
    for ts, action in actions.items():
        events = self.create_events(ts, action)
        self.dispatch_events(events)
def pad(self, sid, date):
    """
    Fill sid container with empty data through the specified date.

    e.g. if the date is two days after the last date in the sid's existing
    output, 2 x `minute_per_day` worth of zeros will be added to the
    output.

    Parameters:
    -----------
    sid : int
        The asset identifier for the data being written.
    date : datetime-like
        The date used to calculate how many slots to be pad.
        The padding is done through the date, i.e. after the padding is
        done the `last_date_in_output_for_sid` will be equal to `date`

    Raises
    ------
    AssertionError
        If, after padding, the last date in the output differs from `date`.
    """
    table = self._ensure_ctable(sid)

    last_date = self.last_date_in_output_for_sid(sid)

    tds = self._trading_days

    # Comparisons against NaT are always False, so a sid without data
    # falls through this guard unless `date` precedes the first trading day.
    if date <= last_date or date < tds[0]:
        # No need to pad.
        return

    # NaT never compares equal to anything (including itself), so the
    # "no data yet" case must be detected with a null check, not `==`.
    if pd.isnull(last_date):
        # If there is no data, determine how many days to add so that
        # desired days are written to the correct slots.
        days_to_zerofill = tds[tds.slice_indexer(end=date)]
    else:
        days_to_zerofill = tds[tds.slice_indexer(
            start=last_date + tds.freq,
            end=date)]

    self._zerofill(table, len(days_to_zerofill))

    new_last_date = self.last_date_in_output_for_sid(sid)
    assert new_last_date == date, "new_last_date={0} != date={1}".format(
        new_last_date, date)
def __init__(self,
             window,
             items,
             sids,
             cap_multiple=2,
             dtype=np.float64,
             initial_dates=None):
    """
    Set up the window bookkeeping, axes, date buffer and data buffer.

    Parameters
    ----------
    window : int
        Stored as both the window length and the initial position.
    items, sids
        Axis labels; each is passed through ``_ensure_index``.
    cap_multiple : int, optional
        Stored on the instance; also sizes the tail of the date buffer
        when `initial_dates` is given.
    dtype : optional
        Stored on the instance.
    initial_dates : sequence of datetime-like, optional
        Dates used to seed the front of the date buffer. Must have
        exactly `window` entries.

    Raises
    ------
    ValueError
        If `initial_dates` is given and its length differs from `window`.
    """
    self._pos = window
    self._window = window

    self.items = _ensure_index(items)
    self.minor_axis = _ensure_index(sids)

    self.cap_multiple = cap_multiple

    self.dtype = dtype
    if initial_dates is None:
        # NOTE(review): ``self.cap`` is not assigned in this method;
        # presumably a property defined elsewhere on the class.
        # Build the NaT-filled buffer explicitly instead of multiplying
        # uninitialized memory by pd.NaT.
        self.date_buf = np.full(
            self.cap, np.datetime64('NaT', 'ns'), dtype='M8[ns]')
    elif len(initial_dates) != window:
        raise ValueError('initial_dates must be of length window')
    else:
        # The tail past the initial window is left uninitialized.
        self.date_buf = np.hstack(
            (
                initial_dates,
                np.empty(
                    window * (cap_multiple - 1),
                    dtype='datetime64[ns]',
                ),
            ),
        )

    self.buffer = self._create_buffer()
def _update_dividends(self, asset_id, raw_data):
divs = raw_data.ex_dividend
df = pd.DataFrame({'amount': divs[divs != 0]})
df.index.name = 'ex_date'
df.reset_index(inplace=True)
df['sid'] = asset_id
# we do not have this data in the WIKI dataset
df['record_date'] = df['declared_date'] = df['pay_date'] = pd.NaT
self.dividends.append(df)
def last_date_in_output_for_sid(self, sid):
    """
    Look up the last session already written for a sid.

    Parameters
    ----------
    sid : int
        Asset identifier.

    Returns
    -------
    out : pd.Timestamp
        The session label of the last full day written for the sid, or
        ``pd.NaT`` when the sizes file is missing or holds less than one
        full day of minutes.
    """
    sizes_path = "{0}/close/meta/sizes".format(self.sidpath(sid))
    if not os.path.exists(sizes_path):
        return pd.NaT

    with open(sizes_path, mode='r') as sizes_file:
        metadata = json.load(sizes_file)

    # Floor division keeps the day count an int so it can index the
    # session labels.
    days_written = metadata['shape'][0] // self._minutes_per_day
    if days_written:
        return self._session_labels[days_written - 1]

    # empty container
    return pd.NaT
def get_last_traded_dt(self, asset, dt):
    """
    Find the last traded dt of the contract at the center of a continuous
    future, as seen from ``dt``.

    Parameters
    ----------
    asset : catalyst.asset.Asset
        The asset for which to get the last traded minute. Its
        ``roll_style``, ``root_symbol`` and ``offset`` select the contract.
    dt : pd.Timestamp
        The minute at which to start searching for the last traded minute.

    Returns
    -------
    last_traded : pd.Timestamp
        Whatever the bar reader reports for the resolved contract, or
        ``pd.NaT`` when no contract is found for ``dt``.
    """
    roll_finder = self._roll_finders[asset.roll_style]
    contract_sid = roll_finder.get_contract_center(
        asset.root_symbol, dt, asset.offset)

    if contract_sid is not None:
        center_contract = roll_finder.asset_finder.retrieve_asset(
            contract_sid)
        return self._bar_reader.get_last_traded_dt(center_contract, dt)

    return pd.NaT
def get_last_traded_dt(self, asset, dt):
    """
    Resolve the center contract for ``asset`` at ``dt`` and delegate the
    last-traded lookup to the bar reader.

    Parameters
    ----------
    asset : catalyst.asset.Asset
        The asset for which to get the last traded minute.
    dt : pd.Timestamp
        The minute at which to start searching for the last traded minute.

    Returns
    -------
    last_traded : pd.Timestamp
        The bar reader's last traded dt for the resolved contract, using
        the input dt as a vantage point; ``pd.NaT`` if the roll finder
        returns no contract.
    """
    finder = self._roll_finders[asset.roll_style]

    center_sid = finder.get_contract_center(asset.root_symbol,
                                            dt,
                                            asset.offset)
    # No contract at this dt means there is nothing that could have traded.
    if center_sid is None:
        return pd.NaT

    resolved = finder.asset_finder.retrieve_asset(center_sid)
    last_traded = self._bar_reader.get_last_traded_dt(resolved, dt)
    return last_traded
def _get_daily_spot_value(self, asset, column, dt):
    """
    Read a single daily value for ``asset`` at ``dt``.

    * ``"last_traded"``: the reader's last traded dt, or ``pd.NaT`` when
      that is null.
    * a column in ``OHLCV_FIELDS``: the reader's value at ``dt`` without
      forward filling, or ``np.nan`` on ``NoDataOnDate``.
    * ``"price"``: the most recent non-null close found by stepping back
      one calendar day at a time from ``dt``; a close from an earlier day
      is passed through ``get_adjusted_value``. ``np.nan`` on
      ``NoDataOnDate``.

    Any other column falls through and returns ``None``.
    """
    daily_reader = self._get_pricing_reader('daily')

    if column == "last_traded":
        traded_dt = daily_reader.get_last_traded_dt(asset, dt)
        return pd.NaT if isnull(traded_dt) else traded_dt

    if column in OHLCV_FIELDS:
        # don't forward fill
        try:
            return daily_reader.get_value(asset, dt, column)
        except NoDataOnDate:
            return np.nan

    if column == "price":
        lookup_dt = dt
        while True:
            try:
                close = daily_reader.get_value(asset, lookup_dt, "close")
                if isnull(close):
                    # nothing on this day; look one day further back
                    lookup_dt -= self.trading_calendar.day
                    continue
                if dt == lookup_dt:
                    return close
                # adjust if needed
                return self.get_adjusted_value(
                    asset, column, lookup_dt, dt, "minute",
                    spot_value=close
                )
            except NoDataOnDate:
                return np.nan
def assert_same(self, val1, val2):
    """
    Assert that two values are equal, treating matching nulls as equal.

    ``pd.NaT`` and NaN do not compare equal to themselves, so a failed
    ``assertEqual`` is retried as "both NaT" or "both NaN".

    Raises
    ------
    AssertionError
        If the values differ and are not both NaT or both NaN.
    """
    def _is_nan(value):
        # np.isnan raises TypeError for values it cannot handle (e.g.
        # strings); such values are simply not NaN.
        try:
            return np.isnan(value)
        except TypeError:
            return False

    try:
        self.assertEqual(val1, val2)
    except AssertionError:
        if val1 is pd.NaT:
            self.assertTrue(val2 is pd.NaT)
        elif _is_nan(val1):
            self.assertTrue(_is_nan(val2))
        else:
            # Not a null mismatch: surface the original assertion failure.
            raise
def test_day_before_assets_trading(self):
    # use the day before self.bcolz_daily_bar_days[0]
    first_bar_day = self.equity_daily_bar_days[0]
    prior_session = self.trading_calendar.previous_session_label(
        first_bar_day
    )
    minute = self.get_last_minute_of_session(prior_session)

    bar_data = self.create_bardata(
        simulation_dt_func=lambda: minute,
    )
    self.check_internal_consistency(bar_data)

    # Before any data exists the assets are neither tradable nor stale.
    for asset in (self.ASSET1, self.ASSET2):
        self.assertFalse(bar_data.can_trade(asset))
    for asset in (self.ASSET1, self.ASSET2):
        self.assertFalse(bar_data.is_stale(asset))

    for field in ALL_FIELDS:
        for asset in self.ASSETS:
            current_value = bar_data.current(asset, field)

            if field in OHLCP:
                self.assertTrue(np.isnan(current_value))
                continue
            if field == "volume":
                self.assertEqual(0, current_value)
                continue
            if field == "last_traded":
                self.assertTrue(current_value is pd.NaT)
def test_semi_active_day(self):
    # on self.equity_daily_bar_days[0], only asset1 has data
    def _last_minute_of_first_day():
        return self.get_last_minute_of_session(
            self.equity_daily_bar_days[0]
        )

    bar_data = self.create_bardata(
        simulation_dt_func=_last_minute_of_first_day,
    )
    self.check_internal_consistency(bar_data)

    self.assertTrue(bar_data.can_trade(self.ASSET1))
    self.assertFalse(bar_data.can_trade(self.ASSET2))

    # because there is real data
    self.assertFalse(bar_data.is_stale(self.ASSET1))

    # because there has never been a trade bar yet
    self.assertFalse(bar_data.is_stale(self.ASSET2))

    expected_asset1 = (
        ("open", 3),
        ("high", 4),
        ("low", 1),
        ("close", 2),
        ("volume", 200),
        ("price", 2),
    )
    for field, expected in expected_asset1:
        self.assertEqual(expected, bar_data.current(self.ASSET1, field))
    self.assertEqual(self.equity_daily_bar_days[0],
                     bar_data.current(self.ASSET1, "last_traded"))

    # asset2 has no data yet: NaN prices, zero volume, NaT last_traded
    for field in OHLCP:
        asset2_value = bar_data.current(self.ASSET2, field)
        self.assertTrue(np.isnan(asset2_value), field)

    self.assertEqual(0, bar_data.current(self.ASSET2, "volume"))

    asset2_last_traded = bar_data.current(self.ASSET2, "last_traded")
    self.assertTrue(
        asset2_last_traded is pd.NaT
    )
def encode_as_pandas(obj):
    """Encode ``pandas.NaT`` as ``None``.

    Raises ``NotEncodable`` when pandas was not imported or when ``obj`` is
    anything other than ``pandas.NaT``.
    """
    # Short-circuit so ``pandas`` is only touched when it was imported.
    if _pandas_imported and obj is pandas.NaT:
        return None
    raise NotEncodable
def test_should_properly_handle_null_timestamp(self):
    # A NULL timestamp from the query should come back as NaT.
    sql = 'SELECT TIMESTAMP(NULL) AS null_timestamp'
    result = gbq.read_gbq(
        sql,
        project_id=_get_project_id(),
        private_key=_get_private_key_path(),
    )
    expected = DataFrame({'null_timestamp': [NaT]})
    tm.assert_frame_equal(result, expected)
def allowed_values_exclusions(self):
    """Return the null markers to exclude from allowed values.

    Pandas reports various kinds of nulls among unique values even though
    ``.nunique()`` does not count them.
    """
    null_markers = [None, np.nan, pd.NaT]
    return null_markers