python类DatetimeIndex()的实例源码

trading.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def market_minute_window(self, start, count, step=1):
        """
        Collect `count` market minutes into a DatetimeIndex.

        The window begins at `start` and advances `step` minutes at a time.
        A positive `step` walks forward into following sessions, any other
        non-zero `step` walks backward into preceding sessions.

        Raises
        ------
        ValueError
            If `start` is not within market hours.
        """
        if not self.is_market_hours(start):
            raise ValueError("market_minute_window starting at "
                             "non-market time {minute}".format(minute=start))

        # The first session is entered at `start`; every later session is
        # consumed from its own edge (first minute going forward, last minute
        # going backward, courtesy of the slice step).
        session_minutes = self.market_minutes_for_day(start)
        chunk = session_minutes[session_minutes.searchsorted(start)::step]

        remaining = count
        collected = []
        while True:
            # Drop whatever this session offers beyond what is still needed.
            if len(chunk) >= remaining:
                chunk = chunk[:remaining]

            collected.append(chunk)
            remaining -= len(chunk)
            if remaining <= 0:
                break

            # Move to the adjacent session in the direction of travel.
            if step > 0:
                start, _unused_close = self.next_open_and_close(start)
            else:
                _unused_open, start = self.previous_open_and_close(start)

            chunk = self.market_minutes_for_day(start)[::step]

        # Stitch the per-session pieces together into one UTC index.
        return pd.DatetimeIndex(
            np.concatenate(collected), copy=False, tz='UTC',
        )
tradingcalendar_tse.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def get_early_closes(start, end):
    """
    Return the scheduled early-close dates between `start` and `end`.

    The only rule encoded is December 24th when it falls on a weekday
    (Monday through Friday); the original note states that TSX closed at
    1:00 PM on that day. Both bounds are passed through
    `canonicalize_datetime` and then raised to 1993-01-01 UTC if they are
    earlier. Early closes prior to 1993 and unplanned early closes are not
    included.

    Returns
    -------
    pd.DatetimeIndex
        Matching dates in ascending order; both bounds are inclusive.
    """
    earliest_supported = datetime(1993, 1, 1, tzinfo=pytz.utc)

    start = canonicalize_datetime(start)
    end = canonicalize_datetime(end)
    start = max(start, earliest_supported)
    end = max(end, earliest_supported)

    weekdays = (rrule.MO, rrule.TU, rrule.WE, rrule.TH, rrule.FR)
    christmas_eve = rrule.rrule(
        rrule.MONTHLY,
        bymonth=12,
        bymonthday=24,
        byweekday=weekdays,
        cache=True,
        dtstart=start,
        until=end
    )

    ruleset = rrule.rruleset()
    ruleset.rrule(christmas_eve)

    matches = sorted(ruleset.between(start, end, inc=True))
    return pd.DatetimeIndex(matches)
data.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def current_dates(self):
        """
        Return the dates in `date_buf` from `_start_index` up to (excluding)
        `_pos` as a UTC DatetimeIndex built from a deep copy of that slice.
        """
        window = self.date_buf[self._start_index:self._pos]
        snapshot = deepcopy(window)
        return pd.DatetimeIndex(snapshot, tz='utc')
data.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_current(self):
        """
        Build a Panel over the data currently in view.

        The view runs from `_oldest_frame_idx()` up to (excluding) `_pos`.
        The major axis is a UTC DatetimeIndex made from a deep copy of the
        matching `date_buf` slice, while the values are taken by slicing
        `self.buffer.values` directly. It is not safe to persist the returned
        object because the internal data might change.
        """
        oldest = self._oldest_frame_idx()
        window = slice(oldest, self._pos)

        dates_in_view = deepcopy(self.date_buf[window])
        major_axis = pd.DatetimeIndex(dates_in_view, tz='utc')
        values_in_view = self.buffer.values[:, window, :]

        return pd.Panel(
            values_in_view,
            self.items,
            major_axis,
            self.minor_axis,
            dtype=self.dtype,
        )
