python类date_range()的实例源码

process_hdf.py 文件源码 项目:pyrsss 作者: butala 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def fill_nans(df, delta=None):
    """
    Reindex *df* onto a regular time grid, inserting NaN rows for gaps.

    The grid runs from the first to the last entry of ``df.index`` in
    steps of *delta*. When *delta* is falsy (the default ``None``), the
    smallest difference between consecutive index values is used
    instead. Every grid time that is absent from ``df.index`` is logged
    as a warning.

    Returns the tuple ``(reindexed frame, delta)``.
    """
    if not delta:
        # Infer the step from the tightest spacing found in the index.
        smallest_gap = min(NP.diff(df.index.values))
        delta = timedelta(seconds=smallest_gap / NP.timedelta64(1, 's'))
    logger.info('Using delta = {} (s)'.format(delta.total_seconds()))
    first, last = df.index[0], df.index[-1]
    index_new = PD.date_range(start=first, end=last, freq=delta)
    present = set(df.index)
    missing = sorted({stamp for stamp in index_new if stamp not in present})
    if missing:
        logger.warning('Missing time indices (filled by NaNs):')
        for stamp in missing:
            logger.warning(stamp)
    return df.reindex(index_new, copy=False), delta
test_sources.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_nan_filter_dataframe(self):
        """Check which events ``DataFrameSource`` emits when prices are NaN.

        A 2x2 random frame (two business days, sids 4 and 5) gets one NaN
        per sid; the source is then expected to yield sids 5, 4, 5 and the
        last event must carry a non-NaN price.
        """
        index = pd.date_range('1/1/2000', periods=2, freq='B', tz='UTC')
        frame = pd.DataFrame(np.random.randn(2, 2),
                             index=index,
                             columns=[4, 5])
        # should be filtered
        frame.loc[index[0], 4] = np.nan
        # should not be filtered, should have been ffilled
        frame.loc[index[1], 5] = np.nan
        events = DataFrameSource(frame)
        for expected_sid in (5, 4, 5):
            event = next(events)
            self.assertEqual(expected_sid, event.sid)
        # the final event is sid 5 on the day whose value was set to NaN
        self.assertFalse(np.isnan(event.price))
test_sources.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_nan_filter_panel(self):
        """Check which events ``DataPanelSource`` emits when prices are NaN.

        A random panel (items 4 and 5, two business days, price/volume)
        gets one NaN price per item; the source is expected to yield
        sids 5 then 4 and then be exhausted.
        """
        index = pd.date_range('1/1/2000', periods=2, freq='B', tz='UTC')
        panel = pd.Panel(np.random.randn(2, 2, 2),
                         major_axis=index,
                         items=[4, 5],
                         minor_axis=['price', 'volume'])
        # should be filtered
        panel.loc[4, index[0], 'price'] = np.nan
        # should not be filtered, should have been ffilled
        panel.loc[5, index[1], 'price'] = np.nan
        events = DataPanelSource(panel)
        for expected_sid in (5, 4):
            event = next(events)
            self.assertEqual(expected_sid, event.sid)
        self.assertRaises(StopIteration, next, events)
future.py 文件源码 项目:slaveo 作者: lamter 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def getCalendar(self):
        """
        Build the calendar frame for the span ``self.begin`` .. ``self.end``.

        :return: DataFrame indexed by date (the ``date`` column is kept as
            well) with ``type`` and ``weekday`` columns, after it has been
            passed through ``_holiday_trade_day_type`` and ``_tradestatus``.
        """
        # One row per calendar day in the requested span.
        days = pd.date_range(self.begin, self.end)
        cal = pd.DataFrame(data=days, columns=['date'])

        # Day type and weekday number for every date, as computed by the
        # weekend helper.
        day_types, weekday_numbers = self._weekend_trade_day_type(cal["date"])
        cal["type"] = day_types
        cal["weekday"] = weekday_numbers
        # Shift the helper's weekday numbers up by one.
        cal["weekday"] += 1
        cal = cal.set_index("date", drop=False)

        # Let the holiday helper rework the frame, then the trade-status
        # helper; the latter's result is what callers receive.
        with_holidays = self._holiday_trade_day_type(cal)
        return self._tradestatus(with_holidays)
