python类to_datetime()的实例源码

test_pandas_utils.py 文件源码 项目:catalyst 作者: enigmampc 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_nearest_unequal_elements(self, tz):
        """Check the (before, after) pair returned for a range of probe dates.

        Probes fall before, on, between and after the four dates in the
        index; ``None`` stands for "no such element" on that side.
        """
        def as_timestamp(label):
            # Keep None as-is so it can be compared against a missing side.
            if label is None:
                return None
            return pd.Timestamp(label, tz=tz)

        index = pd.to_datetime(
            ['2014-01-01', '2014-01-05', '2014-01-06', '2014-01-09'],
        ).tz_localize(tz)

        # (probe date, expected element before, expected element after)
        cases = (
            ('2013-12-30', None, '2014-01-01'),
            ('2013-12-31', None, '2014-01-01'),
            ('2014-01-01', None, '2014-01-05'),
            ('2014-01-02', '2014-01-01', '2014-01-05'),
            ('2014-01-03', '2014-01-01', '2014-01-05'),
            ('2014-01-04', '2014-01-01', '2014-01-05'),
            ('2014-01-05', '2014-01-01', '2014-01-06'),
            ('2014-01-06', '2014-01-05', '2014-01-09'),
            ('2014-01-07', '2014-01-06', '2014-01-09'),
            ('2014-01-08', '2014-01-06', '2014-01-09'),
            ('2014-01-09', '2014-01-06', None),
            ('2014-01-10', '2014-01-09', None),
            ('2014-01-11', '2014-01-09', None),
        )

        for probe, lower, upper in cases:
            result = nearest_unequal_elements(index, as_timestamp(probe))
            self.assertEqual(
                result,
                (as_timestamp(lower), as_timestamp(upper)),
            )
test_pandas_utils.py 文件源码 项目:catalyst 作者: enigmampc 项目源码 文件源码 阅读 40 收藏 0 点赞 0 评论 0
def test_nearest_unequal_elements_short_dts(self, tz):
        """Check the (before, after) pair for one-element and empty indexes."""
        def as_timestamp(label):
            # Keep None as-is so it can be compared against a missing side.
            if label is None:
                return None
            return pd.Timestamp(label, tz=tz)

        def check(index, cases):
            # Each case is (probe date, expected before, expected after).
            for probe, lower, upper in cases:
                result = nearest_unequal_elements(index, as_timestamp(probe))
                self.assertEqual(
                    result,
                    (as_timestamp(lower), as_timestamp(upper)),
                )

        # Length 1.
        check(
            pd.to_datetime(['2014-01-01']).tz_localize(tz),
            (
                ('2013-12-31', None, '2014-01-01'),
                ('2014-01-01', None, None),
                ('2014-01-02', '2014-01-01', None),
            ),
        )

        # Length 0: there is never a neighbour on either side.
        check(
            pd.to_datetime([]).tz_localize(tz),
            (
                ('2013-12-31', None, None),
                ('2014-01-01', None, None),
                ('2014-01-02', None, None),
            ),
        )
test_pandas_utils.py 文件源码 项目:catalyst 作者: enigmampc 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_nearest_unequal_bad_input(self):
        """Check the ValueError messages for duplicated and unsorted input."""
        # (raw dates handed to the function, expected error message)
        bad_inputs = (
            (['2014', '2014'], 'dts must be unique'),
            (['2014', '2013'], 'dts must be sorted in increasing order'),
        )

        for raw_dates, message in bad_inputs:
            with self.assertRaises(ValueError) as caught:
                nearest_unequal_elements(
                    pd.to_datetime(raw_dates),
                    pd.Timestamp('2014'),
                )

            self.assertEqual(str(caught.exception), message)
test_quarters_estimates.py 文件源码 项目:catalyst 作者: enigmampc 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def make_expected_out(cls):
        """Build the expected output frame, indexed by ``cls.trading_days``.

        Every raw name in ``cls.columns`` yields two output columns, suffixed
        '1' and '2'. Each output column is cast to its source column's dtype
        before ``cls.fill_expected_out`` fills the frame in.
        """
        suffixes = ('1', '2')

        # Column order: every '1'-suffixed name first, then every '2'.
        output_names = [
            raw_name + suffix
            for suffix in suffixes
            for raw_name in cls.columns.values()
        ]
        expected = pd.DataFrame(columns=output_names, index=cls.trading_days)

        for col, raw_name in cls.columns.items():
            for suffix in suffixes:
                expected_name = raw_name + suffix
                current = expected[expected_name]
                if col.dtype == datetime64ns_dtype:
                    converted = pd.to_datetime(current)
                else:
                    converted = current.astype(col.dtype)
                expected[expected_name] = converted

        cls.fill_expected_out(expected)
        return expected.reindex(cls.trading_days)
