def cv_splits(self, input):
    """Build time-ordered train/validation index pairs for cross validation.

    Walking backwards from the latest date in ``input[self.date_split_col]``,
    ``self.n_folds`` consecutive windows of ``self.prediction_length`` days
    are carved out.  Each window ``[left, right)`` is one validation fold and
    every row dated strictly before ``left`` is that fold's training set.

    Parameters
    ----------
    input : pandas.DataFrame
        Frame containing the column named by ``self.date_split_col``.

    Returns
    -------
    list
        One ``[train_index_values, cv_index_values]`` pair per fold, ordered
        from the oldest fold to the newest.

    Raises
    ------
    Exception
        If fewer than ``self.prediction_length`` days of data precede the
        oldest validation window.
    """
    dates = input[self.date_split_col]
    fold_span = pd.Timedelta(days=self.prediction_length)

    # Collect (left, right) window bounds from newest to oldest.
    left = dates.max()
    split_points = []
    for _ in range(self.n_folds):
        right = left
        left = left - fold_span
        split_points.append((left, right))
    split_points.reverse()

    if split_points[0][0] - dates.min() < fold_span:
        raise Exception('Training set is shorter than the prediction length. '
                        'Use fewer cross validation folds or a shorter '
                        'prediction length')

    # NOTE: the right bound is exclusive, so rows dated exactly at
    # ``dates.max()`` never land in a validation fold.
    split_indices = []
    for left, right in split_points:
        train = input[dates < left]
        cv = input[(dates >= left) & (dates < right)]
        split_indices.append([train.index.values, cv.index.values])
    return split_indices
# python类Timedelta()的实例源码 (example source code using the Python class Timedelta())
def make_commodity_future_info(first_sid,
                               root_symbols,
                               years,
                               month_codes=None):
    """
    Make futures testing data that simulates the notice/expiration date
    behavior of physical commodities like oil.

    Parameters
    ----------
    first_sid : int
    root_symbols : list[str]
    years : list[int]
    month_codes : dict[str -> int]

    Expiration dates are on the 20th of the month prior to the month code.
    Notice dates are on the 20th two months prior to the month code.
    Start dates are one year before the contract month.

    All arguments are forwarded unchanged to ``make_future_info``; only the
    three date functions are supplied here.

    See Also
    --------
    make_future_info
    """
    to_the_twentieth = pd.Timedelta(days=19)
    year_span = pd.Timedelta(days=365)

    def _notice_date(dt):
        # Step back two month-begin offsets, then add 19 days.
        return dt - MonthBegin(2) + to_the_twentieth

    def _expiration_date(dt):
        # Step back one month-begin offset, then add 19 days.
        return dt - MonthBegin(1) + to_the_twentieth

    def _start_date(dt):
        return dt - year_span

    return make_future_info(
        first_sid=first_sid,
        root_symbols=root_symbols,
        years=years,
        notice_date_func=_notice_date,
        expiration_date_func=_expiration_date,
        start_date_func=_start_date,
        month_codes=month_codes,
    )
def test_offset(self):
    """Exercise ``FutureChain.offset`` with forward, backward, zero and
    invalid offsets.
    """
    cl = FutureChain(self.asset_finder, lambda: '2005-12-01', 'CL')

    # A forward offset moves as_of_date ahead by the same amount.
    self.assertEqual(
        cl.offset('3 days').as_of_date,
        cl.as_of_date + pd.Timedelta(days=3)
    )

    # A backward offset may be given as str, datetime.timedelta or
    # pd.Timedelta; each must move as_of_date back by 1000 days.
    backward_specs = (
        '-1000 days',
        timedelta(days=-1000),
        pd.Timedelta('-1000 days'),
    )
    for spec in backward_specs:
        self.assertEqual(
            cl.offset(spec).as_of_date,
            cl.as_of_date + pd.Timedelta(days=-1000)
        )

    # A zero offset should give the original chain.
    for nothing in (0, "0 days"):
        self.assertEqual(cl[0], cl.offset(nothing)[0])

    # A string that doesn't represent a time delta should raise a
    # ValueError.
    with self.assertRaises(ValueError):
        cl.offset("blah")