make_features.py 文件源码 项目:tianchi_power 作者: lvniqi 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def make_features(user_id,user_df):
    """
    Write the 30 per-day lag-feature CSV files for one user.

    ``user_df`` must have exactly 639 rows and a ``power_consumption``
    column. Sixty lag columns ``power#-1`` .. ``power#-60`` are built on
    an index that extends ``user_df.index`` with 2016-09-01..2016-09-30;
    column ``power#-k`` holds the consumption series with its index moved
    forward by ``k`` days. For each ``d`` in 0..29 the columns
    ``power#-(d+1)`` .. ``power#-(d+30)`` plus a ``y`` column (the
    unshifted consumption) are written to
    ``./features/day_model/<d+1>/<user_id>.csv``.

    Nothing is returned.
    """
    # Single-argument print call: same output on Python 2 and Python 3.
    print('user_id: %s' % (user_id,))
    power = user_df.power_consumption
    assert power.index[0] == user_df.index[0]
    assert len(user_df.index) == 639
    new_df = pd.DataFrame(index=user_df.index.union(pd.date_range('2016-9-1','2016-9-30')))
    pw_new = power.copy()
    # 60 lag columns: each pass moves the copy's index one more day forward
    for d in range(60):
        pw_new.index += pd.Timedelta('1D')
        new_df['power#-%d'%(d+1)] = pw_new
    # one CSV per model, each using a 30-column window of the lag columns
    for d in range(30):
        # explicit copy so that adding 'y' never touches (or warns about) new_df
        x_ = new_df[new_df.columns[d:30+d]].copy()
        x_['y'] = power
        x_.to_csv('./features/day_model/%d/%d.csv'%(d+1,user_id))

    #return x_
make_features.py 文件源码 项目:tianchi_power 作者: lvniqi 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def make_month_features(user_id,user_df):
    """
    Build, save and return the month-model feature frame for one user.

    The frame's index extends ``user_df.index`` with
    2016-10-01..2016-10-31. It holds 30 lag columns ``power#-1`` ..
    ``power#-30`` (consumption with its index moved forward by that many
    days) and 31 target columns ``y#0`` .. ``y#30`` (consumption with its
    index moved back by that many days). The frame is passed to
    ``save_month_df`` together with ``user_id`` and then returned.
    """
    # Single-argument print call: same output on Python 2 and Python 3.
    print('user_id: %s' % (user_id,))
    # Work on a copy: the index of this series is shifted in the second loop.
    power = user_df.power_consumption.copy()
    assert power.index[0] == user_df.index[0]
    new_df = pd.DataFrame(index=user_df.index.union(pd.date_range('2016-10-1','2016-10-31')))
    pw_new = power.copy()
    # 30 lag columns: each pass moves the copy's index one more day forward
    for d in range(30):
        pw_new.index += pd.Timedelta('1D')
        new_df['power#-%d'%(d+1)] = pw_new
    # 31 target columns: y#d is stored first, then the index steps back a day
    for d in range(31):
        new_df['y#%d'%d] = power
        power.index -= pd.Timedelta('1D')
    save_month_df(new_df,user_id)
    return new_df
renpass_gis_main.py 文件源码 项目:renpass_gis 作者: znes 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def create_energysystem(nodes, **arguments):
    """Build an ``EnergySystem`` over a 60-minute time index.

    Parameters
    ----------
    nodes:
        Entities handed to ``EnergySystem`` as ``entities``
    **arguments : key word arguments
        Must contain ``'--date-from'`` and ``'--date-to'``, the two
        endpoints of the time index

    Returns
    -------
    The ``EnergySystem`` created from `nodes`, ``GROUPINGS`` and the
    time index.
    """
    first = arguments['--date-from']
    last = arguments['--date-to']
    hourly_index = pd.date_range(first, last, freq='60min')

    return EnergySystem(entities=nodes,
                        groupings=GROUPINGS,
                        timeindex=hourly_index)