test_bundle.py 文件源码 项目:catalyst 作者: enigmampc 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_spot_value(self):
        """Placeholder for a spot-value lookup test; it currently does nothing.

        The whole body is commented out, so this test always passes without
        exercising any code.
        """
        # data_frequency = 'daily'
        # exchange_name = 'poloniex'

        # exchange = get_exchange(exchange_name)
        # exchange_bundle = ExchangeBundle(exchange)
        # assets = [
        #     exchange.get_asset('btc_usdt')
        # ]
        # dt = pd.to_datetime('2017-10-14', utc=True)

        # values = exchange_bundle.get_spot_values(
        #     assets=assets,
        #     field='close',
        #     dt=dt,
        #     data_frequency=data_frequency
        # )
        pass
test_bundle.py 文件源码 项目:catalyst 作者: enigmampc 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_ingest_minute_all(self):
        """Ingest minute data for bitfinex over a fixed window in October 2017.

        No symbols are excluded, and nothing is asserted afterwards: the test
        only checks that the ingest call completes.
        """
        exchange_name = 'bitfinex'

        # A start of '2017-09-01' was also used here at some point.
        window_start = pd.to_datetime('2017-10-01', utc=True)
        window_end = pd.to_datetime('2017-10-05', utc=True)

        bundle = ExchangeBundle(get_exchange(exchange_name))

        log.info('ingesting exchange bundle {}'.format(exchange_name))
        bundle.ingest(
            data_frequency='minute',
            exclude_symbols=None,
            start=window_start,
            end=window_end,
            show_progress=True,
        )
test_bundle.py 文件源码 项目:catalyst 作者: enigmampc 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def main_bundle_to_csv(self):
        """Pass poloniex eth_btc minute data for 2016-05-31..2016-06-01 to
        ``self._bundle_to_csv``.

        The output filename is ``<exchange>_<frequency>_<asset symbol>``.
        """
        exchange_name = 'poloniex'
        data_frequency = 'minute'

        exchange = get_exchange(exchange_name)
        asset = exchange.get_asset('eth_btc')

        period_start = pd.to_datetime('2016-5-31', utc=True)
        period_end = pd.to_datetime('2016-6-1', utc=True)

        target_name = '{}_{}_{}'.format(
            exchange_name, data_frequency, asset.symbol
        )
        self._bundle_to_csv(
            asset=asset,
            exchange_name=exchange.name,
            data_frequency=data_frequency,
            filename=target_name,
            start_dt=period_start,
            end_dt=period_end,
        )
test_bundle.py 文件源码 项目:catalyst 作者: enigmampc 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def test_ingest_csv(self):
        """Ingest a local bittrex bat_eth CSV, then hand the same asset and a
        fixed date range to ``self._bundle_to_csv``.

        The output filename is ``<exchange>_<frequency>_<asset symbol>``.
        """
        data_frequency = 'minute'
        exchange_name = 'bittrex'
        # NOTE(review): absolute path on one developer's machine; the test
        # cannot run anywhere else as written.
        path = '/Users/fredfortier/Dropbox/Enigma/Data/bittrex_bat_eth.csv'

        # NOTE(review): other tests pass an exchange object to ExchangeBundle;
        # here it receives the exchange name -- confirm both are accepted.
        bundle = ExchangeBundle(exchange_name)
        bundle.ingest_csv(path, data_frequency)

        exchange = get_exchange(exchange_name)
        asset = exchange.get_asset('bat_eth')

        period_start = pd.to_datetime('2017-6-3', utc=True)
        period_end = pd.to_datetime('2017-8-3 19:24', utc=True)

        target_name = '{}_{}_{}'.format(
            exchange_name, data_frequency, asset.symbol
        )
        self._bundle_to_csv(
            asset=asset,
            exchange_name=exchange.name,
            data_frequency=data_frequency,
            filename=target_name,
            start_dt=period_start,
            end_dt=period_end,
        )
