Example source code for Python's to_timedelta()
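The snippets below are collected from open-source projects and show pd.to_timedelta() in test suites, I/O code, and feature engineering. For orientation, here is a minimal sketch (not taken from any of the projects below) of the call patterns they rely on: a number plus a unit, a parseable string, and list-like input.

import pandas as pd

# A number plus a unit gives a single Timedelta.
one_day = pd.to_timedelta(1, unit='D')              # Timedelta('1 days 00:00:00')

# Strings that carry their own units are parsed directly.
offset = pd.to_timedelta('1 days 00:00:01')         # Timedelta('1 days 00:00:01')

# List-like input returns a TimedeltaIndex; unparseable entries can be
# coerced to NaT instead of raising.
idx = pd.to_timedelta([1, 2, 3], unit='s')
safe = pd.to_timedelta(['1s', 'not a delta'], errors='coerce')   # second entry becomes NaT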

Source: test_format.py (project: PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda, author: SignalMedia)
def test_even_day(self):
        delta_1d = pd.to_timedelta(1, unit='D')
        delta_0d = pd.to_timedelta(0, unit='D')
        delta_1s = pd.to_timedelta(1, unit='s')
        delta_500ms = pd.to_timedelta(500, unit='ms')

        drepr = lambda x: x._repr_base(format='even_day')
        self.assertEqual(drepr(delta_1d), "1 days")
        self.assertEqual(drepr(-delta_1d), "-1 days")
        self.assertEqual(drepr(delta_0d), "0 days")
        self.assertEqual(drepr(delta_1s), "0 days 00:00:01")
        self.assertEqual(drepr(delta_500ms), "0 days 00:00:00.500000")
        self.assertEqual(drepr(delta_1d + delta_1s), "1 days 00:00:01")
        self.assertEqual(
            drepr(delta_1d + delta_500ms), "1 days 00:00:00.500000")
Source: test_format.py (project: PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda, author: SignalMedia)
def test_sub_day(self):
        delta_1d = pd.to_timedelta(1, unit='D')
        delta_0d = pd.to_timedelta(0, unit='D')
        delta_1s = pd.to_timedelta(1, unit='s')
        delta_500ms = pd.to_timedelta(500, unit='ms')

        drepr = lambda x: x._repr_base(format='sub_day')
        self.assertEqual(drepr(delta_1d), "1 days")
        self.assertEqual(drepr(-delta_1d), "-1 days")
        self.assertEqual(drepr(delta_0d), "00:00:00")
        self.assertEqual(drepr(delta_1s), "00:00:01")
        self.assertEqual(drepr(delta_500ms), "00:00:00.500000")
        self.assertEqual(drepr(delta_1d + delta_1s), "1 days 00:00:01")
        self.assertEqual(
            drepr(delta_1d + delta_500ms), "1 days 00:00:00.500000")
Source: test_format.py (project: PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda, author: SignalMedia)
def test_long(self):
        delta_1d = pd.to_timedelta(1, unit='D')
        delta_0d = pd.to_timedelta(0, unit='D')
        delta_1s = pd.to_timedelta(1, unit='s')
        delta_500ms = pd.to_timedelta(500, unit='ms')

        drepr = lambda x: x._repr_base(format='long')
        self.assertEqual(drepr(delta_1d), "1 days 00:00:00")
        self.assertEqual(drepr(-delta_1d), "-1 days +00:00:00")
        self.assertEqual(drepr(delta_0d), "0 days 00:00:00")
        self.assertEqual(drepr(delta_1s), "0 days 00:00:01")
        self.assertEqual(drepr(delta_500ms), "0 days 00:00:00.500000")
        self.assertEqual(drepr(delta_1d + delta_1s), "1 days 00:00:01")
        self.assertEqual(
            drepr(delta_1d + delta_500ms), "1 days 00:00:00.500000")
Source: test_format.py (project: PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda, author: SignalMedia)
def test_all(self):
        delta_1d = pd.to_timedelta(1, unit='D')
        delta_0d = pd.to_timedelta(0, unit='D')
        delta_1ns = pd.to_timedelta(1, unit='ns')

        drepr = lambda x: x._repr_base(format='all')
        self.assertEqual(drepr(delta_1d), "1 days 00:00:00.000000000")
        self.assertEqual(drepr(delta_0d), "0 days 00:00:00.000000000")
        self.assertEqual(drepr(delta_1ns), "0 days 00:00:00.000000001")
Source: test_format.py (project: PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda, author: SignalMedia)
def test_days(self):
        x = pd.to_timedelta(list(range(5)) + [pd.NaT], unit='D')
        result = fmt.Timedelta64Formatter(x, box=True).get_result()
        self.assertEqual(result[0].strip(), "'0 days'")
        self.assertEqual(result[1].strip(), "'1 days'")

        result = fmt.Timedelta64Formatter(x[1:2], box=True).get_result()
        self.assertEqual(result[0].strip(), "'1 days'")

        result = fmt.Timedelta64Formatter(x, box=False).get_result()
        self.assertEqual(result[0].strip(), "0 days")
        self.assertEqual(result[1].strip(), "1 days")

        result = fmt.Timedelta64Formatter(x[1:2], box=False).get_result()
        self.assertEqual(result[0].strip(), "1 days")
Source: test_format.py (project: PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda, author: SignalMedia)
def test_subdays(self):
        y = pd.to_timedelta(list(range(5)) + [pd.NaT], unit='s')
        result = fmt.Timedelta64Formatter(y, box=True).get_result()
        self.assertEqual(result[0].strip(), "'00:00:00'")
        self.assertEqual(result[1].strip(), "'00:00:01'")
Source: test_format.py (project: PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda, author: SignalMedia)
def test_subdays_neg(self):
        y = pd.to_timedelta(list(range(5)) + [pd.NaT], unit='s')
        result = fmt.Timedelta64Formatter(-y, box=True).get_result()
        self.assertEqual(result[0].strip(), "'00:00:00'")
        self.assertEqual(result[1].strip(), "'-1 days +23:59:59'")
Source: test_format.py (project: PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda, author: SignalMedia)
def test_zero(self):
        x = pd.to_timedelta(list(range(1)) + [pd.NaT], unit='D')
        result = fmt.Timedelta64Formatter(x, box=True).get_result()
        self.assertEqual(result[0].strip(), "'0 days'")

        x = pd.to_timedelta(list(range(1)), unit='D')
        result = fmt.Timedelta64Formatter(x, box=True).get_result()
        self.assertEqual(result[0].strip(), "'0 days'")
Source: test_generic.py (project: PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda, author: SignalMedia)
def test_describe_timedelta(self):
        df = DataFrame({"td": pd.to_timedelta(np.arange(24) % 20, "D")})
        self.assertTrue(df.describe().loc["mean"][0] == pd.to_timedelta(
            "8d4h"))
Source: sas7bdat.py (project: PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda, author: SignalMedia)
def _chunk_to_dataframe(self):

        n = self._current_row_in_chunk_index
        m = self._current_row_in_file_index
        ix = range(m - n, m)
        rslt = pd.DataFrame(index=ix)

        js, jb = 0, 0
        for j in range(self.column_count):

            name = self.column_names[j]

            if self.column_types[j] == b'd':
                rslt[name] = self._byte_chunk[jb, :].view(
                    dtype=self.byte_order + 'd')
                rslt[name] = np.asarray(rslt[name], dtype=np.float64)
                if self.convert_dates and (self.column_formats[j] == "MMDDYY"):
                    epoch = pd.datetime(1960, 1, 1)
                    rslt[name] = epoch + pd.to_timedelta(rslt[name], unit='d')
                jb += 1
            elif self.column_types[j] == b's':
                rslt[name] = self._string_chunk[js, :]
                rslt[name] = rslt[name].apply(lambda x: x.rstrip(b'\x00 '))
                if self.encoding is not None:
                    rslt[name] = rslt[name].apply(
                        lambda x: x.decode(encoding=self.encoding))
                if self.blank_missing:
                    ii = rslt[name].str.len() == 0
                    rslt.loc[ii, name] = np.nan
                js += 1
            else:
                raise ValueError("unknown column type %s" %
                                 self.column_types[j])

        return rslt
Source: test_sql.py (project: PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda, author: SignalMedia)
def test_timedelta(self):

        # see #6921
        df = to_timedelta(
            Series(['00:00:01', '00:00:03'], name='foo')).to_frame()
        with tm.assert_produces_warning(UserWarning):
            df.to_sql('test_timedelta', self.conn)
        result = sql.read_sql_query('SELECT * FROM test_timedelta', self.conn)
        tm.assert_series_equal(result['foo'], df['foo'].astype('int64'))
Source: test_pandas.py (project: PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda, author: SignalMedia)
def test_timedelta(self):
        converter = lambda x: pd.to_timedelta(x, unit='ms')

        s = Series([timedelta(23), timedelta(seconds=5)])
        self.assertEqual(s.dtype, 'timedelta64[ns]')
        # index will be float dtype
        assert_series_equal(s, pd.read_json(s.to_json(), typ='series')
                            .apply(converter),
                            check_index_type=False)

        s = Series([timedelta(23), timedelta(seconds=5)],
                   index=pd.Index([0, 1], dtype=float))
        self.assertEqual(s.dtype, 'timedelta64[ns]')
        assert_series_equal(s, pd.read_json(
            s.to_json(), typ='series').apply(converter))

        frame = DataFrame([timedelta(23), timedelta(seconds=5)])
        self.assertEqual(frame[0].dtype, 'timedelta64[ns]')
        assert_frame_equal(frame, pd.read_json(frame.to_json())
                           .apply(converter),
                           check_index_type=False,
                           check_column_type=False)

        frame = DataFrame({'a': [timedelta(days=23), timedelta(seconds=5)],
                           'b': [1, 2],
                           'c': pd.date_range(start='20130101', periods=2)})

        result = pd.read_json(frame.to_json(date_unit='ns'))
        result['a'] = pd.to_timedelta(result.a, unit='ns')
        result['c'] = pd.to_datetime(result.c)
        assert_frame_equal(frame, result, check_index_type=False)
Source: test_merge.py (project: PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda, author: SignalMedia)
def test_concat_timedelta64_block(self):
        from pandas import to_timedelta

        rng = to_timedelta(np.arange(10), unit='s')

        df = DataFrame({'time': rng})

        result = concat([df, df])
        self.assertTrue((result.iloc[:10]['time'] == rng).all())
        self.assertTrue((result.iloc[10:]['time'] == rng).all())
Source: apps.py (project: treadmill, author: Morgan-Stanley)
def init():
    """Return top level command handler."""

    @click.command()
    @cli.handle_exceptions(restclient.CLI_REST_EXCEPTIONS)
    @click.option('--match', help='Server name pattern match')
    @click.option('--full', is_flag=True, default=False)
    @click.pass_context
    def apps(ctx, match, full):
        """View apps report."""
        report = fetch_report(ctx.obj.get('api'), 'apps', match)
        # Replace integer N/As
        for col in ['identity', 'expires', 'lease', 'data_retention']:
            report.loc[report[col] == -1, col] = ''
        # Convert to datetimes
        for col in ['expires']:
            report[col] = pd.to_datetime(report[col], unit='s')
        # Convert to timedeltas
        for col in ['lease', 'data_retention']:
            report[col] = pd.to_timedelta(report[col], unit='s')
        report = report.fillna('')

        if not full:
            report = report[[
                'instance', 'allocation', 'partition', 'server',
                'mem', 'cpu', 'disk'
            ]]

        print_report(report)

    return apps
Source: DataTushareModel.py (project: python_data_tools, author: king3366ster)
def format_date_to_datetime(self, df, t_date = None):
        if t_date is None:
            t_date = dataTime.datetimeRelative(delta = 0)
        t_date = t_date.replace(' 00:00:00', '')
        df_new = df.copy()
        df_new.insert(0, 'datetime', t_date)
        df_new['datetime'] = pd.to_datetime(df_new['datetime'])
        df_new['time'] = pd.to_timedelta(df_new['time'])
        df_new['datetime'] = df_new['datetime'] + df_new['time']
        df_new = df_new.sort_values(['datetime'], ascending=[True])
        del df_new['time']

        return df_new
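format_date_to_datetime above combines a single trade date with an intraday time-of-day column: pd.to_timedelta parses 'HH:MM:SS' strings into offsets from midnight, which are then added to a datetime64 column. A minimal standalone sketch of that pattern, with illustrative sample values rather than data from the original project:

import pandas as pd

df = pd.DataFrame({'time': ['09:30:00', '10:45:30', '15:00:00']})
df.insert(0, 'datetime', '2017-05-05')             # same trade date on every row
df['datetime'] = pd.to_datetime(df['datetime'])    # midnight timestamps
df['time'] = pd.to_timedelta(df['time'])           # offsets from midnight
df['datetime'] = df['datetime'] + df['time']       # full intraday timestamps
del df['time']
# df['datetime'] -> 2017-05-05 09:30:00, 10:45:30, 15:00:00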
# Parameter notes
    # Query parameters
        # code: security code, a 6-digit ticker or an index alias (sh=SSE Composite, sz=SZSE Component, hs300=CSI 300, sz50=SSE 50, zxb=SME board, cyb=ChiNext)
        # start: start date, format YYYY-MM-DD
        # end: end date, format YYYY-MM-DD
        # ktype: bar type, D=daily W=weekly M=monthly 5=5-minute 15=15-minute 30=30-minute 60=60-minute, default D
        # retry_count: number of retries after a network error, default 3
        # pause: seconds to pause between retries, default 0

    # Returned columns
        # date: trade date
        # open: open price
        # high: high price
        # close: close price
        # low: low price
        # volume: trading volume
        # price_change: price change
        # p_change: percentage change
        # ma5: 5-day moving average price
        # ma10: 10-day moving average price
        # ma20: 20-day moving average price
        # v_ma5: 5-day moving average volume
        # v_ma10: 10-day moving average volume
        # v_ma20: 20-day moving average volume
        # turnover: turnover rate [not reported for index data]
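The parameter notes above match tushare's classic get_hist_data interface. A hedged sketch of such a call, assuming that is the API being documented here (tushare itself is not imported anywhere in the snippet above):

import tushare as ts

# Daily bars for one ticker; ktype, retry_count and pause follow the notes above.
hist = ts.get_hist_data('600848', start='2017-01-01', end='2017-06-30',
                        ktype='D', retry_count=3, pause=0)
print(hist[['open', 'high', 'close', 'low', 'volume', 'p_change']].head())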
Source: midas.py (project: scikit-discovery, author: MITHaystack)
def process(self, obj_data):
        '''
        Apply the MIDAS estimator to generate velocity estimates

        Adds the result to the data wrapper

        @param obj_data: Data wrapper
        '''

        if self.column_names is None:
            column_names = obj_data.getDefaultColumns()
        else:
            column_names = self.column_names

        time_diff = pd.to_timedelta('365d')
        results = dict()
        for label, data in obj_data.getIterator():
            start_date = data.index[0]
            end_date = data.index[-1]
            for column in column_names:
                start_data = data.loc[start_date:(end_date-time_diff), column]
                end_data = data.loc[start_date+time_diff:end_date, column]

                offsets = end_data.values - start_data.values
                offsets = offsets[~np.isnan(offsets)]
                med_off = np.median(offsets)
                mad_off = mad(offsets)

                cut_offsets = offsets[np.logical_and(offsets < med_off + 2*mad_off, 
                                                     offsets > med_off - 2*mad_off)]
                final_vel = np.median(cut_offsets)
                final_unc = np.sqrt(np.pi/2) * mad(cut_offsets) / np.sqrt(len(cut_offsets))

                results[label] = pd.DataFrame([final_vel, final_unc], ['velocity', 'uncertainty'], [column])

        obj_data.addResult(self.str_description, pd.Panel.fromDict(results, orient='minor'))
Source: datetime.py (project: qutils, author: Raychee)
def to_seconds(timedelta_str):
    return to_timedelta(timedelta_str).total_seconds()
Source: datetime.py (project: qutils, author: Raychee)
def to_timedelta(timedelta_repr):
    return pd.to_timedelta(str(timedelta_repr), unit='s')
Source: monkey_patch.py (project: qutils, author: Raychee)
def test_timedelta_to_human(self):
        for td in timedelta(days=1, seconds=3900), pd.to_timedelta('1d1h5m'):
            self.assertEqual('1.05 days', timedelta_to_human(td, precision=2))
            self.assertEqual('1.0 day', timedelta_to_human(td, precision=1))
        for td in timedelta(days=-1, seconds=-3900), pd.to_timedelta('-1d1h5m'):
            self.assertEqual('1.05 days ago', timedelta_to_human(td, precision=2))
            self.assertEqual('1.0 day ago', timedelta_to_human(td, precision=1))
Source: gen_action_fea.py (project: JDcontest, author: zsyandjyhouse)
def get_accumulate_action_feat(start_time, end_time,action_data):
    action_data['time'] = pd.to_datetime(action_data['time'], format='%Y-%m-%d %H:%M:%S')
    actions = action_data[(action_data['time'] >= start_time) & (action_data['time'] <= end_time)].copy()
    df = pd.get_dummies(actions['type'], prefix='action')
    actions = pd.concat([actions, df], axis=1) # type: pd.DataFrame
    # time-decay weights: more recent actions (closer to end_time) get larger weights
    # (a vectorized sketch follows after this function)
    actions['weights'] = actions['time'].map(lambda x: pd.to_timedelta(pd.to_datetime(end_time) - x))
    #actions['weights'] = time.strptime(end_date, '%Y-%m-%d') - actions['datetime']
    actions['weights'] = actions['weights'].map(lambda x: math.exp(-x.days))
    print(actions.head(10))
    actions['action_1'] = actions['action_1'] * actions['weights']
    actions['action_2'] = actions['action_2'] * actions['weights']
    actions['action_3'] = actions['action_3'] * actions['weights']
    actions['action_4'] = actions['action_4'] * actions['weights']
    actions['action_5'] = actions['action_5'] * actions['weights']
    actions['action_6'] = actions['action_6'] * actions['weights']
    del actions['model_id']
    del actions['time']
    del actions['weights']
    del actions['cate']
    del actions['brand']
    actions = actions.groupby(['user_id', 'sku_id'], as_index=False).sum()
    actions.fillna(0, inplace=True)

    actions['action_1256'] = actions['action_1'] + actions['action_2'] + actions['action_5'] + actions['action_6']
    actions['action_1256_d_4'] = actions['action_4'] / actions['action_1256']
    del actions['type']
    return actions
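For reference, a vectorized sketch of the exponential time-decay weighting used in get_accumulate_action_feat above, with illustrative timestamps: the age of each action in whole days drives exp(-days), so recent actions keep weights close to 1 while older ones decay quickly.

import numpy as np
import pandas as pd

times = pd.to_datetime(pd.Series(['2016-04-01 10:00:00', '2016-04-10 09:00:00']))
end_time = pd.to_datetime('2016-04-15')
age = pd.to_timedelta(end_time - times)   # already timedelta64, so this cast is a no-op
weights = np.exp(-age.dt.days)            # e.g. 13 days -> ~2.3e-06, 4 days -> ~0.018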