test_data.py 文件源码 项目:psyplot 作者: Chilipp 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def _from_dataset_test_variables(self):
        """Return the ``(variables, coords)`` dicts for the from_dataset tests

        Every variable is zero-filled with length 4 along each of its
        dimensions."""
        dims_by_name = [
            # 3d-variable
            ('v0', ('time', 'ydim', 'xdim')),
            # 2d-variable with time and x
            ('v1', ('time', 'xdim', )),
            # 2d-variable with y and x
            ('v2', ('ydim', 'xdim', )),
            # 1d-variable
            ('v3', ('xdim', )),
        ]
        variables = {}
        for name, dims in dims_by_name:
            variables[name] = xr.Variable(dims, np.zeros((4, ) * len(dims)))

        time_values = pd.date_range(
            '1999-01-01', '1999-05-01', freq='M').values
        coords = dict(
            ydim=xr.Variable(('ydim', ), np.arange(1, 5)),
            xdim=xr.Variable(('xdim', ), np.arange(4)),
            time=xr.Variable(('time', ), time_values))
        return variables, coords
map_camps_timehistory.py 文件源码 项目:Visualflee 作者: cspgdds 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def make_features(locations_file='blocations.csv',
                  timeseries_file='burundioutput.csv',
                  startdate='2015-05-01'):
    """Collect the features ``mgj.make_gj_points`` yields for every location.

    Both CSV files are read with pandas. The rows of the time series are
    re-indexed with consecutive daily dates beginning at ``startdate``.
    For each row of the locations file a two-column frame (``loctype``,
    ``population``) is assembled from ``get_loctype`` and
    ``get_population`` and handed to ``mgj.make_gj_points``; everything
    it returns is gathered into one flat list, which is returned.
    """
    locations = pd.read_csv(locations_file)
    timeseries = pd.read_csv(timeseries_file)
    # Construct an index with real dates rather than day numbers
    timeseries.index = pd.date_range(startdate,
                                     periods=timeseries.shape[0])

    features = []
    for loc in locations.itertuples(name='Location'):
        position = (loc.latitude, loc.longitude)
        per_day = pd.DataFrame({
            'loctype': get_loctype(loc, timeseries.index),
            'population': get_population(timeseries, loc.name),
        })
        features += mgj.make_gj_points(position, loc.name, per_day)
    return features
test_make_geojson.py 文件源码 项目:Visualflee 作者: cspgdds 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_make_gj_points():
    """Exercise ``make_geojson.make_gj_points`` on a 100-day series.

    The first 50 days are of type 'city', the last 50 'conflict', and the
    population grows by 500 per day.
    """
    days = pandas.date_range('2015-3-1', periods=100)
    loctype = pandas.Series(['city'] * 50 + ['conflict'] * 50, index=days)
    popn = pandas.Series([500 * i for i in range(100)], index=days)
    frame = pandas.DataFrame({'loctype': loctype, 'population': popn})

    features = make_geojson.make_gj_points((52.0, 0.0), 'Examplecamp', frame)

    assert len(features) == 100

    first = features[0]
    assert first['type'] == 'Feature'
    assert first['properties']['start'] == '2015-03-01'
    assert first['properties']['end'] == '2015-03-02'
    assert first['properties']['loctype'] == 'city'
    # the (52.0, 0.0) input pair comes back swapped in the geometry
    assert first['geometry']['coordinates'] == (0.0, 52.0)

    halfway = features[50]
    assert halfway['properties']['loctype'] == 'conflict'
    assert halfway['properties']['start'] == '2015-04-20'
Weather.py 文件源码 项目:astk 作者: openalea-incubator 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def date_range_index(self, start, end=None, by=24):
        """ Return hourly time stamps, in UTC, for indexing one or several
        time intervals of 'by' hours.

        start and end are expected in local time (``self.timezone.zone``).

        If end is None, a single index of 'by' hourly stamps beginning at
        start is returned. Otherwise a list is returned with one index per
        complete 'by'-hour interval between start and end; each index
        holds the hourly stamps t with ``bin_start <= t < next_bin_start``.
        A trailing interval shorter than 'by' hours is not included.
        """
        # Offset objects instead of the 'H' string alias, which recent
        # pandas releases deprecate.
        hourly = pandas.offsets.Hour()
        zone = self.timezone.zone
        if end is None:
            seq = pandas.date_range(start=start, periods=by, freq=hourly,
                                    tz=zone)
            return seq.tz_convert('UTC')

        seq = pandas.date_range(start=start, end=end, freq=hourly, tz=zone)
        seq = seq.tz_convert('UTC')
        # hourly * by is the offset the former str(by) + 'H' alias described
        bins = pandas.date_range(start=start, end=end, freq=hourly * by,
                                 tz=zone)
        bins = bins.tz_convert('UTC')
        return [seq[(seq >= lower) & (seq < upper)]
                for lower, upper in zip(bins[:-1], bins[1:])]