def test_cached_object(self):
    """A CachedObject unwraps up to and including its expiry and raises
    ``Expired`` (carrying the expiry) afterwards.
    """
    expiry = Timestamp('2014')
    one_minute = Timedelta('1 minute')
    cached = CachedObject(1, expiry)

    # Unwrapping before the expiry, and exactly on it, is allowed.
    for still_valid in (expiry - one_minute, expiry):
        self.assertEqual(cached.unwrap(still_valid), 1)

    with self.assertRaises(Expired) as caught:
        cached.unwrap(expiry + one_minute)
    self.assertEqual(caught.exception.args, (expiry,))
def compute_commit_periods(self, ticket_frame: pd.DataFrame):
    """Insert a ``CommitPeriod`` column (whole days) into ``ticket_frame``.

    The periods come from ``self.compute_periods`` applied to the
    ``CommitDate`` column.  A zero-length period is put in front of them and
    the result is renumbered from 0 before its day counts are inserted as
    column position 8.

    :param ticket_frame: frame with a ``CommitDate`` column; modified in place
    :return: the same ``ticket_frame`` object
    """
    periods = self.compute_periods(ticket_frame.CommitDate)

    # The first row gets a zero period in front of the computed ones.
    leading_zero = pd.Series(data=[pd.Timedelta(days=0)])
    padded = pd.concat([leading_zero, periods])
    padded = padded.reset_index(drop=True)

    ticket_frame.insert(8, 'CommitPeriod', padded.dt.days)
    return ticket_frame
def getMinutesFromMidnight(df, feature):
    """Return, per row, the minutes from ``df['timestamp']`` to ``df[feature]``.

    Both columns are parsed with ``pd.to_datetime``.  Rows where the
    difference is missing yield ``np.nan``.

    NOTE(review): the name suggests ``timestamp`` holds midnight of the same
    day -- the code does not check this; confirm against the caller.

    :param df: frame with a ``timestamp`` column and the ``feature`` column
    :param feature: name of the column holding the later date/time
    :return: list of minute counts, one per row
    """
    one_minute = pd.Timedelta('1 minute')
    elapsed = pd.to_datetime(df[feature]) - pd.to_datetime(df['timestamp'])

    minutes = []
    for delta in elapsed:
        value = delta / one_minute
        minutes.append(np.nan if pd.isnull(value) else value)
    return minutes
def getMinutesFromMidnight(df, feature):
    """Compute the minutes between ``df['timestamp']`` and ``df[feature]``.

    Each column is converted with ``pd.to_datetime`` before subtracting.  A
    missing difference is reported as ``np.nan``.

    NOTE(review): ``timestamp`` is presumably midnight of the row's day, as
    the function name implies -- not enforced here; verify with the caller.

    :param df: frame holding ``timestamp`` and the column named ``feature``
    :param feature: column name of the date/time to measure
    :return: list with one minute count per row
    """
    later = pd.to_datetime(df[feature])
    earlier = pd.to_datetime(df['timestamp'])
    unit = pd.Timedelta('1 minute')

    def _to_minutes(delta):
        ratio = delta / unit
        if pd.isnull(ratio):
            return np.nan
        return ratio

    return [_to_minutes(delta) for delta in later - earlier]
