def check_downsampled_term(self, term):

    #      June 2014
    # Mo Tu We Th Fr Sa Su
    #                    1
    #  2  3  4  5  6  7  8
    #  9 10 11 12 13 14 15
    # 16 17 18 19 20 21 22
    # 23 24 25 26 27 28 29
    # 30
    all_sessions = self.nyse_sessions
    compute_dates = all_sessions[
        all_sessions.slice_indexer('2014-06-05', '2015-01-06')
    ]
    start_date, end_date = compute_dates[[0, -1]]

    pipe = Pipeline({
        'year': term.downsample(frequency='year_start'),
        'quarter': term.downsample(frequency='quarter_start'),
        'month': term.downsample(frequency='month_start'),
        'week': term.downsample(frequency='week_start'),
    })

    # Raw values for term, computed each day from 2014 to the end of the
    # target period.
    raw_term_results = self.run_pipeline(
        Pipeline({'term': term}),
        start_date=pd.Timestamp('2014-01-02', tz='UTC'),
        end_date=pd.Timestamp('2015-01-06', tz='UTC'),
    )['term'].unstack()

    # Expected downsampled values: the first raw value of each period
    # (year/quarter/month/week), forward-filled across the compute dates.
    expected_results = {
        'year': (raw_term_results
                 .groupby(pd.TimeGrouper('AS'))
                 .first()
                 .reindex(compute_dates, method='ffill')),
        'quarter': (raw_term_results
                    .groupby(pd.TimeGrouper('QS'))
                    .first()
                    .reindex(compute_dates, method='ffill')),
        'month': (raw_term_results
                  .groupby(pd.TimeGrouper('MS'))
                  .first()
                  .reindex(compute_dates, method='ffill')),
        'week': (raw_term_results
                 .groupby(pd.TimeGrouper('W', label='left'))
                 .first()
                 .reindex(compute_dates, method='ffill')),
    }

    results = self.run_pipeline(pipe, start_date, end_date)

    for frequency in expected_results:
        result = results[frequency].unstack()
        expected = expected_results[frequency]
        assert_frame_equal(result, expected)
Python TimeGrouper() class: example source code
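Before the project snippets, a quick orientation: pd.TimeGrouper groups rows into time bins of a given frequency when passed as a groupby key, much like resample. It was deprecated in pandas 0.21 and later removed in favour of pd.Grouper, which accepts the same freq and key arguments. A minimal sketch under that assumption (toy data, not taken from any of the snippets here):

import numpy as np
import pandas as pd

# Hourly readings over three days (toy data).
idx = pd.date_range('2014-06-01', periods=72, freq='H')
readings = pd.DataFrame({'value': np.arange(72.0)}, index=idx)

# Group into daily bins and average each bin; on pandas < 1.0 the old
# spelling pd.TimeGrouper(freq='D') behaves the same way.
daily = readings.groupby(pd.Grouper(freq='D')).mean()

# Equivalent to a plain resample.
assert daily.equals(readings.resample('D').mean())

# The first snippet above uses the same idea to downsample pipeline output:
# take the first value of each month-start bin and forward-fill it.
monthly_first = readings.groupby(pd.Grouper(freq='MS')).first()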
Source file: test_resample.py, project: PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda, author: SignalMedia
def test_aggregate_normal(self):
    # check that TimeGrouper's aggregation is identical to a normal groupby

    n = 20
    data = np.random.randn(n, 4)
    normal_df = DataFrame(data, columns=['A', 'B', 'C', 'D'])
    normal_df['key'] = [1, 2, 3, 4, 5] * 4

    dt_df = DataFrame(data, columns=['A', 'B', 'C', 'D'])
    dt_df['key'] = [datetime(2013, 1, 1), datetime(2013, 1, 2),
                    datetime(2013, 1, 3), datetime(2013, 1, 4),
                    datetime(2013, 1, 5)] * 4

    normal_grouped = normal_df.groupby('key')
    dt_grouped = dt_df.groupby(TimeGrouper(key='key', freq='D'))

    for func in ['min', 'max', 'prod', 'var', 'std', 'mean']:
        expected = getattr(normal_grouped, func)()
        dt_result = getattr(dt_grouped, func)()
        expected.index = date_range(start='2013-01-01', freq='D',
                                    periods=5, name='key')
        assert_frame_equal(expected, dt_result)

    for func in ['count', 'sum']:
        expected = getattr(normal_grouped, func)()
        expected.index = date_range(start='2013-01-01', freq='D',
                                    periods=5, name='key')
        dt_result = getattr(dt_grouped, func)()
        assert_frame_equal(expected, dt_result)

    # GH 7453
    for func in ['size']:
        expected = getattr(normal_grouped, func)()
        expected.index = date_range(start='2013-01-01', freq='D',
                                    periods=5, name='key')
        dt_result = getattr(dt_grouped, func)()
        assert_series_equal(expected, dt_result)

    """
    for func in ['first', 'last']:
        expected = getattr(normal_grouped, func)()
        expected.index = date_range(start='2013-01-01', freq='D',
                                    periods=5, name='key')
        dt_result = getattr(dt_grouped, func)()
        assert_frame_equal(expected, dt_result)

    for func in ['nth']:
        expected = getattr(normal_grouped, func)(3)
        expected.index = date_range(start='2013-01-01',
                                    freq='D', periods=5, name='key')
        dt_result = getattr(dt_grouped, func)(3)
        assert_frame_equal(expected, dt_result)
    """
    # When a TimeGrouper is used, 'first', 'last' and 'nth' don't work yet,
    # so the block above stays commented out.
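The test above relies on each key value landing in its own daily bin, so aggregating through a TimeGrouper must match a plain groupby on the column. A small sketch of the same idea using the modern pd.Grouper spelling (toy data, not from the test suite):

import numpy as np
import pandas as pd

df = pd.DataFrame({
    'key': pd.to_datetime(['2013-01-01', '2013-01-02'] * 3),
    'value': np.arange(6.0),
})

by_bin = df.groupby(pd.Grouper(key='key', freq='D')).sum()
by_key = df.groupby('key').sum()

# With exactly one timestamp per daily bin, the two groupings agree.
assert by_bin['value'].equals(by_key['value'])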
Source file: test_groupby.py, project: PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda, author: SignalMedia
def test_transform(self):
    data = Series(np.arange(9) // 3, index=np.arange(9))

    index = np.arange(9)
    np.random.shuffle(index)
    data = data.reindex(index)

    grouped = data.groupby(lambda x: x // 3)

    transformed = grouped.transform(lambda x: x * x.sum())
    self.assertEqual(transformed[7], 12)

    # GH 8046
    # make sure that we preserve the input order
    df = DataFrame(
        np.arange(6, dtype='int64').reshape(
            3, 2), columns=["a", "b"], index=[0, 2, 1])
    key = [0, 0, 1]
    expected = df.sort_index().groupby(key).transform(
        lambda x: x - x.mean()).groupby(key).mean()
    result = df.groupby(key).transform(lambda x: x - x.mean()).groupby(
        key).mean()
    assert_frame_equal(result, expected)

    def demean(arr):
        return arr - arr.mean()

    people = DataFrame(np.random.randn(5, 5),
                       columns=['a', 'b', 'c', 'd', 'e'],
                       index=['Joe', 'Steve', 'Wes', 'Jim', 'Travis'])
    key = ['one', 'two', 'one', 'two', 'one']
    result = people.groupby(key).transform(demean).groupby(key).mean()
    expected = people.groupby(key).apply(demean).groupby(key).mean()
    assert_frame_equal(result, expected)

    # GH 8430
    df = tm.makeTimeDataFrame()
    g = df.groupby(pd.TimeGrouper('M'))
    g.transform(lambda x: x - 1)

    # GH 9700
    df = DataFrame({'a': range(5, 10), 'b': range(5)})
    result = df.groupby('a').transform(max)
    expected = DataFrame({'b': range(5)})
    tm.assert_frame_equal(result, expected)
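The GH 8430 case above only checks that transform runs through pd.TimeGrouper('M') without raising. A small sketch of what such a transform does, written with the modern pd.Grouper (toy data; on pandas 2.2+ the monthly alias may be 'ME' rather than 'M'):

import numpy as np
import pandas as pd

# Daily values spanning two calendar months (toy data).
s = pd.Series(np.arange(60.0),
              index=pd.date_range('2014-01-01', periods=60, freq='D'))

# transform() broadcasts each bin's result back onto the original index,
# here demeaning every value within its month.
demeaned = s.groupby(pd.Grouper(freq='M')).transform(lambda x: x - x.mean())

# Each monthly bin now has (approximately) zero mean.
assert np.allclose(demeaned.groupby(pd.Grouper(freq='M')).mean(), 0.0)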
def calculate_latest_coeffs(self):
    unit_topic_tmpl = "{campus}/{building}/{unit}/{point}"
    topic_tmpl = "{campus}/{building}/{unit}/{subdevice}/{point}"
    unit_points = [self.out_temp_name, self.supply_temp_name]
    zone_points = [self.zone_temp_name, self.air_flow_rate_name]
    df = None

    for point in unit_points:
        # Query unit-level point data from the platform historian
        unit_topic = unit_topic_tmpl.format(campus=self.site,
                                            building=self.building,
                                            unit=self.unit,
                                            point=point)
        result = self.vip.rpc.call('platform.historian',
                                   'query',
                                   topic=unit_topic,
                                   count=self.no_of_recs_needed,
                                   order="LAST_TO_FIRST").get(timeout=1000)
        # Average the readings into time bins and merge them into df
        df2 = pd.DataFrame(result['values'], columns=[self.ts_name, point])
        self.convert_units_to_SI(df2, point, result['metadata']['units'])
        df2[self.ts_name] = pd.to_datetime(df2[self.ts_name])
        df2 = df2.groupby([pd.TimeGrouper(key=self.ts_name, freq=self.aggregate_freq)]).mean()
        #df2[self.ts_name] = df2[self.ts_name].apply(lambda dt: dt.replace(second=0, microsecond=0))
        df = df2 if df is None else pd.merge(df, df2, how='outer', left_index=True, right_index=True)

    for subdevice in self.subdevices:
        for point in zone_points:
            # Query zone-level data from the platform historian
            topic = topic_tmpl.format(campus=self.site,
                                      building=self.building,
                                      unit=self.unit,
                                      subdevice=subdevice,
                                      point=point)
            result = self.vip.rpc.call('platform.historian',
                                       'query',
                                       topic=topic,
                                       count=self.no_of_recs_needed,
                                       order="LAST_TO_FIRST").get(timeout=1000)
            # Merge the new point data into df
            df2 = pd.DataFrame(result['values'], columns=[self.ts_name, point])
            self.convert_units_to_SI(df2, point, result['metadata']['units'])
            df2[self.ts_name] = pd.to_datetime(df2[self.ts_name])
            df2 = df2.groupby([pd.TimeGrouper(key=self.ts_name, freq=self.aggregate_freq)]).mean()
            #df2[self.ts_name] = df2[self.ts_name].apply(lambda dt: dt.replace(second=0, microsecond=0))
            df = pd.merge(df, df2, how='outer', left_index=True, right_index=True)
        #print(df)

        coeffs = self.calculate_coeffs(df)
        # Publish coeffs to store
        if coeffs is not None:
            self.save_coeffs(coeffs, subdevice)
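The agent above reduces every queried point to one averaged value per time bin, then outer-joins the points on the shared bin index. A minimal sketch of just that bin-and-merge step (column names and the one-minute frequency are illustrative, not taken from the agent config):

import pandas as pd

raw_a = pd.DataFrame({
    'ts': pd.to_datetime(['2017-01-01 00:00:30', '2017-01-01 00:00:45',
                          '2017-01-01 00:01:10']),
    'OutdoorAirTemperature': [10.0, 12.0, 14.0],
})
raw_b = pd.DataFrame({
    'ts': pd.to_datetime(['2017-01-01 00:00:20', '2017-01-01 00:01:40']),
    'SupplyAirTemperature': [20.0, 22.0],
})

# Average each point into one-minute bins keyed on the timestamp column.
binned_a = raw_a.groupby(pd.Grouper(key='ts', freq='1min')).mean()
binned_b = raw_b.groupby(pd.Grouper(key='ts', freq='1min')).mean()

# Outer-join on the bin index; bins missing from one point become NaN.
merged = pd.merge(binned_a, binned_b, how='outer',
                  left_index=True, right_index=True)
print(merged)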
def id_to_member_mapping(fileobject, time_bins_size='1min', tz='US/Eastern'):
"""Creates a mapping from badge id to member, for each time bin, from proximity data file.
Parameters
----------
fileobject : file or iterable list of str
The proximity data, as an iterable of JSON strings.
time_bins_size : str
The size of the time bins used for resampling. Defaults to '1min'.
tz : str
The time zone used for localization of dates. Defaults to 'US/Eastern'.
Returns
-------
pd.Series :
A mapping from badge id to member, indexed by datetime and id.
"""
    def readfile(fileobject):
        for line in fileobject:
            data = json.loads(line)['data']

            yield (data['timestamp'],
                   mac_address_to_id(data['badge_address']),
                   str(data['member']))

    df = pd.DataFrame(readfile(fileobject), columns=['timestamp', 'id', 'member'])

    # Convert the timestamp to a datetime, localized in UTC
    df['datetime'] = pd.to_datetime(df['timestamp'], unit='s', utc=True) \
                       .dt.tz_localize('UTC').dt.tz_convert(tz)
    del df['timestamp']

    # Group by id and resample
    df = df.groupby([
        pd.TimeGrouper(time_bins_size, key='datetime'),
        'id'
    ]).first()

    # Sort the data
    df.sort_index(inplace=True)

    return df['member']
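A hedged usage sketch for the helper above, assuming a newline-delimited JSON log in the format it expects (the file name is illustrative, and json, pandas and mac_address_to_id must already be importable):

# Hypothetical proximity log: one JSON record per line.
with open('proximity_data.jsonl') as f:
    id_to_member = id_to_member_mapping(f, time_bins_size='5min', tz='US/Eastern')

# A Series indexed by (datetime bin, badge id), mapping each id to a member.
print(id_to_member.head())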
def voltages(fileobject, time_bins_size='1min', tz='US/Eastern'):
    """Creates a Series of mean voltages, for each member and time bin.

    Parameters
    ----------
    fileobject : file or iterable list of str
        The proximity data, as an iterable of JSON strings.

    time_bins_size : str
        The size of the time bins used for resampling.  Defaults to '1min'.

    tz : str
        The time zone used for localization of dates.  Defaults to 'US/Eastern'.

    Returns
    -------
    pd.Series :
        Voltages, indexed by datetime and member.
    """

    def readfile(fileobject):
        for line in fileobject:
            data = json.loads(line)['data']

            yield (data['timestamp'],
                   str(data['member']),
                   float(data['voltage']))

    df = pd.DataFrame(readfile(fileobject), columns=['timestamp', 'member', 'voltage'])

    # Convert the timestamp to a datetime, localized in UTC
    df['datetime'] = pd.to_datetime(df['timestamp'], unit='s', utc=True) \
                       .dt.tz_localize('UTC').dt.tz_convert(tz)
    del df['timestamp']

    # Group by member and resample
    df = df.groupby([
        pd.TimeGrouper(time_bins_size, key='datetime'),
        'member'
    ]).mean()

    df.sort_index(inplace=True)

    return df['voltage']
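As with the mapping helper, a short hedged usage sketch (file name illustrative); unstacking the result gives one column of binned mean voltages per member:

with open('proximity_data.jsonl') as f:
    v = voltages(f, time_bins_size='1min')

# Rows are time bins, columns are members, values are mean voltages.
voltage_table = v.unstack('member')
print(voltage_table.tail())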
def member_to_badge_proximity(fileobject, time_bins_size='1min', tz='US/Eastern'):
    """Creates a member-to-badge proximity DataFrame from a proximity data file.

    Parameters
    ----------
    fileobject : file or iterable list of str
        The proximity data, as an iterable of JSON strings.

    time_bins_size : str
        The size of the time bins used for resampling.  Defaults to '1min'.

    tz : str
        The time zone used for localization of dates.  Defaults to 'US/Eastern'.

    Returns
    -------
    pd.DataFrame :
        The member-to-badge proximity data.
    """

    def readfile(fileobject):
        for line in fileobject:
            data = json.loads(line)['data']

            for (observed_id, distance) in data['rssi_distances'].items():
                yield (
                    data['timestamp'],
                    str(data['member']),
                    int(observed_id),
                    float(distance['rssi']),
                    float(distance['count']),
                )

    df = pd.DataFrame(
        readfile(fileobject),
        columns=('timestamp', 'member', 'observed_id', 'rssi', 'count')
    )

    # Convert timestamp to datetime for convenience, and localize to UTC
    df['datetime'] = pd.to_datetime(df['timestamp'], unit='s', utc=True) \
                       .dt.tz_localize('UTC').dt.tz_convert(tz)
    del df['timestamp']

    # Group by time bin, member and observed_id, and take the first value
    # (arbitrarily)
    df = df.groupby([
        pd.TimeGrouper(time_bins_size, key='datetime'),
        'member',
        'observed_id'
    ]).first()

    # Sort the data
    df.sort_index(inplace=True)

    return df
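A hedged usage sketch (file name illustrative): the result is indexed by (datetime bin, member, observed badge id), so a groupby over index levels summarizes who was seen most strongly by whom, assuming the usual RSSI convention that values nearer zero indicate closer badges:

with open('proximity_data.jsonl') as f:
    m2b = member_to_badge_proximity(f, time_bins_size='1min')

# Mean received signal strength per member and observed badge over the
# whole capture (RSSI is negative; larger values mean a stronger signal).
avg_rssi = m2b['rssi'].groupby(level=['member', 'observed_id']).mean()
print(avg_rssi.sort_values(ascending=False).head())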