bdew.py 文件源码 项目:demandlib 作者: oemof 项目源码 文件源码 阅读 437 收藏 0 点赞 0 评论 0
def __init__(self, year, seasons=None, holidays=None):
        """Set up the quarter-hourly index and load profiles for *year*.

        Parameters
        ----------
        year : int
            Calendar year; decides between 8784 and 8760 hours.
        seasons : dict, optional
            Mapping of a season name to a 4-item list. In the built-in
            default each list reads
            ``[start month, start day, end month, end day]``.
        holidays : optional
            Passed through unchanged to ``self.all_load_profiles``.
        """
        # hours of the year
        hoy = 8784 if calendar.isleap(year) else 8760
        self.datapath = os.path.join(os.path.dirname(__file__), 'bdew_data')
        # pd.Timestamp replaces pd.datetime, an alias that pandas removed
        self.date_time_index = pd.date_range(
            pd.Timestamp(year, 1, 1), periods=hoy * 4, freq='15Min')
        if seasons is None:
            self.seasons = {
                'summer1': [5, 15, 9, 14],  # summer: 15.05. to 14.09
                'transition1': [3, 21, 5, 14],  # transition1 :21.03. to 14.05
                'transition2': [9, 15, 10, 31],  # transition2 :15.09. to 31.10
                'winter1': [1, 1, 3, 20],  # winter1:  01.01. to 20.03
                'winter2': [11, 1, 12, 31],  # winter2: 01.11. to 31.12
            }
        else:
            self.seasons = seasons
        self.year = year
        # seasons and year are assigned before this call is made
        self.slp_frame = self.all_load_profiles(self.date_time_index,
                                                holidays=holidays)
pandas_book.py 文件源码 项目:base_function 作者: Rockyzsu 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def date_op():
    """Demonstrate date ranges and date-based row selection in pandas.

    Everything is printed; nothing is returned. ``format_line()`` is
    called between the two halves of the demo.
    """
    start = pd.date_range('2015-01-01', periods=50)
    # Single-argument print calls give the same output on Python 2 and 3.
    print(type(start))

    date_list = [datetime.datetime(2017, 1, 1), datetime.datetime(2017, 1, 2), datetime.datetime(2017, 1, 3),
                 datetime.datetime(2017, 1, 4)]
    df = pd.DataFrame(np.random.randn(4), index=date_list)
    print(df)
    print(df.index[2])
    format_line()

    s_x = pd.date_range('2000-1-1', periods=1000)
    df_x = pd.DataFrame(np.arange(2000).reshape(1000, 2), index=s_x)
    print(df_x)
    # .loc replaces the .ix indexer, which pandas no longer provides
    print(df_x.loc['2002/09/24'])
    # plain [] with 1 selects the column labelled 1
    print(df_x[1])
    # a year-month string selects every row of that month
    print(df_x.loc['2001-09'])
convert_to_timeseries.py 文件源码 项目:Python-Machine-Learning-Cookbook 作者: PacktPublishing 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def convert_data_to_timeseries(input_file, column, verbose=False):
    """Load comma-separated monthly data and return one column as a Series.

    The first two columns of the input are read as year and month. The
    returned Series holds ``data[:, column]`` indexed by month-end dates
    running from the month of the first row to the month of the last
    row, so the rows are expected to be consecutive months.

    Parameters
    ----------
    input_file : anything ``np.loadtxt`` accepts (path, file object, ...)
    column : int, index of the data column to extract
    verbose : bool, print the date span and the first ten entries

    Raises
    ------
    ValueError
        If the number of rows differs from the number of months spanned.
    """
    # Load the input file
    data = np.loadtxt(input_file, delimiter=',')

    # Extract the start and end dates
    start_date = str(int(data[0, 0])) + '-' + str(int(data[0, 1]))
    last_year, last_month = int(data[-1, 0]), int(data[-1, 1])
    # The end bound is the month after the last row; the year only rolls
    # over when that row is December. (Adding a year unconditionally
    # produced too many dates for data ending in any other month.)
    end_year = last_year + 1 if last_month == 12 else last_year
    end_date = str(end_year) + '-' + str(last_month % 12 + 1)

    if verbose:
        # single-argument print calls: same output on Python 2 and 3
        print("\nStart date = %s" % start_date)
        print("End date = %s" % end_date)

    # Create a date sequence with monthly intervals (an offset object is
    # used because the 'M' string alias is deprecated in recent pandas)
    dates = pd.date_range(start_date, end_date, freq=pd.offsets.MonthEnd())

    # Convert the data into time series data
    data_timeseries = pd.Series(data[:, column], index=dates)

    if verbose:
        print("\nTime series data:\n%s" % (data_timeseries[:10],))

    return data_timeseries