def zero_pad_series(series):
    """
    Append zeros to *series* so its length becomes the next power of two.

    The padding samples are indexed at one-second steps after the last
    existing index label, so the index must support adding a pandas
    ``Timedelta`` (assumes a datetime-like index sampled at 1 s -- TODO
    confirm with callers).

    A series whose length is already a power of two is returned as a copy
    without padding.

    Raises ``ValueError`` for an empty series, which has no last label to
    extend from.
    """
    N = len(series)
    if N == 0:
        raise ValueError('cannot zero-pad an empty series')

    # Integer arithmetic gives the exact next power of two; a float
    # ``math.log(N, 2)`` can land just above an integer and double the size.
    M = (1 << (N - 1).bit_length()) - N
    if M == 0:
        return series.copy()

    last_label = series.index[-1]
    indices = [last_label + PD.Timedelta(seconds=x) for x in range(1, M + 1)]
    zero_series = PD.Series(data=NP.zeros(M),
                            index=indices)
    return PD.concat([series, zero_series])
def parse_time_period(cls, period):
    """ try to parse specified time period

    A non-dict ``period`` is passed to ``pd.Timedelta`` as its ``value``;
    a dict is expanded into ``pd.Timedelta`` keyword arguments
    (e.g. ``dict(hours=1, minutes=30)``).

    :param period: specified period
    :return: length of the period as an integer number of nanoseconds
    :raises Exception: whatever ``pd.Timedelta`` raised; the failure is
        logged at critical level before being re-raised
    """
    # catch single value
    if not isinstance(period, dict):
        period = dict(value=period)

    # try to parse specified period
    try:
        # ``Timedelta.delta`` was removed in pandas 2.0; ``value`` holds the
        # same nanosecond count and exists in every pandas version.
        return pd.Timedelta(**period).value
    except Exception:
        cls.log().critical('unable to parse period: %s', str(period))
        # bare raise keeps the original traceback intact
        raise
def comp_date(day):
    """Get date/time from day of year.

    Day 1 maps to 1976-01-01; the year is fixed at 1976.  ``day`` must be an
    integer because it is formatted with the ``d`` presentation type.
    """
    import pandas as pd

    new_years_day = pd.Timestamp('1976-01-01')
    days_after = format(day - 1, 'd') + 'D'
    return new_years_day + pd.Timedelta(days_after)
def comp_date(day):
    """Convert a 1-based day of year into a timestamp in the year 1976.

    ``day`` has to be an integer: it is rendered with the ``d`` format code
    to build the day offset added to 1976-01-01.
    """
    import pandas as pd

    base = pd.Timestamp('1976-01-01')
    elapsed = pd.Timedelta('%sD' % format(day - 1, 'd'))
    return base + elapsed
def get_holiday_df(day):
    """Load the table at ``HOLIDAY_PATH`` and re-key it for a given day.

    The ``init_date`` column (first 10 characters, ``YYYY/MM/DD``) becomes
    the index, shifted forward by ``30 + (day - 1)`` days.  The remaining
    columns are renamed ``festday#<k>`` for ``k`` running from
    ``-30 - (day - 1)`` up to, but excluding, ``31 - (day - 1) + 5`` -- 66
    labels, so the file must have exactly 66 columns besides ``init_date``.

    NOTE(review): this function emits ``festday#`` labels while
    ``get_festday_df`` emits ``holiday#`` -- the prefixes look swapped;
    left as is because downstream code may depend on them.

    :param day: integer day number that controls the index shift and the
        column label range
    :return: the re-indexed, relabelled DataFrame
    """
    import datetime

    # DataFrame.from_csv was removed in pandas 1.0; read_csv with these
    # arguments is its documented equivalent.
    holiday_df = pd.read_csv(HOLIDAY_PATH, index_col=0, parse_dates=True)
    index_t = holiday_df.init_date.apply(
        lambda x: datetime.datetime.strptime(x[:10], '%Y/%m/%d'))
    holiday_df.pop('init_date')
    holiday_df = holiday_df.set_index(index_t)
    holiday_df.index += pd.Timedelta('%dD' % (30 + (day - 1)))
    # Materialize the labels as a list instead of assigning a lazy map object.
    holiday_df.columns = ['festday#%d' % x
                          for x in range(-30 - (day - 1), 31 - (day - 1) + 5)]
    return holiday_df