test_poloniex.py 文件源码 项目:catalyst 作者: enigmampc 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_get_candles(self):
        """Fetch 200 '5T' candles for eth_btc, log the last 25 rows, and pass
        the frame to ``output_df``.

        Nothing is asserted; the results are only logged.
        """
        log.info('retrieving candles')
        asset = self.exchange.get_asset('eth_btc')
        candles = self.exchange.get_candles(
            # end_dt=pd.to_datetime('2017-11-01', utc=True),
            end_dt=None,
            freq='5T',
            assets=asset,
            bar_count=200,
        )

        frame = pd.DataFrame(candles).set_index('last_traded', drop=True)
        log.info(frame.tail(25))

        saved_path = output_df(frame, asset, '5min_candles')
        log.info('saved candles: {}'.format(saved_path))
test_bcolz.py 文件源码 项目:catalyst 作者: enigmampc 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_bcolz_write_daily_present(self):
        """Write generated daily bitfinex data from 2017-01-01 up to 'today'.

        Nothing is asserted; the test only checks that the write completes.
        """
        freq = 'daily'
        first_session = pd.to_datetime('2017-01-01')
        last_session = pd.to_datetime('today')

        frame = self.generate_df('bitfinex', freq, first_session, last_session)

        writer = BcolzExchangeBarWriter(
            rootdir=self.root_dir,
            start_session=first_session,
            end_session=last_session,
            data_frequency=freq,
            write_metadata=True,
        )

        # The writer receives a list of (1, frame) pairs.
        writer.write([(1, frame)])
test_bcolz.py 文件源码 项目:catalyst 作者: enigmampc 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_bcolz_write_minute_past(self):
        """Write generated minute bitfinex data covering April 2015.

        Nothing is asserted; the test only checks that the write completes.
        """
        freq = 'minute'
        first_session = pd.to_datetime('2015-04-01 00:00')
        last_session = pd.to_datetime('2015-04-30 23:59')

        frame = self.generate_df('bitfinex', freq, first_session, last_session)

        writer = BcolzExchangeBarWriter(
            rootdir=self.root_dir,
            start_session=first_session,
            end_session=last_session,
            data_frequency=freq,
            write_metadata=True,
        )

        # The writer receives a list of (1, frame) pairs.
        writer.write([(1, frame)])
test_bcolz.py 文件源码 项目:catalyst 作者: enigmampc 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_bcolz_write_minute_present(self):
        """Write generated minute bitfinex data from 2017-10-01 up to 'today'.

        Nothing is asserted; the test only checks that the write completes.
        """
        freq = 'minute'
        first_session = pd.to_datetime('2017-10-01 00:00')
        last_session = pd.to_datetime('today')

        frame = self.generate_df('bitfinex', freq, first_session, last_session)

        writer = BcolzExchangeBarWriter(
            rootdir=self.root_dir,
            start_session=first_session,
            end_session=last_session,
            data_frequency=freq,
            write_metadata=True,
        )

        # The writer receives a list of (1, frame) pairs.
        writer.write([(1, frame)])
test_data_portal.py 文件源码 项目:catalyst 作者: enigmampc 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def setup(self):
        """Create the live and backtest data portals used by the tests.

        Both portals share the same exchanges, asset finder and 'OPEN'
        calendar; only ``first_trading_day`` differs.
        """
        log.info('creating bitfinex exchange')

        # Arguments common to both portals, built once.
        shared = dict(
            exchanges=get_exchanges(['bitfinex', 'bittrex', 'poloniex']),
            trading_calendar=get_calendar('OPEN'),
            asset_finder=AssetFinderExchange(),
        )

        self.data_portal_live = DataPortalExchangeLive(
            first_trading_day=pd.to_datetime('today', utc=True),
            **shared
        )

        self.data_portal_backtest = DataPortalExchangeBacktest(
            first_trading_day=None,  # will set dynamically based on assets
            **shared
        )
sport_flow.py 文件源码 项目:AlphaPy 作者: ScottFreeLLC 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def get_day_offset(date_vector):
    r"""Compute each date's offset in whole days from the first date.

    Parameters
    ----------
    date_vector : pandas.Series
        The date column. Values may be anything ``pandas.to_datetime``
        accepts; other sequences are wrapped in a Series first.

    Returns
    -------
    day_offset : pandas.Series
        Integer number of days between each date and the first date in
        ``date_vector``, carrying the same index as the input. Empty input
        yields an empty integer Series.

    """
    dv = pd.to_datetime(pd.Series(date_vector))
    if dv.empty:
        # There is no first date to measure from.
        return pd.Series([], index=dv.index, dtype=int)
    # Use positional access: the index need not contain the label 0.
    offsets = dv - dv.iloc[0]
    # ``.dt.days`` replaces ``astype('timedelta64[D]')``, which recent pandas
    # versions reject.
    day_offset = offsets.dt.days.astype(int)
    return day_offset