clients.py 文件源码 项目:eemeter 作者: openeemeter 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def get_gsod_data(self, station, year):
        """Return a daily UTC series of temperatures for one station-year.

        The lines come from ``self._retreive_file_lines`` for the path
        template ``/pub/data/gsod/{year}/{station}-{year}.op.gz``. The
        series covers 1 Jan to 31 Dec of ``year``; days with no line in
        the file stay NaN. The fourth whitespace-separated field of a
        line is converted from Fahrenheit to Celsius and stored under the
        date parsed from the third field.
        """
        lines = self._retreive_file_lines(
            '/pub/data/gsod/{year}/{station}-{year}.op.gz', station, year)

        first_day = "{}-01-01 00:00".format(year)
        last_day = "{}-12-31 00:00".format(year)
        days = pd.date_range(first_day, last_day, freq='D', tz=pytz.UTC)
        series = pd.Series(None, index=days, dtype=float)

        # the first line is skipped (presumably a header - confirm)
        for line in lines[1:]:
            fields = line.split()
            day_text = fields[2].decode('utf-8')
            fahrenheit = float(fields[3])
            day = datetime.strptime(day_text, "%Y%m%d")
            series[pytz.UTC.localize(day)] = (5. / 9.) * (fahrenheit - 32.)

        return series
clients.py 文件源码 项目:eemeter 作者: openeemeter 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def get_isd_data(self, station, year):
        """Return an hourly UTC series of temperatures for one station-year.

        The lines come from ``self._retreive_file_lines`` for the path
        template ``/pub/data/noaa/{year}/{station}-{year}.gz``. Characters
        87..91 of a line hold the reading (``+9999`` is stored as NaN,
        anything else is divided by 10) and characters 15..26 hold the
        ``YYYYmmddHHMM`` time stamp. Hours with no reading stay NaN.
        """
        filename_format = '/pub/data/noaa/{year}/{station}-{year}.gz'
        lines = self._retreive_file_lines(filename_format, station, year)

        # NOTE(review): the index runs to 31 Dec of year + 1, i.e. two
        # calendar years - confirm this is intended.
        # An offset object replaces the 'H' alias, which recent pandas
        # releases deprecate.
        dates = pd.date_range("{}-01-01 00:00".format(year),
                              "{}-12-31 23:00".format(int(year) + 1),
                              freq=pd.offsets.Hour(), tz=pytz.UTC)
        series = pd.Series(None, index=dates, dtype=float)

        for line in lines:
            if line[87:92].decode('utf-8') == "+9999":
                temp_C = float("nan")
            else:
                temp_C = float(line[87:92]) / 10.
            date_str = line[15:27].decode('utf-8')

            # there can be multiple readings per hour, so set all to minute 0
            dt = pytz.UTC.localize(datetime.strptime(date_str, "%Y%m%d%H%M")).replace(minute=0)

            # only set the temp if the hour is still empty; a NaN reading
            # leaves it empty, so a later reading in that hour can fill it.
            # (.loc replaces .ix, which pandas no longer provides.)
            if pd.isnull(series.loc[dt]):
                series[dt] = temp_C

        return series
test_arbitrary_start_serializer.py 文件源码 项目:eemeter 作者: openeemeter 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_to_records(serializer):
    """``to_records`` on a two-day frame yields two start-keyed records.

    Each record is expected to carry the value and estimated flag of its
    own row, with a UTC ``start`` equal to the row's date.
    """
    index = pd.date_range('2000-01-01', periods=2, freq='D')
    df = pd.DataFrame(
        {"value": [1, np.nan], "estimated": [True, False]},
        index=index,
        columns=["value", "estimated"])

    records = serializer.to_records(df)
    assert len(records) == 2

    first = records[0]
    assert first["start"] == datetime(2000, 1, 1, tzinfo=pytz.UTC)
    assert first["value"] == 1
    assert first["estimated"]

    second = records[1]
    assert second["start"] == datetime(2000, 1, 2, tzinfo=pytz.UTC)
    assert pd.isnull(second["value"])
    assert not second["estimated"]
