def quiet_days(series,
n_days=5,
period='min'):
"""
Given a pandas time series *series*, return a mapping from each month
to the *n_days* quietest days of that month (see equation (4) of
Love and Gannon). The parameter *period* gives the sampling period
(`'min'` = one measurement per minute).
"""
quiet_day_map = {}
delta_H_i = (series.diff().abs()
             .groupby(PD.TimeGrouper(freq='D'))
             # drop days with more than 50% of their samples missing
             .filter(lambda x: x.isnull().sum() <= int(0.5 * SAMPLES[period]))
             .groupby(PD.TimeGrouper(freq='D'))
             .mean())
for month, delta_H_i_month in delta_H_i.groupby(PD.TimeGrouper(freq='M')):
quiet_day_map[month] = delta_H_i_month.nsmallest(n_days).sort_index().index
return quiet_day_map
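A minimal usage sketch for quiet_days follows; it assumes a pandas version that still ships TimeGrouper and stubs the module-level SAMPLES mapping (samples per day for each sampling period) that the filter above reads.
# Minimal usage sketch; SAMPLES is assumed to give samples per day for one-minute data.
import numpy as NP
import pandas as PD

SAMPLES = {'min': 1440}

idx = PD.date_range('2014-01-01', '2014-03-01', freq='min')
H = PD.Series(NP.random.randn(len(idx)).cumsum(), index=idx)

qmap = quiet_days(H, n_days=5)
for month_end, days in sorted(qmap.items()):
    print(month_end.date(), [d.date() for d in days])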
Python TimeGrouper() class usage examples
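Every snippet on this page uses pd.TimeGrouper, which pandas deprecated in 0.21 in favour of pd.Grouper and later removed; on current pandas the same grouping is spelled as in this small sketch.
# pd.Grouper(freq=...) is the modern replacement for pd.TimeGrouper(freq=...).
import numpy as np
import pandas as pd

idx = pd.date_range('2000-01-01', periods=48, freq='h')
s = pd.Series(np.arange(48.0), index=idx)
daily_mean = s.groupby(pd.Grouper(freq='D')).mean()  # same result as pd.TimeGrouper('D')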
def chebyshev_fit(series,
quiet_day_map,
deg=10):
"""
???
"""
grouped_by_day = series.groupby(PD.TimeGrouper(freq='D'))
x = []
y = []
for month_end in sorted(quiet_day_map):
for quiet_day in quiet_day_map[month_end]:
quiet_day_H = grouped_by_day.get_group(quiet_day)
quiet_day_H = quiet_day_H[quiet_day_H.notnull()]
x.extend([toJ2000(time_stamp.to_datetime()) for time_stamp in quiet_day_H.index])
y.extend(quiet_day_H.values)
return NP.polynomial.chebyshev.chebfit(x, y, deg, full=True)
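chebfit with full=True returns the coefficient vector together with fit diagnostics; the coefficients can then be evaluated with chebval to reconstruct a quiet-day baseline. A self-contained sketch with synthetic data (not the toJ2000 timestamps used above):
# Fit and evaluate a Chebyshev baseline with numpy alone (synthetic data).
import numpy as NP

x = NP.linspace(0.0, 86400.0, 1440)                                   # one synthetic day, in seconds
y = NP.sin(2 * NP.pi * x / 86400.0) + 0.1 * NP.random.randn(x.size)   # noisy daily signal
coef, diagnostics = NP.polynomial.chebyshev.chebfit(x, y, 10, full=True)
baseline = NP.polynomial.chebyshev.chebval(x, coef)                   # evaluate the fitted curve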
def test_dates(self):
for t in self.mock_tweets:
self.feels_db.insert_tweet(t)
self.assertEqual(len(self.feels_db.tweet_dates), 3)
tweets = []
with open(self.tweets_data_path) as tweets_file:
lines = filter(None, (line.rstrip() for line in tweets_file))
for line in lines:
try:
tweets.append(Tweet(json.loads(line)))
except KeyError:
pass
for t in tweets:
self.feels_db.insert_tweet(t)
self.assertEqual(len(self.feels_db.tweet_dates), 105)
df = self.feels_db.tweet_dates
timebox = timedelta(seconds=60)
second = timedelta(seconds=1)
df = df.groupby(pd.TimeGrouper(freq=f'{int(timebox/second)}S')).size()
df = df[df != 0]
print(df)
self.assertEqual(len(df), 3)
self.assertEqual(df.iloc[0], 103)
Source: test_resample.py (project: PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda, author: SignalMedia)
def test_resample_basic(self):
rng = date_range('1/1/2000 00:00:00', '1/1/2000 00:13:00', freq='min',
name='index')
s = Series(np.random.randn(14), index=rng)
result = s.resample('5min', closed='right', label='right').mean()
exp_idx = date_range('1/1/2000', periods=4, freq='5min', name='index')
expected = Series([s[0], s[1:6].mean(), s[6:11].mean(), s[11:].mean()],
index=exp_idx)
assert_series_equal(result, expected)
self.assertEqual(result.index.name, 'index')
result = s.resample('5min', closed='left', label='right').mean()
exp_idx = date_range('1/1/2000 00:05', periods=3, freq='5min',
name='index')
expected = Series([s[:5].mean(), s[5:10].mean(),
s[10:].mean()], index=exp_idx)
assert_series_equal(result, expected)
s = self.series
result = s.resample('5Min').last()
grouper = TimeGrouper(Minute(5), closed='left', label='left')
expect = s.groupby(grouper).agg(lambda x: x[-1])
assert_series_equal(result, expect)
Source: test_resample.py (project: PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda, author: SignalMedia)
def test_resample_frame_basic(self):
df = tm.makeTimeDataFrame()
b = TimeGrouper('M')
g = df.groupby(b)
# check all cython functions work
funcs = ['add', 'mean', 'prod', 'min', 'max', 'var']
for f in funcs:
g._cython_agg_general(f)
result = df.resample('A').mean()
assert_series_equal(result['A'], df['A'].resample('A').mean())
result = df.resample('M').mean()
assert_series_equal(result['A'], df['A'].resample('M').mean())
df.resample('M', kind='period').mean()
df.resample('W-WED', kind='period').mean()
Source: test_resample.py (project: PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda, author: SignalMedia)
def test_resample_ohlc(self):
s = self.series
grouper = TimeGrouper(Minute(5))
expect = s.groupby(grouper).agg(lambda x: x[-1])
result = s.resample('5Min').ohlc()
self.assertEqual(len(result), len(expect))
self.assertEqual(len(result.columns), 4)
xs = result.iloc[-2]
self.assertEqual(xs['open'], s[-6])
self.assertEqual(xs['high'], s[-6:-1].max())
self.assertEqual(xs['low'], s[-6:-1].min())
self.assertEqual(xs['close'], s[-2])
xs = result.iloc[0]
self.assertEqual(xs['open'], s[0])
self.assertEqual(xs['high'], s[:5].max())
self.assertEqual(xs['low'], s[:5].min())
self.assertEqual(xs['close'], s[4])
Source: test_resample.py (project: PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda, author: SignalMedia)
def test_apply_iteration(self):
# #2300
N = 1000
ind = pd.date_range(start="2000-01-01", freq="D", periods=N)
df = DataFrame({'open': 1, 'close': 2}, index=ind)
tg = TimeGrouper('M')
_, grouper, _ = tg._get_grouper(df)
# Errors
grouped = df.groupby(grouper, group_keys=False)
f = lambda df: df['close'] / df['open']
# it works!
result = grouped.apply(f)
self.assertTrue(result.index.equals(df.index))
Source: test_resample.py (project: PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda, author: SignalMedia)
def test_panel_aggregation(self):
ind = pd.date_range('1/1/2000', periods=100)
data = np.random.randn(2, len(ind), 4)
wp = pd.Panel(data, items=['Item1', 'Item2'], major_axis=ind,
minor_axis=['A', 'B', 'C', 'D'])
tg = TimeGrouper('M', axis=1)
_, grouper, _ = tg._get_grouper(wp)
bingrouped = wp.groupby(grouper)
binagg = bingrouped.mean()
def f(x):
assert (isinstance(x, Panel))
return x.mean(1)
result = bingrouped.agg(f)
tm.assert_panel_equal(result, binagg)
Source: test_resample.py (project: PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda, author: SignalMedia)
def test_fails_on_no_datetime_index(self):
index_names = ('Int64Index', 'Index', 'Float64Index', 'MultiIndex')
index_funcs = (tm.makeIntIndex,
tm.makeUnicodeIndex, tm.makeFloatIndex,
lambda m: tm.makeCustomIndex(m, 2))
n = 2
for name, func in zip(index_names, index_funcs):
index = func(n)
df = DataFrame({'a': np.random.randn(n)}, index=index)
with tm.assertRaisesRegexp(TypeError,
"Only valid with DatetimeIndex, "
"TimedeltaIndex or PeriodIndex, "
"but got an instance of %r" % name):
df.groupby(TimeGrouper('D'))
# PeriodIndex gives a specific error message
df = DataFrame({'a': np.random.randn(n)}, index=tm.makePeriodIndex(n))
with tm.assertRaisesRegexp(TypeError,
"axis must be a DatetimeIndex, but "
"got an instance of 'PeriodIndex'"):
df.groupby(TimeGrouper('D'))
def get_ts_data():
requested_facts = text_input.value.split(",")[:8]
requested_facts = [f.strip() for f in requested_facts]
req_df = preprocessing.get_facts_from_list(df, requested_facts)
ts = preprocessing.make_timeseries(req_df)
ts.columns = ts.columns.droplevel(0)
ts = ts.groupby(pd.TimeGrouper(freq=timegroupoptionsmapper[timegroup.active])).sum()
return ts
def get_daily_gain(data, timezone=pytz.timezone('US/Pacific')):
"""Obtain the daily gain.
Parameters
----------
data: list
The list of dictionaries, each of which describes one completed
submission. It is the output of the `get_data` function.
timezone: pytz.timezone
A valid pytz timezone. Defaults to US/Pacific, the time zone used by
Udacity.
Returns
-------
A pandas Series indexed by day, whose values are the total gain in USD
for that day, expressed in the given time zone.
"""
date_price_data = np.array([(d['completed_at'], d['price']) for d in data])
price_series = pd.Series(date_price_data[:, 1].astype(float),
index=pd.to_datetime(date_price_data[:, 0]))
price_series = price_series.sort_index()
# Convert timezone
utc = pytz.utc
price_series = price_series.tz_localize(utc).tz_convert(timezone)
# Calculate the gain by day
daily_gain = price_series.groupby(pd.TimeGrouper('D')).sum()
return daily_gain
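A minimal usage sketch for get_daily_gain with made-up records; only the 'completed_at' and 'price' keys that the function reads are filled in, and the naive timestamps are treated as UTC by the tz_localize call above.
# Minimal usage sketch with fabricated example submissions.
import pytz

sample = [
    {'completed_at': '2017-03-01 18:30:00', 'price': 12.0},
    {'completed_at': '2017-03-01 23:10:00', 'price': 8.5},
    {'completed_at': '2017-03-02 02:45:00', 'price': 20.0},
]
daily = get_daily_gain(sample, timezone=pytz.timezone('US/Pacific'))
print(daily)  # one row per Pacific-time day with the summed USD gain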
Source: test_resample.py (project: PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda, author: SignalMedia)
def test_custom_grouper(self):
dti = DatetimeIndex(freq='Min', start=datetime(2005, 1, 1),
end=datetime(2005, 1, 10))
s = Series(np.array([1] * len(dti)), index=dti, dtype='int64')
b = TimeGrouper(Minute(5))
g = s.groupby(b)
# check all cython functions work
funcs = ['add', 'mean', 'prod', 'ohlc', 'min', 'max', 'var']
for f in funcs:
g._cython_agg_general(f)
b = TimeGrouper(Minute(5), closed='right', label='right')
g = s.groupby(b)
# check all cython functions work
funcs = ['add', 'mean', 'prod', 'ohlc', 'min', 'max', 'var']
for f in funcs:
g._cython_agg_general(f)
self.assertEqual(g.ngroups, 2593)
self.assertTrue(notnull(g.mean()).all())
# construct expected val
arr = [1] + [5] * 2592
idx = dti[0:-1:5]
idx = idx.append(dti[-1:])
expect = Series(arr, index=idx)
# GH2763 - return input dtype if we can
result = g.agg(np.sum)
assert_series_equal(result, expect)
df = DataFrame(np.random.rand(len(dti), 10),
index=dti, dtype='float64')
r = df.groupby(b).agg(np.sum)
self.assertEqual(len(r.columns), 10)
self.assertEqual(len(r.index), 2593)
Source: test_resample.py (project: PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda, author: SignalMedia)
def test_resample_nunique(self):
# GH 12352
df = DataFrame({
'ID': {pd.Timestamp('2015-06-05 00:00:00'): '0010100903',
pd.Timestamp('2015-06-08 00:00:00'): '0010150847'},
'DATE': {pd.Timestamp('2015-06-05 00:00:00'): '2015-06-05',
pd.Timestamp('2015-06-08 00:00:00'): '2015-06-08'}})
r = df.resample('D')
g = df.groupby(pd.Grouper(freq='D'))
expected = df.groupby(pd.TimeGrouper('D')).ID.apply(lambda x:
x.nunique())
self.assertEqual(expected.name, 'ID')
for t in [r, g]:
result = r.ID.nunique()
assert_series_equal(result, expected)
# TODO
# this should have name
# https://github.com/pydata/pandas/issues/12363
expected.name = None
result = df.ID.resample('D').nunique()
assert_series_equal(result, expected)
result = df.ID.groupby(pd.Grouper(freq='D')).nunique()
assert_series_equal(result, expected)
Source: test_resample.py (project: PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda, author: SignalMedia)
def test_count(self):
self.ts[::3] = np.nan
expected = self.ts.groupby(lambda x: x.year).count()
grouper = TimeGrouper('A', label='right', closed='right')
result = self.ts.groupby(grouper).count()
expected.index = result.index
assert_series_equal(result, expected)
result = self.ts.resample('A').count()
expected.index = result.index
assert_series_equal(result, expected)
Source: test_groupby.py (project: PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda, author: SignalMedia)
def test_groupby_with_empty(self):
index = pd.DatetimeIndex(())
data = ()
series = pd.Series(data, index)
grouper = pd.tseries.resample.TimeGrouper('D')
grouped = series.groupby(grouper)
assert next(iter(grouped), None) is None
Source: test_groupby.py (project: PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda, author: SignalMedia)
def test_groupby_with_timegrouper(self):
# GH 4161
# TimeGrouper requires a sorted index
# also verifies that the resultant index has the correct name
import datetime as DT
df_original = DataFrame({
'Buyer': 'Carl Carl Carl Carl Joe Carl'.split(),
'Quantity': [18, 3, 5, 1, 9, 3],
'Date': [
DT.datetime(2013, 9, 1, 13, 0),
DT.datetime(2013, 9, 1, 13, 5),
DT.datetime(2013, 10, 1, 20, 0),
DT.datetime(2013, 10, 3, 10, 0),
DT.datetime(2013, 12, 2, 12, 0),
DT.datetime(2013, 9, 2, 14, 0),
]
})
# GH 6908 change target column's order
df_reordered = df_original.sort_values(by='Quantity')
for df in [df_original, df_reordered]:
df = df.set_index(['Date'])
expected = DataFrame(
{'Quantity': np.nan},
index=date_range('20130901 13:00:00',
'20131205 13:00:00', freq='5D',
name='Date', closed='left'))
expected.iloc[[0, 6, 18], 0] = np.array(
[24., 6., 9.], dtype='float64')
result1 = df.resample('5D').sum()
assert_frame_equal(result1, expected)
df_sorted = df.sort_index()
result2 = df_sorted.groupby(pd.TimeGrouper(freq='5D')).sum()
assert_frame_equal(result2, expected)
result3 = df.groupby(pd.TimeGrouper(freq='5D')).sum()
assert_frame_equal(result3, expected)
Source: test_groupby.py (project: PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda, author: SignalMedia)
def test_groupby_with_timegrouper_methods(self):
# GH 3881
# make sure API of timegrouper conforms
import datetime as DT
df_original = pd.DataFrame({
'Branch': 'A A A A A B'.split(),
'Buyer': 'Carl Mark Carl Joe Joe Carl'.split(),
'Quantity': [1, 3, 5, 8, 9, 3],
'Date': [
DT.datetime(2013, 1, 1, 13, 0),
DT.datetime(2013, 1, 1, 13, 5),
DT.datetime(2013, 10, 1, 20, 0),
DT.datetime(2013, 10, 2, 10, 0),
DT.datetime(2013, 12, 2, 12, 0),
DT.datetime(2013, 12, 2, 14, 0),
]
})
df_sorted = df_original.sort_values(by='Quantity', ascending=False)
for df in [df_original, df_sorted]:
df = df.set_index('Date', drop=False)
g = df.groupby(pd.TimeGrouper('6M'))
self.assertTrue(g.group_keys)
self.assertTrue(isinstance(g.grouper, pd.core.groupby.BinGrouper))
groups = g.groups
self.assertTrue(isinstance(groups, dict))
self.assertTrue(len(groups) == 3)
def backtest(self, data_frame):
dfs = data_frame.groupby(pd.TimeGrouper(freq='D'))
# only choose trading days
dfs = [(d, df) for (d, df) in dfs if df.shape[0]]
if sys.version_info[0] < 3:
exit_dfs = [self._compute_daily_performance(daily_data)
for daily_data in dfs]
else:
exit_dfs = Parallel(n_jobs=-1, verbose=50)(
delayed(self._compute_daily_performance)(daily_data)
for daily_data in dfs)
return pd.concat(exit_dfs)
def timeGroup(frame, step, file_csv):
dfx = frame.set_index(['time'])
dfx = dfx[dfx.status == 'accepted']
grouper = dfx.groupby([pd.TimeGrouper(step), 'Miner'])
dfTimeS = grouper['Miner'].count().unstack('Miner').fillna(0)
dfTimeS = dfTimeS[:-1] #There are arguments for and against this. The final group is not 'complete', so we will not include it.
dfTimeS.to_csv(file_csv, encoding='utf-8')
return dfTimeS
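A minimal usage sketch for timeGroup with made-up share records; the column names mirror what the function reads ('time', 'status', 'Miner'), while the data and the output file name are only illustrative.
# Minimal usage sketch: count accepted shares per miner per hour.
import pandas as pd

shares = pd.DataFrame({
    'time': pd.to_datetime(['2017-05-01 00:01', '2017-05-01 00:07',
                            '2017-05-01 01:02', '2017-05-01 01:30']),
    'status': ['accepted', 'accepted', 'rejected', 'accepted'],
    'Miner': ['alice', 'bob', 'alice', 'alice'],
})
per_hour = timeGroup(shares, '1H', 'miners_per_hour.csv')  # hypothetical output path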
#Summary tables
def calculate_latest_coeffs(self):
unit_topic_tmpl = "{campus}/{building}/{unit}/{point}"
unit_points = [self.power_name]
df = None
#Get data
unit = self.temp_unit
for point in unit_points:
if point == self.power_name:
unit = self.power_unit
unit_topic = unit_topic_tmpl.format(campus=self.site,
building=self.building,
unit=unit,
point=point)
result = self.vip.rpc.call('platform.historian',
'query',
topic=unit_topic,
count=self.no_of_recs_needed,
order="LAST_TO_FIRST").get(timeout=10000)
df2 = pd.DataFrame(result['values'], columns=[self.ts_name, point])
df2[self.ts_name] = pd.to_datetime(df2[self.ts_name])
df2 = df2.groupby([pd.TimeGrouper(key=self.ts_name, freq=self.aggregate_freq)]).mean()
# df2[self.ts_name] = df2[self.ts_name].apply(lambda dt: dt.replace(second=0, microsecond=0))
df = df2 if df is None else pd.merge(df, df2, how='outer', left_index=True, right_index=True)
#Calculate coefficients
result_df = self.calculate_coeffs(df)
# Publish coeffs to store
#if coeffs is not None:
# self.save_coeffs(coeffs, subdevice)
def calculate_latest_coeffs(self):
unit_topic_tmpl = "{campus}/{building}/{unit}/{point}"
unit_points = [self.fan_power_name, self.static_pressure_name, self.air_flow_rate_name]
df = None
for point in unit_points:
unit_topic = unit_topic_tmpl.format(campus=self.site,
building=self.building,
unit=self.unit,
point=point)
result = self.vip.rpc.call('platform.historian',
'query',
topic=unit_topic,
count=self.no_of_recs_needed,
order="LAST_TO_FIRST").get(timeout=1000)
df2 = pd.DataFrame(result['values'], columns=[self.ts_name, point])
self.convert_units_to_SI(df2, point, result['metadata']['units'])
df2[self.ts_name] = pd.to_datetime(df2[self.ts_name])
df2 = df2.groupby([pd.TimeGrouper(key=self.ts_name, freq=self.aggregate_freq)]).mean()
#df2[self.ts_name] = df2[self.ts_name].apply(lambda dt: dt.replace(second=0, microsecond=0))
df = df2 if df is None else pd.merge(df, df2, how='outer', left_index=True, right_index=True)
#print(df)
coeffs = self.calculate_coeffs(df)
# Publish coeffs to store
if coeffs is not None:
self.save_coeffs(coeffs)
def Calculate_Liquidity_Coeff(df_list):
#list of lists of liq coeff per bond
liq_arr_list = []
#print 'df_list size: ', len(df_list)
for df in df_list:
if df.empty:
continue
# A temporary array for holding liquidity beta for each month
liq_arr = [np.nan] * num_months_CONST
#print df['cusip_id'][0]
# Group dataframe on index by month
for date, df_group in df.groupby(pd.TimeGrouper("M")):
month = ''.join([str(date.month),str(date.year)])
month_key = month_keys[month]
# When there are some data in current month,
if df_group.shape[0] > 0:
# Run regression (as equation (2)) to get liquidity measure
y,X = dmatrices('excess_return_1 ~ yld_pt + volume_and_sign',
data=df_group, return_type='dataframe')
#print date, X.shape
mod = sm.OLS(y,X)
res = mod.fit()
#set specific months with liquidity factors
#res.params(2) = liquidity coefficient
liq_arr[month_key] = res.params[2]
liq_arr_list.append(liq_arr) #store all liq coeff for each month per bond
return liq_arr_list
def wip_chart(cfd_data, frequency="1W-MON", start_column=None, end_column=None, title=None, ax=None):
if len(cfd_data.index) == 0:
raise UnchartableData("Cannot draw WIP chart with no data")
if start_column is None:
start_column = cfd_data.columns[1]
if end_column is None:
end_column = cfd_data.columns[-1]
if ax is None:
fig, ax = plt.subplots()
if title is not None:
ax.set_title(title)
wip_data = pd.DataFrame({'wip': cfd_data[start_column] - cfd_data[end_column]})
groups = wip_data[['wip']].groupby(pd.TimeGrouper(frequency, label='left'))
labels = [x[0].strftime("%d/%m/%Y") for x in groups]
groups.boxplot(subplots=False, ax=ax, showmeans=True, return_type='axes')
ax.set_xticklabels(labels, rotation=70, size='small')
ax.set_xlabel("Week")
ax.set_ylabel("WIP")
return ax
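A minimal usage sketch for wip_chart with a made-up weekly cumulative-flow frame; it assumes the module-level imports the original file provides (matplotlib.pyplot as plt, the UnchartableData exception) and passes the start and end columns explicitly.
# Minimal usage sketch with a fabricated cumulative-flow frame.
import pandas as pd
import matplotlib.pyplot as plt

cfd = pd.DataFrame(
    {'Backlog': [5, 8, 12, 15], 'Committed': [2, 4, 6, 9], 'Done': [0, 1, 3, 7]},
    columns=['Backlog', 'Committed', 'Done'],
    index=pd.date_range('2016-02-01', periods=4, freq='W-MON'))

ax = wip_chart(cfd, frequency='1W-MON', start_column='Committed',
               end_column='Done', title='WIP per week')
plt.show()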
def get_savings(data, column_debit='Debit', column_credit='Credit', dt_start=None, dt_end=None, aggregation_period='M'):
""" Consumes the checking account data and returns the monthly savings rate.
Args:
data (dataframe): The panadas dataframe containing at least a debit and a credit column.
column_debit (str): The column name for the debit column.
column_credit (str): The column name for the credit column.
dt_start (str): The start date (specific if given '2012-11-11' or the month '2012-11')
from were the savings should be calculated.
dt_end (str): The end date (specific if given '2012-11-11' or the month '2012-11')
to were the savings should be calculated.
aggregation_period (str): Single string character like 'M' for month specifying, over which period the savings
are aggregated. A full specification can be found here:
http://pandas.pydata.org/pandas-docs/stable/timeseries.html#timeseries-offset-aliases
Returns:
A pandas data frame, with an additional 'Savings' column and the time difference between start and end
represented with a single row for each aggregation interval that is not null.
"""
if not isinstance(data.index, pd.DatetimeIndex):
logging.getLogger().error("A pandas datetimeindex is required for the given dataframe")
return pd.DataFrame()
# create a copy of the indexed original data frame
aggregated = data[dt_start:dt_end][[column_debit, column_credit]].copy()
aggregated = aggregated.groupby(pd.TimeGrouper(aggregation_period)).sum()
aggregated = aggregated.fillna(0)
aggregated['Savings'] = aggregated[column_credit] - aggregated[column_debit]
return aggregated
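A minimal usage sketch for get_savings with a fabricated two-month statement; the frame must carry a DatetimeIndex, as the check above enforces.
# Minimal usage sketch with made-up statement rows.
import pandas as pd

stmt = pd.DataFrame(
    {'Debit': [100.0, 250.0, 80.0], 'Credit': [2000.0, 0.0, 2100.0]},
    index=pd.to_datetime(['2016-01-05', '2016-01-20', '2016-02-03']))

monthly = get_savings(stmt, dt_start='2016-01', dt_end='2016-02')
print(monthly['Savings'])  # 1650.0 for January, 2020.0 for February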
def test_basicStats(self):
filepath = RESOURCE_PATH + "\\unittest\\test_hb_basic01.csv"
data = utils.loadIntradayData(filepath).set_index('datetime')
stats = hbStats.groupByBasicStats(pd.TimeGrouper(freq='d'), data)
self.assertEqual(stats.iloc[0]['count'], 16)
self.assertEqual(stats.iloc[0]['max'], 70)
self.assertEqual(stats.iloc[0]['min'], 50)
self.assertEqual(stats.iloc[0]['mean'], 60)
def plotYearMonthStatsHb(data):
#pd.groupby(b,by=[b.index.month,b.index.year])
data.groupby(pd.TimeGrouper(freq='M')).mean().plot()
sns.plt.show()
def avg_wl(self, numObs=50, avgtype='stdWL', grptype='bytime', grper='12M'):
"""Calculates standardized statistics for a list of stations or a huc from the USGS
:param numObs: minimum observations per site required to include site in analysis; default is 50
:param avgtype: averaging technique for site data; options are 'avgDiffWL' and 'stdWL'; default is 'stdWL'
:param grptype: way to group the averaged data; options are 'bytime' or 'monthly' or user input; default 'bytime'
:param grper: only used if 'bytime' called; defaults to '12M'; other times can be put in
:return: pandas dataframe of grouped statistics (mean, median, standard deviation, count, and 95% error bounds) for the chosen averaging column
"""
data = self.cleanGWL(self.data)
# stationWL = pd.merge(siteinfo, data, on = 'site_no')
data.reset_index(inplace=True)
data.set_index(['datetime'], inplace=True)
# get averages by year, month, and site number
site_size = data.groupby('site_no').size()
wl_long = data[data['site_no'].isin(list(site_size[site_size >= numObs].index.values))]
siteList = list(wl_long.site_no.unique())
for site in siteList:
mean = wl_long.ix[wl_long.site_no == site, 'value'].mean()
std = wl_long.ix[wl_long.site_no == site, 'value'].std()
wl_long.ix[wl_long.site_no == site, 'avgDiffWL'] = wl_long.ix[wl_long.site_no == site, 'value'] - mean
wl_long.ix[wl_long.site_no == site, 'stdWL'] = wl_long.ix[wl_long.site_no == site, 'avgDiffWL'] / std
if grptype == 'bytime':
grp = pd.TimeGrouper(grper)
elif grptype == 'monthly':
grp = wl_long.index.month
else:
grp = grptype
wl_stats = wl_long.groupby([grp])[avgtype].agg({'mean': np.mean, 'median': np.median,
'standard': np.std,
'cnt': (lambda x: np.count_nonzero(~np.isnan(x))),
'err_pls': (lambda x: np.mean(x) + (np.std(x) * 1.96)),
'err_min': (lambda x: np.mean(x) - (np.std(x) * 1.96))})
return wl_stats
def fetchbin(self, start=None, end=None, binsize=timedelta(seconds=60),
empty=False):
"""
Returns a generator that can be used to iterate over the tweet data
based on ``binsize``.
:param start: Query start date.
:type start: datetime
:param end: Query end date.
:type end: datetime
:param binsize: Time duration for each bin for tweet grouping.
:type binsize: timedelta
:param empty: Determines whether empty dataframes will be yielded.
:type empty: boolean
:returns: A dataframe along with time boundaries for the data.
:rtype: tuple
"""
second = timedelta(seconds=1)
if start is None: start=self.start-second
if end is None: end=self.end
if start == self.start: start = start-second
df = self.tweet_dates
df = df.groupby(pd.TimeGrouper(freq=f'{int(binsize/second)}S')).size()
df = df[df.index > start - binsize]
if not empty: df = df[df != 0]
conn = sqlite3.connect(self._db, detect_types=sqlite3.PARSE_DECLTYPES)
c = conn.cursor()
c.execute(
"SELECT * FROM tweets WHERE created_at > ? AND created_at <= ?",
(start, end)
)
for i in range(0,len(df)):
frame = []
if df.iloc[i] > 0:
frame = pd.DataFrame.from_records(
data=c.fetchmany(df.iloc[i]), columns=self.fields,
index='created_at'
)
left = df.index[i].to_pydatetime()
right = left + binsize
if len(frame)>0 or empty: yield TweetBin(frame, left, right)
c.close()
def update(attrname, old, new):
subset = get_subset(dictchooser.value)
new_absolute_source = subset[0] \
.ix[:, :top_n.value] \
.groupby(pd.TimeGrouper(freq=timegroupoptionsmapper[timegroup.active])) \
.sum().fillna(0)
new_relative_source = subset[1] \
.ix[:, :top_n.value] \
.groupby(pd.TimeGrouper(freq=timegroupoptionsmapper[timegroup.active])) \
.sum().fillna(0)
for old, new in zip(abs_arrangement, new_absolute_source.columns.tolist()):
old.title.text = new
for old, new in zip(rel_arrangement, new_relative_source.columns.tolist()):
old.title.text = new
new_abs_sources = [ColumnDataSource(dict(date=new_absolute_source.index,
y=new_absolute_source[l]))
for l in new_absolute_source.columns.tolist()]
new_rel_sources = [ColumnDataSource(dict(date=new_relative_source.index,
y=new_relative_source[l]))
for l in new_relative_source.columns.tolist()]
for old, new in zip(abs_sources, new_abs_sources):
old.data.update(new.data)
for old, new in zip(rel_sources, new_rel_sources):
old.data.update(new.data)
new_abs_point_sources = [ColumnDataSource(dict(date=[new_absolute_source[l].idxmax()],
y=[new_absolute_source[l].max()],
text=[str(int(new_absolute_source[l].max()))]
)
)
for l in new_absolute_source.columns.tolist()]
new_rel_point_sources = [ColumnDataSource(dict(date=[new_relative_source[l].idxmax()],
y=[new_relative_source[l].max()],
text=[str(int(new_relative_source[l].max()))]
)
)
for l in new_relative_source.columns.tolist()]
for old, new in zip(abs_point_sources, new_abs_point_sources):
old.data.update(new.data)
for old, new in zip(rel_point_sources, new_rel_point_sources):
old.data.update(new.data)
# Create Input controls
def run(tags, time_span, step):
data_sets = [Data(time_span, step, t) for t in tags]
data_frames = []
step_fmt = period_map.get(step)
for ss in data_sets:
if not ss.df.empty:
ss.df['start_time'] = pd.to_datetime(ss.df['start'])
ss.df['end_time'] = pd.to_datetime(ss.df['end'])
ss.df.drop('start', axis=1, inplace=True) # Drop `start` column.
ss.df.drop('end', axis=1, inplace=True) # Drop `end` column.
ss.df.drop('tags', axis=1, inplace=True) # Drop `tags` column.
ss.df['duration'] = (ss.df['end_time'] - ss.df['start_time'])
ss.df['duration'] = (
(ss.df['duration'] / np.timedelta64(1, 's')) / 60
)
ss.df['interval'] = ss.df.end_time.dt.to_period(step_fmt)
ss.df = ss.df.set_index('interval') # `interval` column as index.
ss.df.drop('start_time', axis=1, inplace=True) # Drop `start_time`.
ss.df.drop('end_time', axis=1, inplace=True) # Drop `end_time`.
ss.df = ss.df.groupby(
pd.TimeGrouper(step_fmt),
level=0,
).aggregate(
np.sum
)
ss.df.rename(columns={'duration': ss.tag}, inplace=True)
data_frames.append(ss.df)
result = pd.concat(data_frames, axis=1)
if step_fmt == 'D':
result = result.to_timestamp() # `PeriodIndex` to `DatetimeIndex`.
result = result.asfreq('D', fill_value=0) # Fill missing days.
plot = result.plot(kind='bar')
plot.set_title('Minutes spent by {p}'.format(p=step))
plot.set_xlabel('{p}s'.format(p=step))
plot.set_ylabel('minutes')
plt.show()