#
# Function get_series_diff
#
preprocess.py 文件源码 项目:sequence-based-recommendations 作者: rdevooght 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def load_data(filename, columns, separator):
    """Read a delimited file into a dataframe, sorted by time when possible.

    ``columns`` gives one name per leading column of the file. A column 'r'
    filled with 1 is added when 'r' is not among them. When 't' is present it
    is converted to datetimes and the rows are sorted by it.
    """
    print('Load data...')
    data = pd.read_csv(
        filename,
        sep=separator,
        names=list(columns),
        index_col=False,
        usecols=range(len(columns)),
    )

    # Add a column of default ratings when none was supplied.
    if 'r' not in columns:
        data['r'] = 1

    # Without a time column there is nothing to sort on.
    if 't' not in columns:
        return data

    # int64 values are probably epoch timestamps in seconds; anything else is
    # left to pandas' default parsing.
    parse_options = {'unit': 's'} if data['t'].dtype == np.int64 else {}
    data['t'] = pd.to_datetime(data['t'], **parse_options)

    print('Sort data in chronological order...')
    data.sort_values('t', inplace=True)

    return data
move_to_hadoop.py 文件源码 项目:py-hadoop-tutorial 作者: hougs 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def to_pd_dt(filename):
    """Parse the date and hour out of a ``pageviews-YYYYMMDD-HH0000`` name."""
    pageviews_format = 'pageviews-%Y%m%d-%H0000'
    return pd.to_datetime(filename, format=pageviews_format)

# ## Hive Metastore
# Ibis allows us to interrogate the hive metastore. We can determine if
# databases or tables exist by using functions defined directly on the ibis_
# connection.
#
# It is useful for us to determine if a database exists and then create it if
#  it does not.
outliers.py 文件源码 项目:py-hadoop-tutorial 作者: hougs 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def row_to_tuple(row):
    """Turn a row into ``(page_name, (timestamp, n_views))``.

    The timestamp is built from the row's ``year``, ``month``, ``day`` and
    ``hour`` attributes, with minutes and seconds set to zero.
    """
    stamp_text = "{0}-{1}-{2} {3}:00:00".format(
        row.year, row.month, row.day, row.hour
    )
    timestamp = pd.to_datetime(stamp_text)
    return (row.page_name, (timestamp, row.n_views))
asset_writer.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def dt_to_epoch_ns(dt_series):
        """Return the values of ``dt_series`` as int64 on a UTC basis.

        The values are turned into a datetime index, localized to UTC (or
        converted to UTC when localizing raises TypeError), then reinterpreted
        as int64.
        """
        parsed = pd.to_datetime(dt_series.values)
        try:
            utc_index = parsed.tz_localize('UTC')
        except TypeError:
            # Presumably the index is already tz-aware, so convert instead.
            utc_index = parsed.tz_convert('UTC')

        return utc_index.view(np.int64)
core.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def to_series(knowledge_dates, earning_dates):
    """
    Build a Series of datetimes indexed by datetimes.

    ``knowledge_dates`` become the index and ``earning_dates`` the values;
    both are passed through ``pd.to_datetime``. This is just for making the
    test cases more readable.
    """
    index = pd.to_datetime(knowledge_dates)
    values = pd.to_datetime(earning_dates)
    return pd.Series(values, index=index)
minute_bars.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _calc_minute_index(market_opens, minutes_per_day):
    """Build a UTC index of ``minutes_per_day`` minutes for each market open.

    Parameters
    ----------
    market_opens : iterable of pd.Timestamp
        One opening time per day; each element must expose ``.asm8``.
    minutes_per_day : int
        Number of consecutive minutes to emit per day, starting at the open.

    Returns
    -------
    pd.DatetimeIndex
        ``len(market_opens) * minutes_per_day`` UTC timestamps, ordered day
        by day and minute by minute.
    """
    minutes = np.zeros(len(market_opens) * minutes_per_day,
                       dtype='datetime64[ns]')
    deltas = np.arange(0, minutes_per_day, dtype='timedelta64[m]')
    for i, market_open in enumerate(market_opens):
        start = market_open.asm8
        # Each day fills its own contiguous slice of the output.
        start_ix = minutes_per_day * i
        minutes[start_ix:start_ix + minutes_per_day] = start + deltas
    # The former ``box=True`` argument is dropped: the keyword no longer
    # exists in pandas >= 1.0, and an index is returned by default.
    return pd.to_datetime(minutes, utc=True)


问题


面经


文章

微信
公众号

扫码关注公众号