data.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def current_dates(self):
        """
        Return the dates in `date_buf` from `_oldest_frame_idx()` up to
        (excluding) `_pos` as a UTC DatetimeIndex built from a deep copy of
        that slice.
        """
        oldest = self._oldest_frame_idx()
        window = self.date_buf[oldest:self._pos]
        snapshot = deepcopy(window)
        return pd.DatetimeIndex(snapshot, tz='utc')
fixtures.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def pipeline_event_loader_args(self, dates):
        """Assemble the positional arguments handed to the loader.

        Parameters
        ----------
        dates : pd.DatetimeIndex
            The dates we can serve.

        Returns
        -------
        args : tuple[any]
            A two-element tuple: `dates` followed by the result of
            ``self.get_dataset()``.
        """
        dataset = self.get_dataset()
        loader_args = (dates, dataset)
        return loader_args
core.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def temp_pipeline_engine(calendar, sids, random_seed, symbols=None):
    """
    Yield a SimplePipelineEngine backed by a temporary asset finder.

    The asset finder comes from `tmp_asset_finder` and is only open while the
    generator is suspended at its single ``yield``. Every column is served by
    one shared loader created with `make_seeded_random_loader`.

    Parameters
    ----------
    calendar : pd.DatetimeIndex
        Calendar to pass to the constructed PipelineEngine. Its first and
        last entries are used as the assets' start and end dates.
    sids : iterable[int]
        Sids to use for the temp asset finder.
    random_seed : int
        Integer used to seed instances of SeededRandomLoader.
    symbols : iterable[str], optional
        Symbols for constructed assets. Forwarded to make_simple_equity_info.
    """
    first_session, last_session = calendar[0], calendar[-1]
    equity_info = make_simple_equity_info(
        sids=sids,
        start_date=first_session,
        end_date=last_session,
        symbols=symbols,
    )

    shared_loader = make_seeded_random_loader(random_seed, calendar, sids)

    def get_loader(column):
        # The same loader is returned regardless of the requested column.
        return shared_loader

    with tmp_asset_finder(equities=equity_info) as finder:
        engine = SimplePipelineEngine(get_loader, calendar, finder)
        yield engine
loader.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def has_data_for_dates(series_or_df, first_date, last_date):
    """
    Does `series_or_df` have data on or before first_date and on or after
    last_date?

    Parameters
    ----------
    series_or_df : pd.Series or pd.DataFrame
        Object whose index is inspected; it must be a DatetimeIndex.
    first_date, last_date
        Bounds compared against the first and last index entries.

    Returns
    -------
    bool
        True when the first index entry is <= `first_date` and the last index
        entry is >= `last_date`. An empty index covers no dates, so False is
        returned in that case.

    Raises
    ------
    TypeError
        If the index is not a DatetimeIndex.
    """
    dts = series_or_df.index
    if not isinstance(dts, pd.DatetimeIndex):
        raise TypeError("Expected a DatetimeIndex, but got %s." % type(dts))
    # Guard the positional lookups below: an empty index has no first/last.
    if dts.empty:
        return False
    first, last = dts[0], dts[-1]
    return (first <= first_date) and (last >= last_date)
loader.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def load_prices_from_csv(filepath, identifier_col, tz='UTC'):
    """
    Read a CSV into a DataFrame indexed by timestamps.

    The column named by `identifier_col` becomes the index, is converted to a
    DatetimeIndex in timezone `tz`, and the rows are returned sorted by that
    index.
    """
    frame = pd.read_csv(filepath, index_col=identifier_col)
    frame.index = pd.DatetimeIndex(frame.index, tz=tz)
    return frame.sort_index()
minute_bars.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def __init__(
        self,
        first_trading_day,
        minute_index,
        market_opens,
        market_closes,
        ohlc_ratio,
):
        """
        Store the supplied values on the instance unchanged.

        Parameters
        ----------
        first_trading_day : datetime-like
            UTC midnight of the first day available in the dataset.
        minute_index : pd.DatetimeIndex
            The minutes which act as an index into the corresponding values
            written into each sid's ctable.
        market_opens : pd.DatetimeIndex
            The market opens for each day in the data set. (Not yet required.)
        market_closes : pd.DatetimeIndex
            The market closes for each day in the data set. (Not yet required.)
        ohlc_ratio : int
            The factor by which the pricing data is multiplied so that the
            float data can be stored as an integer.
        """
        # Calendar information.
        self.first_trading_day = first_trading_day
        self.market_opens = market_opens
        self.market_closes = market_closes

        # Data layout information.
        self.minute_index = minute_index
        self.ohlc_ratio = ohlc_ratio
