def force_start_end_data_to_dataframe(user, dataframe, start_date, end_date):
    assert isinstance(dataframe, pd.DataFrame)
    # drop any rows outside [start_date, end_date], then force a daily frequency
dataframe = dataframe[start_date:end_date].asfreq('D')
index = pd.date_range(start=start_date, end=end_date, tz=user.pytz_timezone)
# blank dataframe that we know for certain holds all the right dates
dataframe_container = pd.DataFrame(index=index)
    # join the dataframe against the empty one that has all the right indices, so the
    # result is guaranteed to span the full start/end date range
    normalized_dataframe = dataframe_container.join(dataframe)
    # Pandas is like a fine-edged sword: sometimes it cuts everything perfectly; other times you don't know its
    # power and it claws at you and takes back the bamboo. For the record, the problem is not the panda, but the trainer.
assert dataframe_container.index.size == normalized_dataframe.index.size
return normalized_dataframe
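A minimal usage sketch for the normalizer above; SimpleUser is a hypothetical stand-in for the real user model, which only needs a pytz_timezone attribute:
import collections
import pandas as pd

# hypothetical user object exposing just the attribute the function reads
SimpleUser = collections.namedtuple('SimpleUser', ['pytz_timezone'])
user = SimpleUser(pytz_timezone='US/Eastern')
sparse = pd.DataFrame(
    {'steps': [3200, 4100]},
    index=pd.DatetimeIndex(['2017-01-02', '2017-01-05'], tz='US/Eastern'))
full = force_start_end_data_to_dataframe(user, sparse, '2017-01-01', '2017-01-07')
# full spans all seven days; the days absent from sparse hold NaN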
def _get_serialized_dataframe(self, supplement_name, boolean_string_name, values_to_create):
data_values = [boolean_string_name] * values_to_create
today = datetime.date.today()
periods_ago = today - datetime.timedelta(days=values_to_create - 1)
date_range = pd.date_range(periods_ago, today)
    # sanity check: date_range includes both endpoints, so these counts must match
self.assertEqual(len(data_values), len(date_range))
dataframe = pd.DataFrame(index=date_range)
dataframe[supplement_name] = data_values
    # make sure no dtype coercion has silently changed the boolean strings
series = dataframe[supplement_name]
    self.assertEqual(series.iloc[0], boolean_string_name)
serialized_dataframe = ExcelSupplementFileSerializer._sanitize_dataframe_values(dataframe)
return serialized_dataframe
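One detail the length assertion above relies on: pd.date_range includes both endpoints, so stepping back values_to_create - 1 days yields exactly values_to_create entries. A standalone check:
import datetime
import pandas as pd

today = datetime.date.today()
start = today - datetime.timedelta(days=4)
assert len(pd.date_range(start, today)) == 5  # both endpoints are included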
def import_history(self, start_date, end_date):
dataframe_columns = RESCUETIME_EFFICIENCY_HEADERS + [PRODUCTIVITY_PULSE]
historical_df = pd.DataFrame(columns=dataframe_columns)
query_dates = pd.date_range(start=start_date, end=end_date).date
for query_date in query_dates:
response = self._get_rescuetime_efficiency_for_date(query_date)
if response.status_code != 200:
continue
efficiency_timeseries = self.get_efficiency_timeseries_from_response(response)
pulse = calculate_rescue_time_pulse_from_dataframe(efficiency_timeseries)
efficiency_timeseries[PRODUCTIVITY_PULSE] = pulse
# Update the dataframe with history
historical_df.loc[query_date] = efficiency_timeseries
    # after the loop, store the assembled history on the instance
self.results = historical_df
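The .loc assignment above grows the dataframe one labeled row at a time; a minimal sketch of the same pattern, with placeholder column names standing in for the real headers:
import pandas as pd

df = pd.DataFrame(columns=['efficiency', 'pulse'])  # hypothetical columns
row = pd.Series({'efficiency': 0.82, 'pulse': 71})
df.loc[pd.Timestamp('2017-01-01').date()] = row
# each .loc assignment appends (or overwrites) the row with that date label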
def __init__(self, user, periods_back=30):
self.user = user
self.hour_series = range(0, 24)
historical_data_points_quantity = periods_back
end_date = timezone.now()
    # use pandas to generate the index of timestamps; end_date is timezone-aware,
    # so the resulting index carries a tz and avoids naive-datetime warnings
    self.date_series = pd.date_range(end=end_date, freq='D', periods=historical_data_points_quantity)
# build a series that shows the impact of what supplements/events have on sleep
self.sleep_impact_series = pd.Series(0, index=self.date_series)
self.productivity_impact_series = pd.Series(0, index=self.date_series)
self.sleep_series = self._get_random_sleep_series(self.date_series)
    # Creating many events one at a time is very slow on Production, so cache the
    # commonly used Django objects here and create the events that need these
    # foreign keys with bulk_create
self.user_activities = {}
self.supplements = {}
def create_timeseries(starting_date, ending_date, value=0):
"""Create a Pandas Time Series with constant values.
    Parameters
    ----------
    starting_date : str or pandas.Timestamp
        The first date of the Time Series.
    ending_date : str or pandas.Timestamp
        The last date of the Time Series.
    value : int or float
        Constant value assigned to every entry. Default is zero.
"""
timeseries_index = pd.date_range(starting_date, ending_date)
timeseries = pd.Series(value, index=timeseries_index)
return timeseries
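Usage is straightforward; string dates and pandas Timestamps both work:
ts = create_timeseries('2017-01-01', '2017-01-07', value=1.5)
assert len(ts) == 7 and (ts == 1.5).all()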
def create_es(solver, timesteps, year):
"""
Creates a default energy system to load results into.
"""
simulation = es.Simulation(solver=solver,
timesteps=timesteps,
debug=False,
objective_options={"function": minimize_cost})
# Adding a time index to the energy system
time_index = pd.date_range('1/1/' + year,
periods=len(timesteps),
freq='H')
energysystem = es.EnergySystem(time_idx=time_index,
simulation=simulation)
return energysystem
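The time-index construction stands on its own; for a non-leap year, an hourly range covers 8760 periods:
import pandas as pd

time_index = pd.date_range('1/1/2015', periods=8760, freq='H')
assert time_index[-1] == pd.Timestamp('2015-12-31 23:00')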
def _hourly_range(self, init_date, time_frame):
"""
Returns DatetimeIndex trading week/s in hours.
"""
utcnow = datetime.utcnow()
tr_wk_str, tr_wk_end = self.get_trading_week(init_date)
if tr_wk_end > utcnow:
tr_wk_end = utcnow.replace(
            minute=0, second=0, microsecond=0)
freq, interval_type, delta = self._data_frequency(time_frame)
dth = pd.date_range(str(tr_wk_str), str(tr_wk_end), freq=freq)
    while len(dth) % (300 * int(time_frame[1:])) != 0:
tr_wk_str = tr_wk_end + timedelta(**{interval_type: delta})
if tr_wk_str < utcnow:
tr_wk_str, tr_wk_end = self.get_trading_week(tr_wk_str)
if tr_wk_end > utcnow:
tr_wk_end = utcnow.replace(
                    minute=0, second=0, microsecond=0)
tr_wk_end += timedelta(hours=1)
dth = dth.append(
pd.date_range(str(tr_wk_str), str(tr_wk_end), freq=freq))
else:
break
return dth
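Independent of the trading-week helpers, the core pattern is appending successive hourly ranges; DatetimeIndex.append returns a new index rather than mutating in place:
import pandas as pd

dth = pd.date_range('2017-01-02 00:00', '2017-01-02 05:00', freq='H')
dth = dth.append(pd.date_range('2017-01-03 00:00', '2017-01-03 05:00', freq='H'))
assert len(dth) == 12  # two six-hour chunks, endpoints inclusive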
def _daily_range(self, daily):
"""
Returns DatetimeIndex for daily values.
"""
max_bars = 299
utcnow = datetime.utcnow()
dtd = pd.DatetimeIndex([])
while daily < utcnow:
tr_wk_str, tr_wk_end = self.get_trading_week(daily)
        hour = int(str(tr_wk_str.time())[:2])  # opening hour of the trading week
daily += timedelta(days=1)
daily = daily.replace(hour=hour)
if daily >= tr_wk_end:
daily, tr_wk_end = self.get_trading_week(daily)
dtd = dtd.append(
pd.date_range(str(daily), str(daily)))
return dtd
def _monthly_range(self, last_day_of_month):
"""
Returns DatetimeIndex for monthly values.
"""
ldom = last_day_of_month
max_bars = 299
utcnow = datetime.utcnow()
dtm = pd.DatetimeIndex([])
while ldom < utcnow:
dtm = dtm.append(pd.date_range(
str(ldom), str(ldom)))
        if ldom.month == 12:
            ldom = ldom.replace(year=ldom.year+1, month=2, day=1)
        elif ldom.month == 11:
            ldom = ldom.replace(year=ldom.year+1, month=1, day=1)
        else:
            ldom = ldom.replace(month=ldom.month+2, day=1)
        # jumping to the first day of the month after next, then stepping back one
        # day, lands on the last day of the following month
        ldom -= timedelta(days=1)
ldom = ldom.replace(hour=self.new_york_offset(ldom, 22))
return dtm
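When only month-end stamps are needed and session-open offsets don't matter, pd.date_range can generate them directly; a simpler sketch of the same idea:
import pandas as pd

month_ends = pd.date_range('2016-01-01', '2016-12-31', freq='M')  # 'M' anchors on month end
assert month_ends[0] == pd.Timestamp('2016-01-31') and len(month_ends) == 12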
def fill_in_missing_dates(df, date_col_name, other_col):
startd = df[date_col_name].values[0]
endd = df[date_col_name].values[-1]
    print(startd, endd)
    idx = pd.date_range(startd, endd)
    values_by_date = {}
    for index, row in df.iterrows():
        values_by_date[row[date_col_name]] = row[other_col]
    new_data = []
    for d in idx:
        pydate = d.to_pydatetime()
        daskey = pydate.strftime('%Y-%m-%d')
        # .get() returns None for dates missing from the input
        new_data.append([daskey, values_by_date.get(daskey)])
return np.row_stack(new_data)
def fill_in_missing_dates(df, date_col_name, other_col):
startd = df[date_col_name].values[0]
endd = df[date_col_name].values[-1]
    print(startd, endd)
    idx = pd.date_range(startd, endd)
    values_by_date = {}
    for index, row in df.iterrows():
        values_by_date[row[date_col_name]] = row[other_col]
    new_data = []
    for d in idx:
        pydate = d.to_pydatetime()
        daskey = pydate.strftime('%Y-%m-%d')
        # dates missing from the input fall back to 0
        new_data.append([daskey, values_by_date.get(daskey, 0)])
return np.row_stack(new_data)
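A quick usage sketch for the zero-filling variant above; dates absent from the input come back as 0:
import pandas as pd

df = pd.DataFrame({'day': ['2017-01-01', '2017-01-03'], 'count': [5, 7]})
filled = fill_in_missing_dates(df, 'day', 'count')
# filled is a numpy array of [date-string, value] rows; 2017-01-02 maps to 0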
# test_util.py, from the PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda project (author: SignalMedia)
def test_daily(self):
rng = date_range('1/1/2000', '12/31/2004', freq='D')
ts = Series(np.random.randn(len(rng)), index=rng)
annual = pivot_annual(ts, 'D')
doy = ts.index.dayofyear
    # in non-leap years, bump day-of-year from Mar 1 onward so positions align with leap years
    doy[(~isleapyear(ts.index.year)) & (doy >= 60)] += 1
for i in range(1, 367):
subset = ts[doy == i]
subset.index = [x.year for x in subset.index]
result = annual[i].dropna()
tm.assert_series_equal(result, subset, check_names=False)
self.assertEqual(result.name, i)
# check leap days
leaps = ts[(ts.index.month == 2) & (ts.index.day == 29)]
day = leaps.index.dayofyear[0]
leaps.index = leaps.index.year
leaps.name = 60
tm.assert_series_equal(annual[day].dropna(), leaps)
def market_minutes_for_day(self, stamp):
market_open, market_close = self.get_open_and_close(stamp)
return pd.date_range(market_open, market_close, freq='T')
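freq='T' yields one stamp per minute; a standard 9:31-16:00 US equity session comes out to 390 bars:
import pandas as pd

minutes = pd.date_range('2017-01-03 09:31', '2017-01-03 16:00', freq='T')
assert len(minutes) == 390  # endpoints inclusive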
def get_trading_days(start, end, trading_day=trading_day):
return pd.date_range(start=start.date(),
end=end.date(),
freq=trading_day).tz_localize('UTC')
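A hedged usage sketch; the snippet assumes a module-level trading_day offset (a trading-calendar CustomBusinessDay), for which pandas' plain business-day offset is a rough stand-in here:
import pandas as pd
from pandas.tseries.offsets import BDay

days = get_trading_days(pd.Timestamp('2014-01-01'),
                        pd.Timestamp('2014-01-10'),
                        trading_day=BDay())  # BDay() ignores market holidays
# weekday dates only, localized to UTC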
def gen_calendars(start, stop, critical_dates):
"""
Generate calendars to use as inputs.
"""
all_dates = pd.date_range(start, stop, tz='utc')
for to_drop in map(list, powerset(critical_dates)):
# Have to yield tuples.
yield (all_dates.drop(to_drop),)
# Also test with the trading calendar.
yield (trading_days[trading_days.slice_indexer(start, stop)],)
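gen_calendars leans on a powerset helper not shown here; the standard itertools recipe, offered as an assumption about the project's implementation, is:
from itertools import chain, combinations

def powerset(iterable):
    # all subsets of iterable, from the empty set up to the full set
    s = list(iterable)
    return chain.from_iterable(combinations(s, r) for r in range(len(s) + 1))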
def test_basics(self, window=10):
items = ['bar', 'baz', 'foo']
minor = ['A', 'B', 'C', 'D']
rp = MutableIndexRollingPanel(window, items, minor, cap_multiple=2)
dates = pd.date_range('2000-01-01', periods=30, tz='utc')
major_deque = deque(maxlen=window)
frames = {}
for i, date in enumerate(dates):
frame = pd.DataFrame(np.random.randn(3, 4), index=items,
columns=minor)
rp.add_frame(date, frame)
frames[date] = frame
major_deque.append(date)
result = rp.get_current()
expected = pd.Panel(frames, items=list(major_deque),
major_axis=items, minor_axis=minor)
tm.assert_panel_equal(result, expected.swapaxes(0, 1))
def setUpClass(cls):
cls.dates = dates = pd.date_range('2014-01-01', '2014-01-03')
dates = cls.dates.repeat(3)
cls.sids = sids = ord('A'), ord('B'), ord('C')
cls.df = df = pd.DataFrame({
'sid': sids * 3,
'value': (0., 1., 2., 1., 2., 3., 2., 3., 4.),
'int_value': (0, 1, 2, 1, 2, 3, 2, 3, 4),
'asof_date': dates,
'timestamp': dates,
})
cls.dshape = dshape("""
var * {
sid: ?int64,
value: ?float64,
int_value: ?int64,
asof_date: datetime,
timestamp: datetime
}
""")
cls.macro_df = df[df.sid == 65].drop('sid', axis=1)
dshape_ = OrderedDict(cls.dshape.measure.fields)
del dshape_['sid']
cls.macro_dshape = var * Record(dshape_)
cls.garbage_loader = BlazeLoader()
cls.missing_values = {'int_value': 0}