def market_minute_window(self, start, count, step=1):
"""
Return a DatetimeIndex containing `count` market minutes, starting with
`start` and continuing `step` minutes at a time.
"""
if not self.is_market_hours(start):
raise ValueError("market_minute_window starting at "
"non-market time {minute}".format(minute=start))
all_minutes = []
current_day_minutes = self.market_minutes_for_day(start)
first_minute_idx = current_day_minutes.searchsorted(start)
minutes_in_range = current_day_minutes[first_minute_idx::step]
# Build up list of lists of days' market minutes until we have count
# minutes stored altogether.
while True:
if len(minutes_in_range) >= count:
# Truncate off extra minutes
minutes_in_range = minutes_in_range[:count]
all_minutes.append(minutes_in_range)
count -= len(minutes_in_range)
if count <= 0:
break
if step > 0:
start, _ = self.next_open_and_close(start)
current_day_minutes = self.market_minutes_for_day(start)
else:
_, start = self.previous_open_and_close(start)
current_day_minutes = self.market_minutes_for_day(start)
minutes_in_range = current_day_minutes[::step]
# Concatenate all the accumulated minutes.
return pd.DatetimeIndex(
np.concatenate(all_minutes), copy=False, tz='UTC',
)
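# The calendar helpers used above (is_market_hours, market_minutes_for_day,
# next_open_and_close) belong to the surrounding class and are not shown here.
# A minimal standalone sketch of the same windowing idea, using toy sessions
# built with plain pandas (hypothetical 09:31-16:00 UTC trading hours):
import pandas as pd
day1 = pd.date_range('2014-01-06 09:31', '2014-01-06 16:00', freq='min', tz='UTC')
day2 = pd.date_range('2014-01-07 09:31', '2014-01-07 16:00', freq='min', tz='UTC')
start, count = pd.Timestamp('2014-01-06 15:58', tz='UTC'), 5
first_idx = day1.searchsorted(start)   # locate `start` within its own session
window = day1[first_idx:]              # 15:58, 15:59, 16:00
needed = count - len(window)           # two more minutes still required
window = window.append(day2[:needed])  # spill into the next session's open
print(window)                          # Index.append stands in for np.concatenate here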
def get_early_closes(start, end):
# TSX closed at 1:00 PM on December 24th.
start = canonicalize_datetime(start)
end = canonicalize_datetime(end)
start = max(start, datetime(1993, 1, 1, tzinfo=pytz.utc))
end = max(end, datetime(1993, 1, 1, tzinfo=pytz.utc))
# Not included here are early closes prior to 1993
# or unplanned early closes
early_close_rules = []
christmas_eve = rrule.rrule(
rrule.MONTHLY,
bymonth=12,
bymonthday=24,
byweekday=(rrule.MO, rrule.TU, rrule.WE, rrule.TH, rrule.FR),
cache=True,
dtstart=start,
until=end
)
early_close_rules.append(christmas_eve)
early_close_ruleset = rrule.rruleset()
for rule in early_close_rules:
early_close_ruleset.rrule(rule)
early_closes = early_close_ruleset.between(start, end, inc=True)
early_closes.sort()
return pd.DatetimeIndex(early_closes)
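# As a quick sanity check, the Christmas Eve rule can be evaluated with dateutil
# alone; the byweekday filter keeps only weekday occurrences (e.g. 1993-12-24,
# a Friday, but not 1994-12-24, a Saturday). The bounds below are illustrative.
from datetime import datetime
import pytz
from dateutil import rrule
rule = rrule.rrule(
    rrule.MONTHLY, bymonth=12, bymonthday=24,
    byweekday=(rrule.MO, rrule.TU, rrule.WE, rrule.TH, rrule.FR),
    dtstart=datetime(1993, 1, 1, tzinfo=pytz.utc),
    until=datetime(1996, 12, 31, tzinfo=pytz.utc),
)
print([d.date() for d in rule])   # [1993-12-24, 1996-12-24]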
def current_dates(self):
where = slice(self._start_index, self._pos)
return pd.DatetimeIndex(deepcopy(self.date_buf[where]), tz='utc')
def get_current(self):
"""
Get a Panel that is the current data in view. It is not safe to persist
these objects because internal data might change.
"""
where = slice(self._oldest_frame_idx(), self._pos)
major_axis = pd.DatetimeIndex(deepcopy(self.date_buf[where]), tz='utc')
return pd.Panel(self.buffer.values[:, where, :], self.items,
major_axis, self.minor_axis, dtype=self.dtype)
def current_dates(self):
where = slice(self._oldest_frame_idx(), self._pos)
return pd.DatetimeIndex(deepcopy(self.date_buf[where]), tz='utc')
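# The buffer helpers above share one pattern: slice the ring buffer's date
# storage and localize it to UTC. A toy illustration with a plain datetime64
# buffer (not the class's own state):
from copy import deepcopy
import pandas as pd
date_buf = pd.date_range('2015-06-01', periods=10, freq='min').values  # naive datetime64[ns]
where = slice(2, 7)   # stand-ins for _oldest_frame_idx()/_start_index and _pos
print(pd.DatetimeIndex(deepcopy(date_buf[where]), tz='utc'))  # five minutes, now tz-aware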
def pipeline_event_loader_args(self, dates):
"""Construct the base object to pass to the loader.
Parameters
----------
dates : pd.DatetimeIndex
The dates we can serve.
Returns
-------
args : tuple[any]
The arguments to forward to the loader positionally.
"""
return dates, self.get_dataset()
def temp_pipeline_engine(calendar, sids, random_seed, symbols=None):
"""
A context manager that yields a SimplePipelineEngine holding a reference to
an AssetFinder generated via tmp_asset_finder.
Parameters
----------
calendar : pd.DatetimeIndex
Calendar to pass to the constructed PipelineEngine.
sids : iterable[int]
Sids to use for the temp asset finder.
random_seed : int
Integer used to seed instances of SeededRandomLoader.
symbols : iterable[str], optional
Symbols for constructed assets. Forwarded to make_simple_equity_info.
"""
equity_info = make_simple_equity_info(
sids=sids,
start_date=calendar[0],
end_date=calendar[-1],
symbols=symbols,
)
loader = make_seeded_random_loader(random_seed, calendar, sids)
get_loader = lambda column: loader
with tmp_asset_finder(equities=equity_info) as finder:
yield SimplePipelineEngine(get_loader, calendar, finder)
def has_data_for_dates(series_or_df, first_date, last_date):
"""
Does `series_or_df` have data on or before first_date and on or after
last_date?
"""
dts = series_or_df.index
if not isinstance(dts, pd.DatetimeIndex):
raise TypeError("Expected a DatetimeIndex, but got %s." % type(dts))
first, last = dts[[0, -1]]
return (first <= first_date) and (last >= last_date)
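# Usage sketch for has_data_for_dates with an ad-hoc daily series (hypothetical data):
import pandas as pd
s = pd.Series(1.0, index=pd.date_range('2014-01-02', '2014-12-31', tz='UTC'))
print(has_data_for_dates(s, pd.Timestamp('2014-01-02', tz='UTC'),
                         pd.Timestamp('2014-06-30', tz='UTC')))   # True
print(has_data_for_dates(s, pd.Timestamp('2013-12-31', tz='UTC'),
                         pd.Timestamp('2014-06-30', tz='UTC')))   # False: data starts too late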
def load_prices_from_csv(filepath, identifier_col, tz='UTC'):
data = pd.read_csv(filepath, index_col=identifier_col)
data.index = pd.DatetimeIndex(data.index, tz=tz)
data.sort_index(inplace=True)
return data
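# Usage sketch for load_prices_from_csv: read_csv accepts any file-like object,
# so an in-memory StringIO stands in for a real path; the column layout is assumed.
import io
import pandas as pd
csv_data = io.StringIO("date,close\n2015-01-05,10.0\n2015-01-02,9.5\n")
prices = load_prices_from_csv(csv_data, identifier_col='date')
print(prices.index)   # tz-aware (UTC) DatetimeIndex, sorted ascending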
def __init__(self,
first_trading_day,
minute_index,
market_opens,
market_closes,
ohlc_ratio):
"""
Parameters
----------
first_trading_day : datetime-like
UTC midnight of the first day available in the dataset.
minute_index : pd.DatetimeIndex
The minutes which act as an index into the corresponding values
written into each sid's ctable.
market_opens : pd.DatetimeIndex
The market opens for each day in the data set. (Not yet required.)
market_closes : pd.DatetimeIndex
The market closes for each day in the data set. (Not yet required.)
ohlc_ratio : int
The factor by which the pricing data is multiplied so that the
float data can be stored as an integer.
"""
self.first_trading_day = first_trading_day
self.minute_index = minute_index
self.market_opens = market_opens
self.market_closes = market_closes
self.ohlc_ratio = ohlc_ratio
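# Worked example of the ohlc_ratio encoding described in the docstring
# (values chosen purely for illustration):
ohlc_ratio = 1000                         # store prices at 1/1000 precision
close = 10.237
stored = int(round(close * ohlc_ratio))   # 10237, kept on disk as an integer
recovered = stored / ohlc_ratio           # 10.237 back as a float on read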
def overwrite_novel_deltas(baseline, deltas, dates):
"""overwrite any deltas into the baseline set that would have changed our
most recently known value.
Parameters
----------
baseline : pd.DataFrame
The first known values.
deltas : pd.DataFrame
Overwrites to the baseline data.
dates : pd.DatetimeIndex
The dates requested by the loader.
Returns
-------
non_novel_deltas : pd.DataFrame
The deltas that do not represent a baseline value.
"""
get_indexes = dates.searchsorted
novel_idx = (
get_indexes(deltas[TS_FIELD_NAME].values, 'right') -
get_indexes(deltas[AD_FIELD_NAME].values, 'left')
) <= 1
novel_deltas = deltas.loc[novel_idx]
non_novel_deltas = deltas.loc[~novel_idx]
return sort_values(pd.concat(
(baseline, novel_deltas),
ignore_index=True,
), TS_FIELD_NAME), non_novel_deltas
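# The novelty test above hinges on DatetimeIndex.searchsorted. A tiny trace with
# hypothetical dates, using scalar stand-ins for the TS_FIELD_NAME/AD_FIELD_NAME columns:
import pandas as pd
dates = pd.DatetimeIndex(['2014-01-01', '2014-01-02', '2014-01-03'])
ts = pd.Timestamp('2014-01-02')   # when the delta became known
ad = pd.Timestamp('2014-01-01')   # the asof date it revises
diff = dates.searchsorted(ts, 'right') - dates.searchsorted(ad, 'left')
print(diff)   # 2 - 0 == 2, fails `<= 1`, so this row would land in non_novel_deltas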
def adjustments_from_deltas_no_sids(dense_dates,
sparse_dates,
column_idx,
column_name,
asset_idx,
deltas):
"""Collect all the adjustments that occur in a dataset that does not
have a sid column.
Parameters
----------
dense_dates : pd.DatetimeIndex
The dates requested by the loader.
sparse_dates : pd.DatetimeIndex
The dates that were in the raw data.
column_idx : int
The index of the column in the dataset.
column_name : str
The name of the column to compute deltas for.
asset_idx : pd.Series[int -> int]
The mapping of sids to their index in the output.
deltas : pd.DataFrame
The overwrites that should be applied to the dataset.
Returns
-------
adjustments : dict[idx -> Float64Overwrite]
The adjustments dictionary to feed to the adjusted array.
"""
ad_series = deltas[AD_FIELD_NAME]
idx = 0, len(asset_idx) - 1
return {
dense_dates.get_loc(kd): overwrite_from_dates(
ad_series.loc[kd],
dense_dates,
sparse_dates,
idx,
v,
) for kd, v in deltas[column_name].iteritems()
}
def test_custom_query_time_tz(self):
df = self.df.copy()
df['timestamp'] = (
pd.DatetimeIndex(df['timestamp'], tz='EST') +
timedelta(hours=8, minutes=44)
).tz_convert('utc').tz_localize(None)
df.ix[3:5, 'timestamp'] = pd.Timestamp('2014-01-01 13:45')
expr = bz.data(df, name='expr', dshape=self.dshape)
loader = BlazeLoader(data_query_time=time(8, 45), data_query_tz='EST')
ds = from_blaze(
expr,
loader=loader,
no_deltas_rule=no_deltas_rules.ignore,
missing_values=self.missing_values,
)
p = Pipeline()
p.add(ds.value.latest, 'value')
p.add(ds.int_value.latest, 'int_value')
dates = self.dates
with tmp_asset_finder() as finder:
result = SimplePipelineEngine(
loader,
dates,
finder,
).run_pipeline(p, dates[0], dates[-1])
expected = df.drop('asof_date', axis=1)
expected['timestamp'] = expected['timestamp'].dt.normalize().astype(
'datetime64[ns]',
).dt.tz_localize('utc')
expected.ix[3:5, 'timestamp'] += timedelta(days=1)
expected.set_index(['timestamp', 'sid'], inplace=True)
expected.index = pd.MultiIndex.from_product((
expected.index.levels[0],
finder.retrieve_all(expected.index.levels[1]),
))
assert_frame_equal(result, expected, check_dtype=False)
def test_before_trading_start(self, test_name, num_days, freq,
emission_rate):
params = factory.create_simulation_parameters(
num_days=num_days, data_frequency=freq,
emission_rate=emission_rate)
algo = BeforeTradingAlgorithm(sim_params=params)
algo.run(source=[], overwrite_sim_params=False)
self.assertEqual(algo.perf_tracker.day_count, num_days)
self.assertTrue(params.trading_days.equals(
pd.DatetimeIndex(algo.before_trading_at)),
"Expected %s but was %s."
% (params.trading_days, algo.before_trading_at))
def test_insert_hist_data(self):
self._clear_db()
init_db(self.db_info)
# Insert two time-overlapped MarketDataBlocks
async def run(loop, data):
engine = await aiosa.create_engine(
user=self.db_info['user'], db=self.db_info['db'],
host=self.db_info['host'], password=self.db_info['password'],
loop=loop, echo=False)
await insert_hist_data(engine, 'Stock', data[0])
await insert_hist_data(engine, 'Stock', data[1])
engine.close()
await engine.wait_closed()
# Execute insertion
blk0 = MarketDataBlock(testdata_insert_hist_data[0])
blk1 = MarketDataBlock(testdata_insert_hist_data[1])
data = [blk0, blk1]
loop = asyncio.get_event_loop()
loop.run_until_complete(run(loop, data))
# Verify insertion
df_source = testdata_insert_hist_data[2]
engine = create_engine(self.db_conn)
conn = engine.connect()
metadata = MetaData(engine, reflect=True)
table = metadata.tables['Stock']
result = conn.execute(select([table]))
# self.assertEqual(result.keys(), list(df_source.columns))
df = pd.DataFrame(result.fetchall())
df.columns = result.keys()
_logger.debug(df.TickerTime[0])
df.TickerTime = pd.DatetimeIndex(df.TickerTime).tz_localize('UTC')
df_source.TickerTime = df_source.TickerTime.apply(pd.Timestamp)
_logger.debug(df.iloc[0])
assert_frame_equal(df, df_source)
def setUp(self):
ind = pd.DatetimeIndex(freq='12h', start='2015-01-01', end='2015-01-02 23:59')
self.insol = pd.Series(data=[500, 1000, 500, 1000], index=ind)
self.energy = pd.Series(data=[1.0, 4, 1.0, 4], index=ind)
self.aggregated = aggregation_insol(self.energy, self.insol, frequency='D')
# Test for the expected energy weighted result
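# aggregation_insol itself is not included in this snippet; assuming it computes an
# insolation-weighted average, a minimal sketch consistent with the fixture above:
import pandas as pd
def aggregation_insol_sketch(energy_normalized, insolation, frequency='D'):
    # insolation-weighted average per resample period (assumed behaviour)
    weighted = (energy_normalized * insolation).resample(frequency).sum()
    return weighted / insolation.resample(frequency).sum()
# With the fixture values, each day aggregates to (1.0*500 + 4*1000) / 1500 == 3.0.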
def get_series(user, freq='1M'):
user_df = user.sort_values(by='begin_date')
user_series = pd.Series(list(user_df['event']), index=user_df['begin_date'])
user_series.index = pd.DatetimeIndex(user_series.index)
user_resample = user_series.resample(freq).sum().astype('str').replace(r'0', "", regex=True)
#time = pd.date_range(start='2010-01',end='2016-06',freq=freq)
#user_resample = user_resample.reindex(time,fill_value="").astype('str')
#for i in range(len(user_resample)):
# user_resample[i] = "".join(user_resample[i].split('0'))
return user_resample
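# Usage sketch for the module-level get_series with a tiny hypothetical event log.
# On the pandas versions this code targets, resampling an object series concatenates
# the event strings in each bin, and empty bins sum to 0, which the replace() strips:
import pandas as pd
user = pd.DataFrame({
    'begin_date': pd.to_datetime(['2015-03-02', '2015-03-20', '2015-05-11']),
    'event': ['A', 'B', 'C'],
})
print(get_series(user, freq='1M'))   # 2015-03 -> 'AB', 2015-04 -> '', 2015-05 -> 'C'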
def get_series(self, freq='1M'):
user_df = self.user.sort_values(by='begin_date')
user_series = pd.Series(list(user_df['event']), index=user_df['begin_date'])
user_series.index = pd.DatetimeIndex(user_series.index)
user_resample = user_series.resample('1D').sum()
time = pd.date_range(start='2010-01',end='2016-06',freq='1D')
user_resample = user_resample.reindex(time,fill_value=0)
user_resample = user_resample.astype('str')
user_resample = user_resample.resample(freq).sum()
for i in range(len(user_resample)):
user_resample[i] = "".join(user_resample[i].split('0'))
return user_resample
def read_co_data_rnn():
"""
Load the price data used by the RNN model from MySQL, indexed by price date.
"""
print("start loading data...")
sql = const.CO_PRICE_SQL_RNN
df = read_data_from_mysql(sql)
# tuples = list(zip(*[range(len(df)), df.price_date]))
# # build an (id, date) MultiIndex
# index = pd.MultiIndex.from_tuples(tuples, names=['id', 'date'])
# df.index = index
df.index = pd.DatetimeIndex(df.price_date)
print("loading data finished.")
return df
def read_co_price():
"""
Load the price data from MySQL, indexed by price date.
"""
print("start loading data...")
sql = const.CO_PRICE_SQL
df = read_data_from_mysql(sql)
df.index = pd.DatetimeIndex(df.price_date)
print("loading data finished.")
return df.drop(['price_date'], axis=1)
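# read_data_from_mysql and the const module are not part of this snippet; a rough
# sketch of what the helper might look like with SQLAlchemy (the DSN and table
# layout below are placeholders, not the project's actual configuration):
import pandas as pd
from sqlalchemy import create_engine
def read_data_from_mysql(sql):
    engine = create_engine('mysql+pymysql://user:password@localhost/prices')
    return pd.read_sql(sql, engine)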