core.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def overwrite_novel_deltas(baseline, deltas, dates):
    """Fold "novel" deltas into the baseline and return the rest separately.

    Both the timestamp and the asof date of each delta are located in `dates`
    with ``searchsorted`` (timestamp from the right, asof date from the
    left). A delta counts as novel when those two positions differ by at most
    one.

    Parameters
    ----------
    baseline : pd.DataFrame
        The first known values.
    deltas : pd.DataFrame
        Overwrites to the baseline data.
    dates : pd.DatetimeIndex
        The dates requested by the loader.

    Returns
    -------
    new_baseline : pd.DataFrame
        `baseline` concatenated with the novel deltas (index discarded),
        sorted by the timestamp field.
    non_novel_deltas : pd.DataFrame
        The deltas that do not represent a baseline value.
    """
    locate = dates.searchsorted
    ts_positions = locate(deltas[TS_FIELD_NAME].values, 'right')
    asof_positions = locate(deltas[AD_FIELD_NAME].values, 'left')
    is_novel = (ts_positions - asof_positions) <= 1

    combined = pd.concat(
        (baseline, deltas.loc[is_novel]),
        ignore_index=True,
    )
    remaining = deltas.loc[~is_novel]
    return sort_values(combined, TS_FIELD_NAME), remaining
core.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def adjustments_from_deltas_no_sids(dense_dates,
                                    sparse_dates,
                                    column_idx,
                                    column_name,
                                    asset_idx,
                                    deltas):
    """Gather the adjustments for a dataset that has no sid column.

    Every (label, value) pair of ``deltas[column_name]`` produces one entry,
    keyed by the position of the label in `dense_dates`. Each overwrite spans
    assets ``0`` through ``len(asset_idx) - 1``.

    Parameters
    ----------
    dense_dates : pd.DatetimeIndex
        The dates requested by the loader.
    sparse_dates : pd.DatetimeIndex
        The dates that were in the raw data.
    column_idx : int
        The index of the column in the dataset. Not used by this function.
    column_name : str
        The name of the column to compute deltas for.
    asset_idx : pd.Series[int -> int]
        The mapping of sids to their index in the output.
    deltas : pd.DataFrame
        The overwrites that should be applied to the dataset.

    Returns
    -------
    adjustments : dict[idx -> Float64Overwrite]
        The adjustments dictionary to feed to the adjusted array.
    """
    asof_dates = deltas[AD_FIELD_NAME]
    asset_span = (0, len(asset_idx) - 1)

    adjustments = {}
    # NOTE(review): Series.iteritems is kept for the pandas version this code
    # targets; newer pandas releases only offer Series.items.
    for known_date, value in deltas[column_name].iteritems():
        location = dense_dates.get_loc(known_date)
        adjustments[location] = overwrite_from_dates(
            asof_dates.loc[known_date],
            dense_dates,
            sparse_dates,
            asset_span,
            value,
        )
    return adjustments