test_arbitrary_end_serializer.py 文件源码 项目:eemeter 作者: openeemeter 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def test_to_records(serializer):
    """``to_records`` on a two-day frame yields two end-keyed records.

    The record ending on the first date is expected to be empty (null
    value, not estimated); the record ending on the second date carries
    the first row's value and estimated flag.
    """
    index = pd.date_range('2000-01-01', periods=2, freq='D')
    df = pd.DataFrame(
        {"value": [1, np.nan], "estimated": [True, False]},
        index=index,
        columns=["value", "estimated"])

    records = serializer.to_records(df)
    assert len(records) == 2

    first = records[0]
    assert first["end"] == datetime(2000, 1, 1, tzinfo=pytz.UTC)
    assert pd.isnull(first["value"])
    assert not first["estimated"]

    second = records[1]
    assert second["end"] == datetime(2000, 1, 2, tzinfo=pytz.UTC)
    assert second["value"] == 1
    assert second["estimated"]
test_energy_efficiency_meter.py 文件源码 项目:eemeter 作者: openeemeter 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def meter_input_daily(project_meter_input):
    """Meter input with 365 * 4 daily records of value 1.0 from 2012-01-01.

    The records are wrapped by ``_natural_gas_input`` and the resulting
    trace is tagged with ``interval = 'daily'``.
    """
    starts = pd.date_range(
        '2012-01-01', periods=365 * 4, freq='D', tz=pytz.UTC)

    records = []
    for stamp in starts:
        records.append({
            "start": stamp.isoformat(),
            "value": 1.0,
            "estimated": False
        })

    trace = _natural_gas_input(records)
    trace.update({'interval': 'daily'})

    return {
        "type": "SINGLE_TRACE_SIMPLE_PROJECT",
        "trace": trace,
        "project": project_meter_input,
    }
test_energy_efficiency_meter.py 文件源码 项目:eemeter 作者: openeemeter 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def meter_input_hourly(project_meter_input):
    """Meter input with 365 * 4 * 24 hourly records from 2012-01-01.

    Each record's value is 1.0 plus the hour of day of its start. The
    records are wrapped by ``_natural_gas_input`` and the resulting trace
    is tagged with ``interval = 'hourly'``.
    """
    starts = pd.date_range(
        '2012-01-01', periods=365 * 4 * 24, freq='H', tz=pytz.UTC)

    records = []
    for stamp in starts:
        records.append({
            "start": stamp.isoformat(),
            "value": 1.0 + stamp.hour,
            "estimated": False
        })

    trace = _natural_gas_input(records)
    trace.update({'interval': 'hourly'})

    return {
        "type": "SINGLE_TRACE_SIMPLE_PROJECT",
        "trace": trace,
        "project": project_meter_input,
    }
test_energy_efficiency_meter.py 文件源码 项目:eemeter 作者: openeemeter 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def meter_input_daily_baseline_only(project_meter_input):
    """Meter input with 365 daily records of value 1.0 from 2012-01-01.

    The records are wrapped by ``_natural_gas_input``; no interval tag
    is added to the trace.
    """
    starts = pd.date_range(
        '2012-01-01', periods=365 * 1, freq='D', tz=pytz.UTC)

    records = []
    for stamp in starts:
        records.append({
            "start": stamp.isoformat(),
            "value": 1.0,
            "estimated": False
        })

    return {
        "type": "SINGLE_TRACE_SIMPLE_PROJECT",
        "trace": _natural_gas_input(records),
        "project": project_meter_input,
    }
test_energy_efficiency_meter.py 文件源码 项目:eemeter 作者: openeemeter 项目源码 文件源码 阅读 59 收藏 0 点赞 0 评论 0
def meter_input_daily_reporting_only(project_meter_input):
    """Meter input with 365 daily records of value 1.0 from 2014-02-01.

    The records are wrapped by ``_natural_gas_input``; no interval tag
    is added to the trace.
    """
    starts = pd.date_range(
        '2014-02-01', periods=365 * 1, freq='D', tz=pytz.UTC)

    records = []
    for stamp in starts:
        records.append({
            "start": stamp.isoformat(),
            "value": 1.0,
            "estimated": False
        })

    return {
        "type": "SINGLE_TRACE_SIMPLE_PROJECT",
        "trace": _natural_gas_input(records),
        "project": project_meter_input,
    }