def get_festday_df(day):
    """Load the table at ``FEST_PATH`` and re-key it for a given day.

    The ``init_date`` column (first 10 characters, ``YYYY/MM/DD``) becomes
    the index, shifted forward by ``30 + (day - 1)`` days.  The remaining
    columns are renamed ``holiday#<k>`` for ``k`` running from
    ``-30 - (day - 1)`` up to, but excluding, ``31 - (day - 1) + 5`` -- 66
    labels, so the file must have exactly 66 columns besides ``init_date``.

    NOTE(review): this function emits ``holiday#`` labels while
    ``get_holiday_df`` emits ``festday#`` -- the prefixes look swapped;
    left as is because downstream code may depend on them.

    :param day: integer day number that controls the index shift and the
        column label range
    :return: the re-indexed, relabelled DataFrame
    """
    import datetime

    # DataFrame.from_csv was removed in pandas 1.0; read_csv with these
    # arguments is its documented equivalent.
    holiday_df = pd.read_csv(FEST_PATH, index_col=0, parse_dates=True)
    index_t = holiday_df.init_date.apply(
        lambda x: datetime.datetime.strptime(x[:10], '%Y/%m/%d'))
    holiday_df.pop('init_date')
    holiday_df = holiday_df.set_index(index_t)
    holiday_df.index += pd.Timedelta('%dD' % (30 + (day - 1)))
    # Materialize the labels as a list instead of assigning a lazy map object.
    holiday_df.columns = ['holiday#%d' % x
                          for x in range(-30 - (day - 1), 31 - (day - 1) + 5)]
    return holiday_df
def get_prophet_df(user_id):
    """Build a frame of day-shifted columns from one user's CSV.

    Reads ``PROPHET_PATH + '<user_id>.csv'``, indexes it by its ``ds``
    column and keeps the columns returned by ``get_prophet_columns()``.
    For each kept column ``col`` and each ``k`` in ``range(-3, 34)`` the
    result gets a column ``col#k`` holding, for a row dated ``d``, the value
    of ``col`` at date ``d + k`` days.

    :param user_id: integer used to build the CSV file name
    :return: DataFrame indexed by the source dates (first 31 and last 3
        dropped), with every row containing a missing value removed
    """
    # DataFrame.from_csv was removed in pandas 1.0; read_csv with these
    # arguments is its documented equivalent.
    prophet_df = pd.read_csv(PROPHET_PATH + '%d.csv' % user_id,
                             index_col=0, parse_dates=True)
    prophet_df.index = pd.to_datetime(prophet_df.ds)
    prophet_df = prophet_df[get_prophet_columns()]

    new_df = pd.DataFrame(index=prophet_df.index[31:-3])
    one_day = pd.Timedelta('1D')
    for col in prophet_df.columns:
        t_col = prophet_df[col].copy()
        # Start three days "late" so the first column (k = -3) looks back
        # three days; each pass then moves the labels one day earlier.
        t_col.index += pd.Timedelta('3D')
        for day in range(-3, 31 + 3):
            # Assignment aligns on the index, so only matching dates land.
            new_df[col + '#%d' % day] = t_col
            t_col.index -= one_day
    return new_df.dropna()