test_blaze.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_custom_query_time_tz(self):
        """Run a pipeline through a BlazeLoader configured with a custom
        ``data_query_time`` (08:45) and ``data_query_tz`` ('EST') and compare
        the result against a hand-built expected frame.
        """
        df = self.df.copy()
        # Interpret the timestamps as EST, shift them by 8h44m, then store
        # them back as naive UTC values.
        # NOTE(review): presumably the fixture timestamps are midnights, which
        # would place them one minute before the query time; confirm.
        df['timestamp'] = (
            pd.DatetimeIndex(df['timestamp'], tz='EST') +
            timedelta(hours=8, minutes=44)
        ).tz_convert('utc').tz_localize(None)
        # Rows 3:5 are moved to 13:45 naive UTC, i.e. 08:45 EST, which equals
        # the loader's query time configured below.
        df.ix[3:5, 'timestamp'] = pd.Timestamp('2014-01-01 13:45')
        expr = bz.data(df, name='expr', dshape=self.dshape)
        loader = BlazeLoader(data_query_time=time(8, 45), data_query_tz='EST')
        ds = from_blaze(
            expr,
            loader=loader,
            no_deltas_rule=no_deltas_rules.ignore,
            missing_values=self.missing_values,
        )
        p = Pipeline()
        p.add(ds.value.latest, 'value')
        p.add(ds.int_value.latest, 'int_value')
        dates = self.dates

        with tmp_asset_finder() as finder:
            result = SimplePipelineEngine(
                loader,
                dates,
                finder,
            ).run_pipeline(p, dates[0], dates[-1])

        # Expected output: drop the asof_date column, normalize timestamps to
        # midnight and localize them to UTC.
        expected = df.drop('asof_date', axis=1)
        expected['timestamp'] = expected['timestamp'].dt.normalize().astype(
            'datetime64[ns]',
        ).dt.tz_localize('utc')
        # The rows stamped at the query time are expected one day later.
        expected.ix[3:5, 'timestamp'] += timedelta(days=1)
        expected.set_index(['timestamp', 'sid'], inplace=True)
        # Rebuild the index as the full product of timestamps and the assets
        # retrieved from the finder for each sid level value.
        expected.index = pd.MultiIndex.from_product((
            expected.index.levels[0],
            finder.retrieve_all(expected.index.levels[1]),
        ))
        assert_frame_equal(result, expected, check_dtype=False)
test_tradesimulation.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_before_trading_start(self, test_name, num_days, freq,
                                  emission_rate):
        """
        Run BeforeTradingAlgorithm with no source data and check that it ran
        for `num_days` days and that the timestamps it recorded in
        `before_trading_at` equal the simulation's trading days.
        """
        sim_params = factory.create_simulation_parameters(
            num_days=num_days,
            data_frequency=freq,
            emission_rate=emission_rate,
        )

        algo = BeforeTradingAlgorithm(sim_params=sim_params)
        algo.run(source=[], overwrite_sim_params=False)

        self.assertEqual(algo.perf_tracker.day_count, num_days)

        recorded = pd.DatetimeIndex(algo.before_trading_at)
        days_match = sim_params.trading_days.equals(recorded)
        failure_message = (
            "Expected %s but was %s."
            % (sim_params.trading_days, algo.before_trading_at)
        )
        self.assertTrue(days_match, failure_message)
test_marketdata.py 文件源码 项目:ibstract 作者: jesseliu0 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_insert_hist_data(self):
        """Insert two overlapping MarketDataBlocks into the 'Stock' table and
        check that the table contents equal the expected frame stored as the
        third entry of ``testdata_insert_hist_data``.
        """
        # Start from a cleared, freshly initialised database.
        self._clear_db()
        init_db(self.db_info)

        # Insert two time-overlapped MarketDataBlocks
        async def run(loop, data):
            # Open an engine from self.db_info, insert both blocks in order,
            # then close the engine and wait for it to finish closing.
            engine = await aiosa.create_engine(
                user=self.db_info['user'], db=self.db_info['db'],
                host=self.db_info['host'], password=self.db_info['password'],
                loop=loop, echo=False)
            await insert_hist_data(engine, 'Stock', data[0])
            await insert_hist_data(engine, 'Stock', data[1])
            engine.close()
            await engine.wait_closed()

        # Execute insertion
        blk0 = MarketDataBlock(testdata_insert_hist_data[0])
        blk1 = MarketDataBlock(testdata_insert_hist_data[1])
        data = [blk0, blk1]
        loop = asyncio.get_event_loop()
        loop.run_until_complete(run(loop, data))

        # Verify insertion
        df_source = testdata_insert_hist_data[2]
        # Read the whole 'Stock' table back through a separate engine built
        # from self.db_conn.
        engine = create_engine(self.db_conn)
        conn = engine.connect()
        metadata = MetaData(engine, reflect=True)
        table = metadata.tables['Stock']
        result = conn.execute(select([table]))
        # self.assertEqual(result.keys(), list(df_source.columns))
        df = pd.DataFrame(result.fetchall())
        df.columns = result.keys()
        _logger.debug(df.TickerTime[0])
        # Localize the stored TickerTime values to UTC and convert the
        # expected ones to pd.Timestamp before comparing the frames.
        df.TickerTime = pd.DatetimeIndex(df.TickerTime).tz_localize('UTC')
        df_source.TickerTime = df_source.TickerTime.apply(pd.Timestamp)
        _logger.debug(df.iloc[0])
        assert_frame_equal(df, df_source)