test_energy_efficiency_meter.py 文件源码 项目:eemeter 作者: openeemeter 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def meter_input_daily_with_period_start_end(
        project_meter_input_with_period_start_end):
    """Daily meter input paired with the period-start/end project input.

    Holds 365 * 4 daily records of value 1.0 from 2012-01-01, wrapped by
    ``_natural_gas_input`` and tagged with ``interval = 'daily'``.
    """
    starts = pd.date_range(
        '2012-01-01', periods=365 * 4, freq='D', tz=pytz.UTC)

    records = []
    for stamp in starts:
        records.append({
            "start": stamp.isoformat(),
            "value": 1.0,
            "estimated": False
        })

    trace = _natural_gas_input(records)
    trace.update({'interval': 'daily'})

    return {
        "type": "SINGLE_TRACE_SIMPLE_PROJECT",
        "trace": trace,
        "project": project_meter_input_with_period_start_end,
    }
test_energy_efficiency_meter.py 文件源码 项目:eemeter 作者: openeemeter 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def meter_input_strange_interpretation(project_meter_input):
    """Meter input whose trace pairs an electricity interpretation with therms.

    Holds 365 * 4 daily records of value 1.0 from 2012-01-01 in an
    ``ARBITRARY_START`` trace that is written out inline.
    """
    starts = pd.date_range(
        '2012-01-01', periods=365 * 4, freq='D', tz=pytz.UTC)

    records = []
    for stamp in starts:
        records.append({
            "start": stamp.isoformat(),
            "value": 1.0,
            "estimated": False
        })

    trace = {
        "type": "ARBITRARY_START",
        "interpretation": "ELECTRICITY_CONSUMPTION_NET",
        "unit": "therm",
        "records": records
    }
    return {
        "type": "SINGLE_TRACE_SIMPLE_PROJECT",
        "trace": trace,
        "project": project_meter_input
    }
test_model_data_billing_formatter.py 文件源码 项目:eemeter 作者: openeemeter 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def trace4():
    """Return an ``EnergyTrace`` of 100 daily UTC rows starting 2011-01-01.

    Every row has value 1 and estimated False; the trace is created with
    interpretation ``ELECTRICITY_CONSUMPTION_SUPPLIED`` and unit ``KWH``.
    """
    n_rows = 100
    data = {
        "value": [1] * n_rows,
        "estimated": [False] * n_rows
    }
    first_day = datetime(2011, 1, 1, tzinfo=pytz.UTC)
    index = pd.date_range(start=first_day, periods=n_rows, freq='D',
                          tz=pytz.UTC)
    frame = pd.DataFrame(data, index=index, columns=["value", "estimated"])
    return EnergyTrace("ELECTRICITY_CONSUMPTION_SUPPLIED", frame, unit="KWH")
monitor.py 文件源码 项目:ModelFlow 作者: yuezPrincetechs 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def parse_raw(filepath,seconds=1):
    '''
    Parse the whitespace-delimited raw file at filepath into a numeric frame.
    :param filepath: location of the raw file, handed to pd.read_csv
    :param seconds: int, spacing in seconds between consecutive rows of the result
    :return: dataframe whose index is a regular datetime range named 'datetime' and whose columns are named after the file's column-name row
    '''
    # first line only: its 4th field supplies the start date
    head=pd.read_csv(filepath,delim_whitespace=True,header=None,nrows=1)
    # everything after the first two lines: row 0 holds the column names, rows 1.. the readings
    body=pd.read_csv(filepath,delim_whitespace=True,header=None,skiprows=2)
    start_day=head.iloc[0,3]
    start_clock=body.iloc[1,0]+' '+body.iloc[1,1]
    start=pd.to_datetime(start_day+' '+start_clock)
    names=list(body.iloc[0,2:])
    # the first two columns are the time fields; the rest are the readings
    values=body.iloc[1:,2:].applymap(convert2float)
    values=values.dropna(axis=0,how='any')
    # drop the rows in which every cell is still a string after conversion
    str_cells_per_row=(values.applymap(type)==str).sum(axis=1)
    values=values.loc[str_cells_per_row<values.shape[1]]
    values=values.applymap(convert2float)
    values.columns=names
    # the time stamps are regenerated from the start time, not read row by row
    values.index=pd.date_range(start=start,periods=values.shape[0],freq='%dS'%seconds)
    values.index.name='datetime'
    return values