def get_weather_df():
    """Build a frame of day-shifted weather columns.

    Reads ``WEATHER_PATH`` and keeps the columns returned by
    ``get_weather_columns()``.  For each kept column ``col`` and each ``k``
    in ``range(-30, 34)`` the result gets a column ``col#k``.

    NOTE(review): the series is pre-shifted by 3 days while the labels start
    at -30, so ``col#k`` at date ``d`` holds the value of ``col`` at
    ``d + k + 27`` days, not ``d + k``.  Confirm whether the initial shift
    was meant to be 30 days; behavior is left unchanged here.

    :return: DataFrame indexed by the source index (first 30 and last 91
        entries dropped), with every row containing a missing value removed
    """
    # DataFrame.from_csv was removed in pandas 1.0; read_csv with these
    # arguments is its documented equivalent.
    weather_df = pd.read_csv(WEATHER_PATH, index_col=0, parse_dates=True)
    weather_df = weather_df[get_weather_columns()]

    new_df = pd.DataFrame(index=weather_df.index[30:-88 - 3])
    one_day = pd.Timedelta('1D')
    for col in weather_df.columns:
        t_col = weather_df[col].copy()
        t_col.index += pd.Timedelta('3D')
        for day in range(-30, 31 + 3):
            # Assignment aligns on the index, so only matching dates land.
            new_df[col + '#%d' % day] = t_col
            t_col.index -= one_day
    return new_df.dropna()
def plotWeekly(dictframe, ax, uncertainty, weeklyStart, color='#0072B2'):
    """Plot the ``weekly`` column of ``dictframe``, one point per weekday.

    For each weekday number 0..6 (taken from ``dictframe.ds.dt.weekday``) the
    row with the largest index label is used.  A ``ValueError`` is raised by
    ``max`` if some weekday never occurs in ``dictframe.ds``.

    :param dictframe: frame with ``ds`` (datetime), ``weekly`` and, when
        ``uncertainty`` is true, ``weekly_lower`` / ``weekly_upper`` columns
    :param ax: matplotlib axis to draw on; a new 10x6 figure is created when
        it is None
    :param uncertainty: when true, shade the band between ``weekly_lower``
        and ``weekly_upper``
    :param weeklyStart: day offset applied to the helper date range
        (NOTE(review): only the length of that range is used, so this value
        does not currently change the plot -- confirm the intent)
    :param color: line and band color
    :return: the figure that owns ``ax``
    """
    if ax is None:
        figW = plt.figure(facecolor='w', figsize=(10, 6))
        ax = figW.add_subplot(111)
    else:
        figW = ax.get_figure()

    # Seven consecutive days; used only for the x positions 0..6.
    days = (pd.date_range(start='2017-01-01', periods=7) +
            pd.Timedelta(days=weeklyStart))
    positions = range(len(days))

    # Pick, for every weekday, the row with the largest index label.
    weekdays = dictframe.ds.dt.weekday
    ind = [max(weekdays[weekdays == weekday].index.tolist())
           for weekday in range(7)]

    ax.plot(positions, dictframe['weekly'][ind], ls='-', c=color)
    if uncertainty:
        ax.fill_between(positions,
                        dictframe['weekly_lower'][ind],
                        dictframe['weekly_upper'][ind],
                        color=color, alpha=0.2)
    ax.grid(True, which='major', c='gray', ls='-', lw=1, alpha=0.2)
    ax.set_xticks(positions)

    # ``.dt.weekday_name`` was removed in pandas 1.0 in favor of
    # ``.dt.day_name()``; fall back to the attribute on very old pandas.
    ds_accessor = dictframe['ds'][ind].dt
    if hasattr(ds_accessor, 'day_name'):
        labels = ds_accessor.day_name()
    else:
        labels = ds_accessor.weekday_name
    ax.set_xticklabels(labels)

    ax.set_xlabel('Day of week')
    ax.set_ylabel('weekly')
    figW.tight_layout()
    return figW
def _get_min_acceptable_period(self):
return pd.Timedelta('1 days')
def _get_min_acceptable_period(self):
return pd.Timedelta('1 hours')
def _drop_excluded_endpoints(dates, start, end, closed):
    """Remove from a both-ends-inclusive range the endpoint ``closed`` excludes."""
    if closed is None or len(dates) == 0:
        return dates

    def _utc(moment):
        # Naive values are treated as UTC, matching ``tz='UTC'`` below.
        moment = pd.Timestamp(moment)
        return moment.tz_localize('UTC') if moment.tzinfo is None else moment

    if closed == 'right' and dates[0] == _utc(start):
        return dates[1:]
    if closed == 'left' and dates[-1] == _utc(end):
        return dates[:-1]
    return dates