aggregation_test.py 文件源码 项目:rdtools 作者: NREL 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def setUp(self):
        """Build two days of 12-hourly insolation and energy series and
        aggregate them at daily frequency with `aggregation_insol`.
        """
        # pd.date_range is used because newer pandas releases no longer
        # accept start/end/freq in the DatetimeIndex constructor.
        ind = pd.date_range(start='2015-01-01', end='2015-01-02 23:59', freq='12h')

        self.insol = pd.Series(data=[500, 1000, 500, 1000], index=ind)
        self.energy = pd.Series(data=[1.0, 4, 1.0, 4], index=ind)

        self.aggregated = aggregation_insol(self.energy, self.insol, frequency='D')

    # Test for the expected energy weighted result
sequence_gen.py 文件源码 项目:johnson-county-ddj-public 作者: dssg 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def get_series(user, freq='1M'):
    """
    Resample a user's events into one string per `freq` bucket.

    Rows are ordered by ``begin_date``, the ``event`` values are indexed by
    those dates and summed per bucket, the sums are converted to strings and
    every ``0`` character is stripped from them.
    """
    ordered = user.sort_values(by='begin_date')

    events = pd.Series(list(ordered['event']), index=ordered['begin_date'])
    events.index = pd.DatetimeIndex(events.index)

    bucket_totals = events.resample(freq).sum()
    as_text = bucket_totals.astype('str')
    return as_text.replace(r'0', "", regex=True)
user_timeline.py 文件源码 项目:johnson-county-ddj-public 作者: dssg 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def get_series(self, freq='1M'):
        """
        Resample this user's events into one string per `freq` bucket.

        The ``event`` values of ``self.user`` are indexed by ``begin_date``
        and summed per day. The daily series is then laid over the fixed
        range 2010-01 to 2016-06 (days without data become 0), converted to
        strings and summed again per `freq` bucket, which concatenates the
        daily strings. Finally every ``0`` character is removed.

        Returns
        -------
        pd.Series
            One string per `freq` bucket over the fixed date range.
        """
        user_df = self.user.sort_values(by='begin_date')
        user_series = pd.Series(list(user_df['event']), index=user_df['begin_date'])
        user_series.index = pd.DatetimeIndex(user_series.index)

        daily = user_series.resample('1D').sum()
        full_range = pd.date_range(start='2010-01', end='2016-06', freq='1D')
        daily = daily.reindex(full_range, fill_value=0)

        # Summing strings concatenates them, giving one token string per bucket.
        user_resample = daily.astype('str').resample(freq).sum()

        # Strip the placeholder zeros in one vectorised pass; this avoids
        # positional `series[i]` access on a datetime-indexed Series.
        return user_resample.str.replace('0', '')
data_loading.py 文件源码 项目:copper_price_forecast 作者: liyinwei 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def read_co_data_rnn():
    """
    Run the ``const.CO_PRICE_SQL_RNN`` query through `read_data_from_mysql`
    and return the result indexed by its ``price_date`` column.

    The ``price_date`` column itself is kept in the returned frame.
    """
    print("start loading data...")
    frame = read_data_from_mysql(const.CO_PRICE_SQL_RNN)
    frame.index = pd.DatetimeIndex(frame.price_date)
    print("loading data finished.")
    return frame
data_loading.py 文件源码 项目:copper_price_forecast 作者: liyinwei 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def read_co_price():
    """
    Run the ``const.CO_PRICE_SQL`` query through `read_data_from_mysql` and
    return the result indexed by ``price_date``.

    The ``price_date`` column is dropped from the returned frame after it has
    been used to build the index.
    """
    print("start loading data...")
    frame = read_data_from_mysql(const.CO_PRICE_SQL)
    frame.index = pd.DatetimeIndex(frame.price_date)
    print("loading data finished.")
    without_date_column = frame.drop(['price_date'], axis=1)
    return without_date_column


问题


面经


文章

微信
公众号

扫码关注公众号