test_utils.py 文件源码 项目:pandas_market_calendars 作者: rsheftel 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_date_range_lower_freq():
    """``mcal.date_range`` rejects '3D'; ``mcal.convert_freq`` handles it.

    The 1D range of an NYSE schedule, converted to '3D' and to '1W', must
    equal the plain pandas date range of the same span and frequency.
    """
    first, last = '2017-09-05 20:00', '2017-10-23 20:00'
    cal = mcal.get_calendar("NYSE")
    schedule = cal.schedule(pd.Timestamp(first, tz='UTC'), pd.Timestamp(last, tz='UTC'))

    # cannot get date range of frequency lower than 1D
    with pytest.raises(ValueError):
        mcal.date_range(schedule, frequency='3D')

    # instead get for 1D and convert to lower frequency
    daily = mcal.date_range(schedule, frequency='1D')
    for freq in ('3D', '1W'):
        actual = mcal.convert_freq(daily, freq)
        expected = pd.date_range(first, last, freq=freq, tz='UTC')
        assert_index_equal(actual, expected)
bundle_utils.py 文件源码 项目:catalyst 作者: enigmampc 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def get_periods_range(start_dt, end_dt, freq):
    """
    Get a date range for the specified parameters.

    Parameters
    ----------
    start_dt: datetime
    end_dt: datetime
    freq: str
        ``'minute'`` or ``'daily'``; any other value is handed to
        ``pd.date_range`` unchanged as its ``freq``.

    Returns
    -------
    DatetimeIndex

    """
    if freq == 'minute':
        # 'min' is the minute alias accepted by old and new pandas alike;
        # the former 'T' spelling is deprecated in recent releases.
        freq = 'min'

    elif freq == 'daily':
        freq = 'D'

    return pd.date_range(start_dt, end_dt, freq=freq)
test_continuous_futures.py 文件源码 项目:catalyst 作者: enigmampc 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_contract_at_offset(self):
        """``OrderedContracts.contract_at_offset`` walks the contract chain.

        Sids 1..4 are loaded into an ``OrderedContracts('FO', ...)`` and
        queried at the last of four daily start dates.
        """
        contract_sids = array([1, 2, 3, 4], dtype=int64)
        start_dates = pd.date_range('2015-01-01', periods=4, tz="UTC")

        contracts = deque(self.asset_finder.retrieve_all(contract_sids))

        oc = OrderedContracts('FO', contracts)

        # integer .value of the last start date, used for every query
        as_of = start_dates[-1].value

        # assertEqual, not assertEquals: the alias is gone from current
        # Python releases.
        self.assertEqual(1,
                         oc.contract_at_offset(1, 0, as_of),
                         "Offset of 0 should return provided sid")

        self.assertEqual(2,
                         oc.contract_at_offset(1, 1, as_of),
                         "Offset of 1 should return next sid in chain.")

        self.assertEqual(None,
                         oc.contract_at_offset(4, 1, as_of),
                         "Offset at end of chain should not crash.")
test_events.py 文件源码 项目:catalyst 作者: enigmampc 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def test_next_event_indexer(self):
        """Check ``next_event_indexer`` column by column for every sid.

        The indexer is built over the dates '2014' .. '2014-01-31' and
        the unique sids of ``self.events``; each sid's column is handed
        to ``self.check_next_event_indexer``.
        """
        events = self.events
        sids = events['sid'].values
        dates = events['event_date'].values
        timestamps = events['timestamp'].values

        all_dates = pd.date_range('2014', '2014-01-31')
        all_sids = np.unique(sids)

        indexer = next_event_indexer(
            all_dates, all_sids, dates, timestamps, sids)

        # Compute expected results without knowledge of null events.
        for column, sid in enumerate(all_sids):
            sid_column = indexer[:, column]
            self.check_next_event_indexer(events, all_dates, sid, sid_column)


问题


面经


文章

微信
公众号

扫码关注公众号