def date_range(schedule, frequency, closed='right', force_close=True, **kwargs):
    """
    Given a schedule will return a DatetimeIndex with all of the valid datetimes at the frequency given.
    The schedule values are assumed to be in UTC.

    :param schedule: schedule DataFrame with ``market_open`` and ``market_close`` columns
    :param frequency: frequency in standard string
    :param closed: None, 'left' or 'right'. 'right' will exclude the first value and should be used when the
        results should only include the close for each bar. None keeps both ends.
    :param force_close: if True then the close of the day will be included even if it does not fall on an even
        frequency. If False then the market close for the day may not be included in the results
    :param kwargs: arguments that will be passed to the pandas date_range
    :return: DatetimeIndex
    :raises ValueError: if ``frequency`` is longer than one day or ``closed`` is not a supported value
    """
    if pd.Timedelta(frequency) > pd.Timedelta('1D'):
        raise ValueError('Frequency must be 1D or higher frequency.')
    if closed not in (None, 'left', 'right'):
        raise ValueError("Closed has to be either 'left', 'right' or None")

    ranges = []
    for row in schedule.itertuples():
        # pandas 2.0 removed the ``closed`` keyword of ``pd.date_range``, so
        # build the full range and trim the excluded endpoint ourselves;
        # this works on every pandas version.
        dates = pd.date_range(row.market_open, row.market_close,
                              freq=frequency, tz='UTC', **kwargs)
        dates = _drop_excluded_endpoints(dates, row.market_open,
                                         row.market_close, closed)
        if force_close and row.market_close not in dates:
            dates = dates.insert(len(dates), row.market_close)
        ranges.append(dates)

    # ``DatetimeIndex.union_many`` was removed in pandas 2.0; folding with
    # ``union`` produces the same sorted, de-duplicated index.
    index = pd.DatetimeIndex([], tz='UTC')
    for dates in ranges:
        index = index.union(dates)
    return index
def days_at_time(days, t, tz, day_offset=0):
    """
    Create an index of days at time ``t``, interpreted in timezone ``tz``. The returned index is localized to UTC.

    In the example below, the times switch from 13:45 to 12:45 UTC because
    March 13th is the daylight savings transition for US/Eastern. All the
    times are still 8:45 when interpreted in US/Eastern.

    >>> import pandas as pd; import datetime; import pprint
    >>> dts = pd.date_range('2016-03-12', '2016-03-14')
    >>> dts_at_845 = days_at_time(dts, datetime.time(8, 45), 'US/Eastern')
    >>> pprint.pprint([str(dt) for dt in dts_at_845])
    ['2016-03-12 13:45:00+00:00',
     '2016-03-13 12:45:00+00:00',
     '2016-03-14 12:45:00+00:00']

    :param days: DatetimeIndex An index of dates (represented as midnight).
    :param t: datetime.time The time to apply as an offset to each day in ``days``.
        Only its hour, minute and second are used.
    :param tz: pytz.timezone The timezone to use to interpret ``t``.
    :param day_offset: int The number of days we want to offset @days by
    :return: DatetimeIndex of date with the time t
    """
    # Offset days without tz to avoid timezone issues.  Stripping the tz
    # before the empty check lets an empty tz-aware index take the same path
    # as a non-empty one instead of failing in tz_localize.
    days = pd.DatetimeIndex(days).tz_localize(None)
    if len(days) == 0:
        return days.tz_localize(tz).tz_convert('UTC')

    delta = pd.Timedelta(
        days=day_offset,
        hours=t.hour,
        minutes=t.minute,
        seconds=t.second,
    )
    return (days + delta).tz_localize(tz).tz_convert('UTC')