Example source code for the Python class TimeGrouper()
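
The snippets below, collected from open-source projects, show pd.TimeGrouper in use: it groups a time-indexed column (or the index itself) into fixed-frequency bins for aggregation. Note that TimeGrouper was deprecated in pandas 0.21 and removed in pandas 1.0; pd.Grouper(key=..., freq=...) is the drop-in replacement. A minimal sketch on synthetic data (not taken from any project below):

import numpy as np
import pandas as pd

s = pd.Series(np.arange(6.0),
              index=pd.date_range('2014-01-01', periods=6, freq='12H'))

# On old pandas this was s.groupby(pd.TimeGrouper('D')).mean();
# pd.Grouper is the modern equivalent.
daily = s.groupby(pd.Grouper(freq='D')).mean()
print(daily)  # 2014-01-01 -> 0.5, 2014-01-02 -> 2.5, 2014-01-03 -> 4.5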

test_downsampling.py (project: catalyst, author: enigmampc)
def check_downsampled_term(self, term):

        #       June 2014
        # Mo Tu We Th Fr Sa Su
        #                    1
        #  2  3  4  5  6  7  8
        #  9 10 11 12 13 14 15
        # 16 17 18 19 20 21 22
        # 23 24 25 26 27 28 29
        # 30
        all_sessions = self.nyse_sessions
        compute_dates = all_sessions[
            all_sessions.slice_indexer('2014-06-05', '2015-01-06')
        ]
        start_date, end_date = compute_dates[[0, -1]]

        pipe = Pipeline({
            'year': term.downsample(frequency='year_start'),
            'quarter': term.downsample(frequency='quarter_start'),
            'month': term.downsample(frequency='month_start'),
            'week': term.downsample(frequency='week_start'),
        })

        # Raw values for term, computed each day from 2014 to the end of the
        # target period.
        raw_term_results = self.run_pipeline(
            Pipeline({'term': term}),
            start_date=pd.Timestamp('2014-01-02', tz='UTC'),
            end_date=pd.Timestamp('2015-01-06', tz='UTC'),
        )['term'].unstack()

        expected_results = {
            'year': (raw_term_results
                     .groupby(pd.TimeGrouper('AS'))
                     .first()
                     .reindex(compute_dates, method='ffill')),
            'quarter': (raw_term_results
                        .groupby(pd.TimeGrouper('QS'))
                        .first()
                        .reindex(compute_dates, method='ffill')),
            'month': (raw_term_results
                      .groupby(pd.TimeGrouper('MS'))
                      .first()
                      .reindex(compute_dates, method='ffill')),
            'week': (raw_term_results
                     .groupby(pd.TimeGrouper('W', label='left'))
                     .first()
                     .reindex(compute_dates, method='ffill')),
        }

        results = self.run_pipeline(pipe, start_date, end_date)

        for frequency in expected_results:
            result = results[frequency].unstack()
            expected = expected_results[frequency]
            assert_frame_equal(result, expected)
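
The expected results above use a first-then-ffill pattern: take the first raw value in each calendar bin ('AS' year start, 'QS' quarter start, 'MS' month start, 'W' weekly with left-edge labels), then forward-fill it onto the session index. A self-contained sketch of the same pattern on synthetic data, with pd.Grouper standing in for the deprecated TimeGrouper:

import numpy as np
import pandas as pd

days = pd.date_range('2014-06-01', '2014-08-31', freq='D')
raw = pd.Series(np.arange(len(days), dtype=float), index=days)

# First value of each month, broadcast forward onto every day of that month.
monthly_first = (raw.groupby(pd.Grouper(freq='MS'))
                    .first()
                    .reindex(days, method='ffill'))
assert (monthly_first['2014-07'] == raw['2014-07-01']).all()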
test_resample.py (project: PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda, author: SignalMedia)
def test_aggregate_normal(self):
        # check that TimeGrouper's aggregation is identical to a normal groupby

        n = 20
        data = np.random.randn(n, 4)
        normal_df = DataFrame(data, columns=['A', 'B', 'C', 'D'])
        normal_df['key'] = [1, 2, 3, 4, 5] * 4

        dt_df = DataFrame(data, columns=['A', 'B', 'C', 'D'])
        dt_df['key'] = [datetime(2013, 1, 1), datetime(2013, 1, 2),
                        datetime(2013, 1, 3), datetime(2013, 1, 4),
                        datetime(2013, 1, 5)] * 4

        normal_grouped = normal_df.groupby('key')
        dt_grouped = dt_df.groupby(TimeGrouper(key='key', freq='D'))

        for func in ['min', 'max', 'prod', 'var', 'std', 'mean']:
            expected = getattr(normal_grouped, func)()
            dt_result = getattr(dt_grouped, func)()
            expected.index = date_range(start='2013-01-01', freq='D',
                                        periods=5, name='key')
            assert_frame_equal(expected, dt_result)

        for func in ['count', 'sum']:
            expected = getattr(normal_grouped, func)()
            expected.index = date_range(start='2013-01-01', freq='D',
                                        periods=5, name='key')
            dt_result = getattr(dt_grouped, func)()
            assert_frame_equal(expected, dt_result)

        # GH 7453
        for func in ['size']:
            expected = getattr(normal_grouped, func)()
            expected.index = date_range(start='2013-01-01', freq='D',
                                        periods=5, name='key')
            dt_result = getattr(dt_grouped, func)()
            assert_series_equal(expected, dt_result)
        """
        for func in ['first', 'last']:
            expected = getattr(normal_grouped, func)()
            expected.index = date_range(start='2013-01-01', freq='D',
                                        periods=5, name='key')
            dt_result = getattr(dt_grouped, func)()
            assert_frame_equal(expected, dt_result)

        for func in ['nth']:
            expected = getattr(normal_grouped, func)(3)
            expected.index = date_range(start='2013-01-01',
                                        freq='D', periods=5, name='key')
            dt_result = getattr(dt_grouped, func)(3)
            assert_frame_equal(expected, dt_result)
        """
        # if TimeGrouper is used, 'first', 'last' and 'nth' don't work yet
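
The equivalence this test asserts is easy to reproduce; the sketch below uses synthetic data and pd.Grouper, the modern spelling of TimeGrouper(key=..., freq=...):

import numpy as np
import pandas as pd
from datetime import datetime

df = pd.DataFrame({'key': [datetime(2013, 1, d) for d in (1, 2)] * 3,
                   'val': np.arange(6.0)})

# Daily bins on a datetime column match a plain groupby when every key
# already falls on a distinct day.
by_day = df.groupby(pd.Grouper(key='key', freq='D'))['val'].mean()
by_key = df.groupby('key')['val'].mean()
assert list(by_day) == list(by_key.sort_index())  # [2.0, 3.0]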
test_groupby.py (project: PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda, author: SignalMedia)
def test_transform(self):
        data = Series(np.arange(9) // 3, index=np.arange(9))

        index = np.arange(9)
        np.random.shuffle(index)
        data = data.reindex(index)

        grouped = data.groupby(lambda x: x // 3)

        transformed = grouped.transform(lambda x: x * x.sum())
        self.assertEqual(transformed[7], 12)

        # GH 8046
        # make sure that we preserve the input order

        df = DataFrame(
            np.arange(6, dtype='int64').reshape(
                3, 2), columns=["a", "b"], index=[0, 2, 1])
        key = [0, 0, 1]
        expected = df.sort_index().groupby(key).transform(
            lambda x: x - x.mean()).groupby(key).mean()
        result = df.groupby(key).transform(lambda x: x - x.mean()).groupby(
            key).mean()
        assert_frame_equal(result, expected)

        def demean(arr):
            return arr - arr.mean()

        people = DataFrame(np.random.randn(5, 5),
                           columns=['a', 'b', 'c', 'd', 'e'],
                           index=['Joe', 'Steve', 'Wes', 'Jim', 'Travis'])
        key = ['one', 'two', 'one', 'two', 'one']
        result = people.groupby(key).transform(demean).groupby(key).mean()
        expected = people.groupby(key).apply(demean).groupby(key).mean()
        assert_frame_equal(result, expected)

        # GH 8430
        df = tm.makeTimeDataFrame()
        g = df.groupby(pd.TimeGrouper('M'))
        g.transform(lambda x: x - 1)

        # GH 9700
        df = DataFrame({'a': range(5, 10), 'b': range(5)})
        result = df.groupby('a').transform(max)
        expected = DataFrame({'b': range(5)})
        tm.assert_frame_equal(result, expected)
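
The GH 8430 step above only smoke-tests transform through a monthly time grouper; the sketch below shows what such a transform actually returns (synthetic data, pd.Grouper standing in for the deprecated TimeGrouper):

import numpy as np
import pandas as pd

df = pd.DataFrame({'x': np.arange(10.0)},
                  index=pd.date_range('2014-01-25', periods=10, freq='D'))

# Demean within each month; transform broadcasts the result back onto the
# original daily index instead of collapsing to one row per month.
out = df.groupby(pd.Grouper(freq='M')).transform(lambda g: g - g.mean())
assert out.index.equals(df.index)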
agent.py (project: volttron-applications, author: VOLTTRON)
def calculate_latest_coeffs(self):
        unit_topic_tmpl = "{campus}/{building}/{unit}/{point}"
        topic_tmpl = "{campus}/{building}/{unit}/{subdevice}/{point}"
        unit_points = [self.out_temp_name, self.supply_temp_name]
        zone_points = [self.zone_temp_name, self.air_flow_rate_name]
        df = None

        for point in unit_points:
            unit_topic = unit_topic_tmpl.format(campus=self.site,
                                                building=self.building,
                                                unit=self.unit,
                                                point=point)
            result = self.vip.rpc.call('platform.historian',
                                       'query',
                                       topic=unit_topic,
                                       count=self.no_of_recs_needed,
                                       order="LAST_TO_FIRST").get(timeout=1000)
            df2 = pd.DataFrame(result['values'], columns=[self.ts_name, point])
            self.convert_units_to_SI(df2, point, result['metadata']['units'])
            df2[self.ts_name] = pd.to_datetime(df2[self.ts_name])
            df2 = df2.groupby([pd.TimeGrouper(key=self.ts_name, freq=self.aggregate_freq)]).mean()
            #df2[self.ts_name] = df2[self.ts_name].apply(lambda dt: dt.replace(second=0, microsecond=0))
            df = df2 if df is None else pd.merge(df, df2, how='outer', left_index=True, right_index=True)

        for subdevice in self.subdevices:
            for point in zone_points:
                # Query data from platform historian
                topic = topic_tmpl.format(campus=self.site,
                                          building=self.building,
                                          unit=self.unit,
                                          subdevice=subdevice,
                                          point=point)
                result = self.vip.rpc.call('platform.historian',
                                           'query',
                                           topic=topic,
                                           count=self.no_of_recs_needed,
                                           order="LAST_TO_FIRST").get(timeout=1000)
                # Merge new point data to df
                df2 = pd.DataFrame(result['values'], columns=[self.ts_name, point])
                self.convert_units_to_SI(df2, point, result['metadata']['units'])
                df2[self.ts_name] = pd.to_datetime(df2[self.ts_name])
                df2 = df2.groupby([pd.TimeGrouper(key=self.ts_name, freq=self.aggregate_freq)]).mean()
                #df2[self.ts_name] = df2[self.ts_name].apply(lambda dt: dt.replace(second=0, microsecond=0))
                df = pd.merge(df, df2, how='outer', left_index=True, right_index=True)
            #print(df)
            coeffs = self.calculate_coeffs(df)
            # Publish coeffs to store
            if coeffs is not None:
                self.save_coeffs(coeffs, subdevice)
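
The core idiom of this agent is aggregate-then-merge: each queried point becomes a one-column frame averaged into fixed time bins, and the frames are outer-joined on the bin index. A standalone sketch of that idiom; the names and records here are illustrative, not taken from the VOLTTRON agent:

import pandas as pd

def to_binned(records, ts_name, point, freq='1Min'):
    # One column per point, averaged into freq-sized bins keyed on ts_name.
    df = pd.DataFrame(records, columns=[ts_name, point])
    df[ts_name] = pd.to_datetime(df[ts_name])
    return df.groupby(pd.Grouper(key=ts_name, freq=freq)).mean()

a = to_binned([('2017-01-01 00:00:10', 1.0),
               ('2017-01-01 00:00:50', 3.0)], 'ts', 'out_temp')
b = to_binned([('2017-01-01 00:00:30', 10.0)], 'ts', 'supply_temp')

merged = pd.merge(a, b, how='outer', left_index=True, right_index=True)
print(merged)  # one 00:00 bin row: out_temp 2.0, supply_temp 10.0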
metadata.py (project: openbadge-analysis, author: HumanDynamics)
def id_to_member_mapping(fileobject, time_bins_size='1min', tz='US/Eastern'):
    """Creates a mapping from badge id to member, for each time bin, from proximity data file.

    Parameters
    ----------
    fileobject : file or iterable list of str
        The proximity data, as an iterable of JSON strings.

    time_bins_size : str
        The size of the time bins used for resampling.  Defaults to '1min'.

    tz : str
        The time zone used for localization of dates.  Defaults to 'US/Eastern'.

    Returns
    -------
    pd.Series :
        A mapping from badge id to member, indexed by datetime and id.
    """

    def readfile(fileobject):
        for line in fileobject:
            data = json.loads(line)['data']

            yield (data['timestamp'],
                   mac_address_to_id(data['badge_address']),
                   str(data['member']))

    df = pd.DataFrame(readfile(fileobject), columns=['timestamp', 'id', 'member'])

    # Convert the timestamp to a datetime, localized in UTC
    df['datetime'] = pd.to_datetime(df['timestamp'], unit='s', utc=True) \
            .dt.tz_localize('UTC').dt.tz_convert(tz)
    del df['timestamp']

    # Group by id and resample
    df = df.groupby([
        pd.TimeGrouper(time_bins_size, key='datetime'),
        'id'
    ]).first()

    df.sort_index(inplace=True)

    return df['member']
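
The (time bin, badge id) grouping above yields a Series with a two-level index. A small synthetic sketch of that shape, with pd.Grouper in place of the deprecated TimeGrouper:

import pandas as pd

df = pd.DataFrame({
    'datetime': pd.to_datetime(['2017-01-01 00:00:05',
                                '2017-01-01 00:00:40',
                                '2017-01-01 00:01:10']),
    'id': [1, 1, 2],
    'member': ['alice', 'alice', 'bob'],
})

# Two rows fall into the 00:00 bin for id 1; .first() keeps one of them.
mapping = df.groupby([pd.Grouper(key='datetime', freq='1min'),
                      'id']).first()['member']
print(mapping)  # MultiIndex (datetime, id) -> member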
metadata.py (project: openbadge-analysis, author: HumanDynamics)
def voltages(fileobject, time_bins_size='1min', tz='US/Eastern'):
    """Creates a DataFrame of voltages, for each member and time bin.

    Parameters
    ----------
    fileobject : file or iterable list of str
        The proximity data, as an iterable of JSON strings.

    time_bins_size : str
        The size of the time bins used for resampling.  Defaults to '1min'.

    tz : str
        The time zone used for localization of dates.  Defaults to 'US/Eastern'.

    Returns
    -------
    pd.Series :
        Voltages, indexed by datetime and member.
    """

    def readfile(fileobject):
        for line in fileobject:
            data = json.loads(line)['data']

            yield (data['timestamp'],
                   str(data['member']),
                   float(data['voltage']))

    df = pd.DataFrame(readfile(fileobject), columns=['timestamp', 'member', 'voltage'])

    # Convert the timestamp to a datetime, localized in UTC
    df['datetime'] = pd.to_datetime(df['timestamp'], unit='s', utc=True) \
                       .dt.tz_localize('UTC').dt.tz_convert(tz)
    del df['timestamp']

    # Group by member and resample
    df = df.groupby([
        pd.TimeGrouper(time_bins_size, key='datetime'),
        'member'
    ]).mean()

    df.sort_index(inplace=True)

    return df['voltage']
proximity.py (project: openbadge-analysis, author: HumanDynamics)
def member_to_badge_proximity(fileobject, time_bins_size='1min', tz='US/Eastern'):
    """Creates a member-to-badge proximity DataFrame from a proximity data file.

    Parameters
    ----------
    fileobject : file or iterable list of str
        The proximity data, as an iterable of JSON strings.

    time_bins_size : str
        The size of the time bins used for resampling.  Defaults to '1min'.

    tz : str
        The time zone used for localization of dates.  Defaults to 'US/Eastern'.

    Returns
    -------
    pd.DataFrame :
        The member-to-badge proximity data.
    """

    def readfile(fileobject):
        for line in fileobject:
            data = json.loads(line)['data']

            for (observed_id, distance) in data['rssi_distances'].items():
                yield (
                    data['timestamp'],
                    str(data['member']),
                    int(observed_id),
                    float(distance['rssi']),
                    float(distance['count']),
                )

    df = pd.DataFrame(
            readfile(fileobject),
            columns=('timestamp', 'member', 'observed_id', 'rssi', 'count')
    )

    # Convert timestamp to datetime for convenience, and localize to UTC
    df['datetime'] = pd.to_datetime(df['timestamp'], unit='s', utc=True) \
            .dt.tz_localize('UTC').dt.tz_convert(tz)
    del df['timestamp']

    # Group by time bin, member and observed_id,
    # and take the first value, arbitrarily
    df = df.groupby([
        pd.TimeGrouper(time_bins_size, key='datetime'),
        'member',
        'observed_id'
    ]).first()

    # Sort the data
    df.sort_index(inplace=True)

    return df
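
All three openbadge-analysis helpers share one pattern: parse and localize timestamps, bin them with TimeGrouper, aggregate, sort. On pandas >= 1.0, where TimeGrouper no longer exists, the same pattern is spelled with pd.Grouper, and for a plain DatetimeIndex it reduces to resample. A hedged equivalence sketch on synthetic data:

import numpy as np
import pandas as pd

s = pd.Series(np.arange(4.0),
              index=pd.date_range('2017-01-01', periods=4, freq='30min'))

a = s.groupby(pd.Grouper(freq='1h')).mean()  # was: pd.TimeGrouper('1h')
b = s.resample('1h').mean()                  # same bins for a plain time index
assert (a == b).all()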

