python类datetime()的实例源码

test_sources.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_yahoo_bars_to_panel_source(self):
    """Check the events produced by a DataPanelSource built from Yahoo bars.

    Bars for AAPL and GE are loaded, a copy of the panel has its items
    replaced by the sids returned by the asset finder, and every event
    from the resulting source must contain all expected fields, carry an
    integer volume and reference one of those sids.
    """
    env = TradingEnvironment()
    finder = AssetFinder(env.engine)
    symbols = ['AAPL', 'GE']
    env.write_data(equities_identifiers=symbols)

    first_day = pd.datetime(1993, 1, 1, 0, 0, 0, 0, pytz.utc)
    last_day = pd.datetime(2002, 1, 1, 0, 0, 0, 0, pytz.utc)
    panel = factory.load_bars_from_yahoo(
        stocks=symbols, indexes={}, start=first_day, end=last_day)
    expected_fields = ['sid', 'open', 'high', 'low', 'close',
                       'volume', 'price']

    # Work on a copy so the loaded panel keeps its original item labels.
    relabelled = panel.copy()
    sids = finder.map_identifier_index_to_sids(
        panel.items, panel.major_axis[0])
    relabelled.items = sids

    for event in DataPanelSource(relabelled):
        for field in expected_fields:
            self.assertIn(field, event)
        self.assertTrue(isinstance(event['volume'], integer_types))
        self.assertTrue(event['sid'] in sids)
test_factory.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_load_bars_from_yahoo(self):
    """Exercise load_bars_from_yahoo for AAPL and GE between 1993 and 2002.

    Asserts the first and last labels of the major axis, that both
    symbols appear in the items, that every bar field is on the minor
    axis, and that swapping start and end raises AssertionError.
    """
    symbols = ['AAPL', 'GE']
    first_day = pd.datetime(1993, 1, 1, 0, 0, 0, 0, pytz.utc)
    last_day = pd.datetime(2002, 1, 1, 0, 0, 0, 0, pytz.utc)
    bars = load_bars_from_yahoo(stocks=symbols, start=first_day, end=last_day)

    assert bars.major_axis[0] == pd.Timestamp('1993-01-04 00:00:00+0000')
    assert bars.major_axis[-1] == pd.Timestamp('2001-12-31 00:00:00+0000')

    for symbol in symbols:
        assert symbol in bars.items

    for field in ['open', 'high', 'low', 'close', 'volume', 'price']:
        assert field in bars.minor_axis

    # Reversed bounds must be rejected by the loader.
    reversed_bounds = dict(stocks=symbols, start=last_day, end=first_day)
    np.testing.assert_raises(
        AssertionError, load_bars_from_yahoo, **reversed_bounds)
bdew.py 文件源码 项目:demandlib 作者: oemof 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __init__(self, year, seasons=None, holidays=None):
    """Prepare the quarter-hourly time index and load profiles for a year.

    Parameters
    ----------
    year : int
        Calendar year the index is generated for.
    seasons : dict, optional
        Mapping of season name to a four-item list. The built-in
        defaults are annotated as
        ``[start month, start day, end month, end day]``. When ``None``
        those defaults are used.
    holidays : optional
        Forwarded unchanged to ``all_load_profiles``.
    """
    # Hours of the year: 366 days in a leap year, 365 otherwise.
    hoy = 8784 if calendar.isleap(year) else 8760
    self.datapath = os.path.join(os.path.dirname(__file__), 'bdew_data')
    # ``pd.datetime`` was deprecated in pandas 1.0 and removed in 2.0;
    # pd.Timestamp yields the same range start on every pandas version.
    self.date_time_index = pd.date_range(
        pd.Timestamp(year=year, month=1, day=1),
        periods=hoy * 4, freq='15Min')
    if seasons is None:
        seasons = {
            'summer1': [5, 15, 9, 14],  # summer: 15.05. to 14.09
            'transition1': [3, 21, 5, 14],  # transition1 :21.03. to 14.05
            'transition2': [9, 15, 10, 31],  # transition2 :15.09. to 31.10
            'winter1': [1, 1, 3, 20],  # winter1:  01.01. to 20.03
            'winter2': [11, 1, 12, 31],  # winter2: 01.11. to 31.12
        }
    self.seasons = seasons
    self.year = year
    self.slp_frame = self.all_load_profiles(self.date_time_index,
                                            holidays=holidays)
agent.py 文件源码 项目:pyktrader2 作者: harveywwu 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def run_tick(self, event):
    """Handle one tick event.

    In live trading a warning is logged when the tick number derived
    from the tick timestamp differs from the one derived from the system
    clock by more than ``self.realtime_tick_diff``. The tick is then
    passed to ``update_instrument``; if that returns a falsy value the
    method stops. Otherwise pending trades are checked, the minute bar is
    updated, trades are processed (spreads containing the instrument
    first, then the instrument itself) and queued orders are sent when
    the gateway's ``process_flag`` is set.
    """
    tick = event.dict['data']
    if self.live_trading:
        now_ticknum = get_tick_num(datetime.datetime.now())
        cur_ticknum = get_tick_num(tick.timestamp)
        drift = abs(cur_ticknum - now_ticknum)
        if drift > self.realtime_tick_diff:
            details = (tick.instID, cur_ticknum, now_ticknum)
            self.logger.warning('the tick timestamp has more than 10sec diff from the system time, inst=%s, ticknum= %s, now_ticknum=%s' % details)

    accepted = self.update_instrument(tick)
    if not accepted:
        return

    inst = tick.instID
    manager = self.trade_manager

    if inst in self.inst2spread:
        for spread_key in self.inst2spread[inst]:
            manager.check_pending_trades(spread_key)
    manager.check_pending_trades(inst)

    self.update_min_bar(tick)

    if inst in self.inst2spread:
        for spread_key in self.inst2spread[inst]:
            manager.process_trades(spread_key)
    manager.process_trades(inst)

    gateway = self.inst2gateway[inst]
    if gateway.process_flag:
        gateway.send_queued_orders()
test_feature_availability_profiler.py 文件源码 项目:healthcareai-py 作者: HealthCatalyst 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def setUp(self):
    """Build a 1000-row frame with random admit dates and injected nulls.

    ``LastLoadDTS`` is the constant 2015-05-20; ``AdmitDTS`` is a random
    moment between 2015-05-01 (inclusive) and 2015-05-20 (exclusive);
    roughly half of column ``A`` and a quarter of column ``B`` are set
    to NaN.
    """
    self.df = pd.DataFrame(np.random.randn(1000, 4),
                           columns=['A', 'B', 'AdmitDTS', 'LastLoadDTS'])
    # ``pd.datetime`` was removed in pandas 2.0; pd.Timestamp is the
    # drop-in replacement for these constants.
    load_date = pd.Timestamp(2015, 5, 20)
    first_admit = pd.Timestamp(2015, 5, 1)

    # generate load date
    self.df['LastLoadDTS'] = load_date

    # generate datetime objects for admit date
    window = load_date - first_admit
    int_delta = (window.days * 24 * 60 * 60) + window.seconds
    # Build the series in one go; the original started from the
    # one-element ``pd.Series(1000)`` and grew it row by row.
    admit = pd.Series([
        first_admit + timedelta(seconds=randrange(int_delta))
        for _ in range(1000)
    ])
    self.df['AdmitDTS'] = admit

    # add nulls
    null_mask = np.random.rand(1000) > .5
    self.df.loc[null_mask, ['A']] = np.nan
    null_mask = np.random.rand(1000) > .75
    self.df.loc[null_mask, ['B']] = np.nan
test_io.py 文件源码 项目:pecos 作者: sandialabs 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_write_metrics1():
    """Write metrics to the same CSV three times and check its shape.

    The file is removed first if it exists. After the first write the
    file read back has shape (1, 2); after appending a second date it is
    (2, 2); after appending a second metric it is (2, 3).
    """
    filename = abspath(join(testdir, 'test_write_metrics1.csv'))
    if isfile(filename):
        os.remove(filename)

    # pd.Timestamp replaces ``pd.datetime``, which pandas 2.0 removed.
    metrics = pd.DataFrame({'metric1': pd.Series([1.], index=[pd.Timestamp(2016, 1, 1)])})
    pecos.io.write_metrics(filename, metrics)
    assert_true(isfile(filename))

    from_file1 = pd.read_csv(filename)
    assert_equals(from_file1.shape, (1, 2))

    # append another date
    metrics = pd.DataFrame({'metric1': pd.Series([2.], index=[pd.Timestamp(2016, 1, 2)])})
    pecos.io.write_metrics(filename, metrics)

    from_file2 = pd.read_csv(filename)
    assert_equals(from_file2.shape, (2, 2))

    # append another metric
    metrics = pd.DataFrame({'metric2': pd.Series([3.], index=[pd.Timestamp(2016, 1, 2)])})
    pecos.io.write_metrics(filename, metrics)

    from_file3 = pd.read_csv(filename)
    assert_equals(from_file3.shape, (2, 3))
forecaster.py 文件源码 项目:prophet 作者: facebook 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def fourier_series(dates, period, series_order):
    """Provides Fourier series components with the specified frequency
    and order.

    Parameters
    ----------
    dates: pd.Series containing timestamps.
    period: Number of days of the period.
    series_order: Number of components.

    Returns
    -------
    Matrix with seasonality features: one row per date and
    ``2 * series_order`` columns ordered sin, cos for each component.
    """
    # ``pd.datetime`` (removed in pandas 2.0) and ``np.float`` (removed in
    # NumPy 1.24) are replaced by pd.Timestamp and the builtin float.
    epoch = pd.Timestamp('1970-01-01')
    seconds = (dates - epoch).dt.total_seconds().astype(float)
    # convert to days since epoch
    t = np.array(seconds) / (3600 * 24.)
    return np.column_stack([
        fun((2.0 * (i + 1) * np.pi * t / period))
        for i in range(series_order)
        for fun in (np.sin, np.cos)
    ])
test_data.py 文件源码 项目:linearmodels 作者: bashtage 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_incorrect_time_axis():
    """PanelData must raise ValueError for the two major-axis labelings below.

    The first uses string labels, the second mixes an int, a datetime
    and a string. Each case is checked both as a Panel and as the frame
    derived from it.
    """
    x = np.random.randn(3, 3, 1000)
    entities = ['entity.{0}'.format(i) for i in range(1000)]
    var_names = ['var.{0}'.format(i) for i in range(3)]

    string_labels = ['time.{0}'.format(i) for i in range(3)]
    mixed_labels = [1, pd.datetime(1960, 1, 1), 'a']

    for time in (string_labels, mixed_labels):
        panel = pd.Panel(x, items=var_names, major_axis=time,
                         minor_axis=entities)
        with pytest.raises(ValueError):
            PanelData(panel)

        frame = panel.swapaxes(1, 2).swapaxes(0, 1).to_frame()
        with pytest.raises(ValueError):
            PanelData(frame)
chem.py 文件源码 项目:faampy 作者: ncasuk 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def read_fgga_txt(ifile):
    """Read an FGGA text file into a DataFrame.

    The first 100 lines are skipped, the ``timestamp`` column (seconds
    since the Unix epoch) is converted to naive UTC datetimes, and
    concentration values are set to NaN wherever the valve columns used
    for flagging are non-zero.

    :param ifile: path or file object accepted by ``pd.read_csv``
    :return: DataFrame with one column per name in ``fgga_names``
    """
    fgga_names = ['identifier', 'packet_length', 'timestamp', 'ptp_sync',
                  'MFM', 'flight_num', 'CPU_Load', 'USB_disk_space', 'ch4',
                  'co2', 'h2o', 'press_torr', 'temp_c', 'fit_flag',
                  'rda_usec', 'rdb_usec', 'ch4_ppb', 'co2_ppm',
                  'MFC_1_absolute_pressure', 'MFC_1_temperature',
                  'MFC_1volumetic_flow', 'MFC_1mass_flow', 'MFC_1set_point',
                  'V1', 'V2', 'V3', 'V4', 'restart_FGGA', 'FGGA_Pump',
                  'CAL_MFC_1Set_Value']
    df_fgga = pd.read_csv(ifile,
                          names=fgga_names,
                          delimiter=',',
                          skiprows=100)     # To be sure to skip the header

    # Convert after reading instead of using ``date_parser`` with
    # ``pd.datetime.utcfromtimestamp``: ``pd.datetime`` was removed in
    # pandas 2.0 and ``date_parser`` is deprecated. ``unit='s'`` gives the
    # same naive UTC timestamps.
    df_fgga['timestamp'] = pd.to_datetime(df_fgga['timestamp'], unit='s')

    # Using the Valve states for flagging out calibration periods
    # TODO: add time buffer around calibration periods
    df_fgga.loc[df_fgga['V1'] != 0, 'ch4_ppb'] = np.nan
    df_fgga.loc[df_fgga['V2'] != 0, 'co2_ppm'] = np.nan
    df_fgga.loc[df_fgga['V2'] != 0, 'ch4_ppb'] = np.nan
    return df_fgga
test_historical_medians.py 文件源码 项目:guacml 作者: guacml 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_medians_for_gaps(self):
    """Check HistoricalMedians with window 3 on a daily series with a gap.

    Six consecutive days starting 2015-06-15 carry the values 0..5; the
    second day is dropped before the time series is built.
    """
    # pd.Timestamp replaces ``pd.datetime``, which pandas 2.0 removed.
    days = pd.date_range(pd.Timestamp(2015, 6, 15),
                         pd.Timestamp(2015, 6, 20))
    df = pd.DataFrame({
        'date': list(days),
        'value': range(6)
    })
    # Remove the row at position 1 to create the gap.
    df = df.iloc[[0, 2, 3, 4, 5]]
    guac = GuacMl(df, 'value')
    guac.make_time_series('date', prediction_length=1)
    medians = HistoricalMedians([3], guac.config, guac.logger)
    out = medians.execute(guac.data)
    self.assertTrue(np.isnan(out.df['value_median_3'].iloc[0]))
    self.assertEqual(out.df['value_median_3'].iloc[1], 0)
    self.assertEqual(out.df['value_median_3'].iloc[2], 1)
    self.assertEqual(out.df['value_median_3'].iloc[3], 2.5)
    self.assertEqual(out.df['value_median_3'].iloc[4], 3)
test_historical_medians.py 文件源码 项目:guacml 作者: guacml 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_medians_series_and_group_keys_simple(self):
    """Check window-2 medians computed per series key and group key.

    Two series ('a' and 'b') cover the same six days starting
    2015-06-15; rows alternate between the 'uneven' and 'even' groups
    and carry the values 0..11.
    """
    # pd.Timestamp replaces ``pd.datetime``, which pandas 2.0 removed.
    days = list(pd.date_range(pd.Timestamp(2015, 6, 15),
                              pd.Timestamp(2015, 6, 20)))
    df = pd.DataFrame({
        'date': days + days,
        'series_key': ['a'] * 6 + ['b'] * 6,
        'group_key': ['uneven', 'even'] * 6,
        'value': range(12)
    })
    guac = GuacMl(df, 'value')
    guac.make_time_series('date', prediction_length=1, series_key_cols='series_key')
    medians = HistoricalMedians([2], guac.config, guac.logger, group_keys='group_key')
    out = medians.execute(guac.data)

    out.df = out.df.sort_values(['series_key', 'group_key', 'date'])
    self.assertTrue(np.isnan(out.df['value_median_2_by_group_key'].iloc[0]))
    self.assertEqual(out.df['value_median_2_by_group_key'].iloc[1], 1)
    self.assertEqual(out.df['value_median_2_by_group_key'].iloc[2], 2)
    self.assertTrue(np.isnan(out.df['value_median_2_by_group_key'].iloc[3]))
    self.assertEqual(out.df['value_median_2_by_group_key'].iloc[4], 0)
    self.assertEqual(out.df['value_median_2_by_group_key'].iloc[5], 1)
    self.assertTrue(np.isnan(out.df['value_median_2_by_group_key'].iloc[6]))
factory.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def create_simulation_parameters(year=2006, start=None, end=None,
                                 capital_base=float("1.0e5"),
                                 num_days=None,
                                 data_frequency='daily',
                                 emission_rate='daily',
                                 env=None):
    """Build a SimulationParameters object, filling in missing values.

    * ``env`` defaults to ``TradingEnvironment(load=noop_load)``.
    * ``start`` defaults to 1 January of ``year`` (UTC).
    * ``end`` defaults to the trading day ``num_days - 1`` positions after
      ``start`` in ``env.trading_days`` when ``num_days`` is truthy,
      otherwise to 31 December of ``year`` (UTC).
    """
    if env is None:
        # Construct a complete environment with reasonable defaults
        env = TradingEnvironment(load=noop_load)

    if start is None:
        start = datetime(year, 1, 1, tzinfo=pytz.utc)

    if end is None and num_days:
        first_position = env.trading_days.searchsorted(start)
        end = env.trading_days[first_position + num_days - 1]
    elif end is None:
        end = datetime(year, 12, 31, tzinfo=pytz.utc)

    return SimulationParameters(
        period_start=start,
        period_end=end,
        capital_base=capital_base,
        data_frequency=data_frequency,
        emission_rate=emission_rate,
        env=env,
    )
factory.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def create_txn(sid, price, amount, datetime):
    """Return a transaction Event for ``sid`` with the given price, amount and time.

    The event type is ``DATASOURCE_TYPE.TRANSACTION`` and the source id is
    'MockTransactionSource'.
    """
    payload = dict(
        sid=sid,
        amount=amount,
        dt=datetime,
        price=price,
        type=DATASOURCE_TYPE.TRANSACTION,
        source_id='MockTransactionSource',
    )
    return Event(payload)
factory.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def create_commission(sid, value, datetime):
    """Return a commission Event charging ``value`` to ``sid`` at ``datetime``.

    The event type is ``DATASOURCE_TYPE.COMMISSION`` and the source id is
    'MockCommissionSource'.
    """
    payload = dict(
        dt=datetime,
        type=DATASOURCE_TYPE.COMMISSION,
        cost=value,
        sid=sid,
        source_id='MockCommissionSource',
    )
    return Event(payload)
factory.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def create_test_df_source(sim_params=None, env=None, bars='daily'):
    """Return ``(DataFrameSource, DataFrame)`` holding a simple counter series.

    The frame has a single column ``0`` with the values 1..len(index).
    For daily bars with ``sim_params`` given, the index is
    ``sim_params.trading_days``; otherwise it is built from the
    environment's days between 1990-01-03 and 1990-01-08 (UTC), expanded
    to market minutes when ``bars`` is 'minute'.

    Raises ValueError when ``bars`` is neither 'daily' nor 'minute'.
    """
    if bars not in ('daily', 'minute'):
        raise ValueError('%s bars not understood.' % bars)
    is_daily = bars == 'daily'
    freq = pd.datetools.BDay() if is_daily else pd.datetools.Minute()

    if sim_params and is_daily:
        index = sim_params.trading_days
    else:
        if env is None:
            env = TradingEnvironment(load=noop_load)

        range_start = pd.datetime(1990, 1, 3, 0, 0, 0, 0, pytz.utc)
        range_end = pd.datetime(1990, 1, 8, 0, 0, 0, 0, pytz.utc)
        days = env.days_in_range(range_start, range_end)

        if is_daily:
            index = days
        else:
            # Concatenate the market minutes of every day in the range.
            index = pd.DatetimeIndex([], freq=freq)
            for day in days:
                index = index.append(env.market_minutes_for_day(day))

    counter = np.arange(1, len(index) + 1)
    df = pd.DataFrame(counter, index=index, columns=[0])

    return DataFrameSource(df), df
factory.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def create_test_panel_source(sim_params=None, env=None, source_type=None):
    """Return ``(DataPanelSource, Panel)`` for a single item keyed ``0``.

    The item's frame is indexed by the environment's days between the
    start and end of the period (taken from ``sim_params`` when given,
    otherwise 1990-01-03 to 1990-01-08 UTC) and has the columns
    ``price`` (0, 1, 2, ...), ``volume`` (1000) and ``arbitrary`` (1).
    A ``type`` column holding ``source_type`` is added when it is truthy.
    """
    if sim_params:
        start = sim_params.first_open
        end = sim_params.last_close
    else:
        start = pd.datetime(1990, 1, 3, 0, 0, 0, 0, pytz.utc)
        end = pd.datetime(1990, 1, 8, 0, 0, 0, 0, pytz.utc)

    if env is None:
        env = TradingEnvironment(load=noop_load)

    index = env.days_in_range(start, end)
    n_days = len(index)

    columns = {
        'price': np.arange(0, n_days),
        'volume': np.ones(n_days) * 1000,
        'arbitrary': np.ones(n_days),
    }
    df = pd.DataFrame(columns, index=index)
    if source_type:
        df['type'] = source_type

    panel = pd.Panel.from_dict({0: df})

    return DataPanelSource(panel), panel
loader.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def load_from_yahoo(indexes=None,
                    stocks=None,
                    start=None,
                    end=None,
                    adjusted=True):
    """
    Load one closing-price column per asset from Yahoo into a single
    dataframe whose index is localized to UTC.

    With ``adjusted`` set, the 'Adj Close' column of each asset is used;
    otherwise the plain 'Close' column.

    :param indexes: Financial indexes to load.
    :type indexes: dict
    :param stocks: Stock closing prices to load.
    :type stocks: list
    :param start: Retrieve prices from start date on.
    :type start: datetime
    :param end: Retrieve prices until end date.
    :type end: datetime
    :param adjusted: Adjust the price for splits and dividends.
    :type adjusted: bool

    """
    raw = _load_raw_yahoo_data(indexes, stocks, start, end)
    close_key = 'Adj Close' if adjusted else 'Close'

    closes = {}
    for key, frame in iteritems(raw):
        closes[key] = frame[close_key]

    df = pd.DataFrame(closes)
    df.index = df.index.tz_localize(pytz.utc)
    return df
test_batchtransform.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def setUp(self):
    """Create an environment with sids 0..89 and parameters for 1990-01-01 to 1990-01-08 (UTC)."""
    self.sids = range(90)
    self.env = TradingEnvironment()
    self.env.write_data(equities_identifiers=self.sids)

    period = dict(
        start=datetime(1990, 1, 1, tzinfo=pytz.utc),
        end=datetime(1990, 1, 8, tzinfo=pytz.utc),
    )
    self.sim_params = factory.create_simulation_parameters(
        env=self.env, **period)
test_batchtransform.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def setUp(self):
    """Prepare minute-frequency simulation parameters and a minute-bar test source.

    The simulation period runs from 1990-01-03 to 1990-01-08 (UTC) with
    daily emission and minute data frequency.
    """
    setup_logger(self)
    period_start = pd.datetime(1990, 1, 3, 0, 0, 0, 0, pytz.utc)
    period_end = pd.datetime(1990, 1, 8, 0, 0, 0, 0, pytz.utc)

    self.sim_params = factory.create_simulation_parameters(
        start=period_start, end=period_end, env=self.env)
    self.sim_params.emission_rate = 'daily'
    self.sim_params.data_frequency = 'minute'

    source_and_frame = factory.create_test_df_source(
        sim_params=self.sim_params, env=self.env, bars='minute')
    self.source, self.df = source_and_frame
test_batchtransform.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def setUp(self):
    """Prepare simulation parameters for 1990-01-01 to 1990-01-08 (UTC) and a test source."""
    setup_logger(self)

    period = dict(
        start=datetime(1990, 1, 1, tzinfo=pytz.utc),
        end=datetime(1990, 1, 8, tzinfo=pytz.utc),
    )
    self.sim_params = factory.create_simulation_parameters(
        env=self.env, **period)

    source_and_frame = factory.create_test_df_source(
        self.sim_params, self.env)
    self.source, self.df = source_and_frame
general.py 文件源码 项目:veneer-py 作者: flowmatters 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def parse_veneer_date(self, txt):
    """Return ``txt`` as a datetime.

    Objects that already expose ``strftime`` (dates, datetimes,
    Timestamps) are returned unchanged; anything else is parsed with the
    format ``'%m/%d/%Y %H:%M:%S'``.
    """
    # ``pd.datetime`` was only an alias of datetime.datetime and was
    # removed in pandas 2.0, so the stdlib class is used directly.
    from datetime import datetime

    if hasattr(txt, 'strftime'):
        return txt
    return datetime.strptime(txt, '%m/%d/%Y %H:%M:%S')
general.py 文件源码 项目:veneer-py 作者: flowmatters 项目源码 文件源码 阅读 41 收藏 0 点赞 0 评论 0
def read_sdt(fn):
    """Read a space-separated ``Year Month Day Val`` file.

    Returns the ``Val`` column as a Series indexed by the date assembled
    from the first three columns (index name ``Date``).
    """
    ts = pd.read_table(fn, sep=' +', engine='python',
                       names=['Year', 'Month', 'Day', 'Val'])
    # Assemble the dates in one vectorised call; the original built them
    # row by row with ``pd.datetime``, which pandas 2.0 removed.
    ts['Date'] = pd.to_datetime(
        {'year': ts.Year, 'month': ts.Month, 'day': ts.Day})
    ts = ts.set_index(ts.Date)
    return ts.Val
loader.py 文件源码 项目:catalyst 作者: enigmampc 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def load_from_yahoo(indexes=None,
                    stocks=None,
                    start=None,
                    end=None,
                    adjusted=True):
    """
    Build a UTC-indexed dataframe with one closing-price column per
    asset loaded from Yahoo.

    The 'Adj Close' column is taken when ``adjusted`` is truthy, the
    unadjusted 'Close' column otherwise.

    :param indexes: Financial indexes to load.
    :type indexes: dict
    :param stocks: Stock closing prices to load.
    :type stocks: list
    :param start: Retrieve prices from start date on.
    :type start: datetime
    :param end: Retrieve prices until end date.
    :type end: datetime
    :param adjusted: Adjust the price for splits and dividends.
    :type adjusted: bool

    """
    column = 'Adj Close' if adjusted else 'Close'
    per_asset = _load_raw_yahoo_data(indexes, stocks, start, end)
    df = pd.DataFrame(
        dict((key, frame[column]) for key, frame in iteritems(per_asset)))
    df.index = df.index.tz_localize(pytz.utc)
    return df
agent.py 文件源码 项目:pyktrader2 作者: harveywwu 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def add_instrument(self, name):
    """Create the empty market-data containers for instrument ``name``.

    Sets up the tick list, the day and one-minute record arrays, the
    current day/minute dictionaries (every field zeroed) and the
    callback containers, then stamps the current day/minute with
    ``self.scur_day`` (midnight of that day for the minute datetime).
    """
    self.tick_data[name] = []

    day_dtypes = [(field, dtype_map[field]) for field in day_data_list]
    self.day_data[name] = data_handler.DynamicRecArray(dtype=day_dtypes)
    min_dtypes = [(field, dtype_map[field]) for field in min_data_list]
    self.min_data[name] = {1: data_handler.DynamicRecArray(dtype=min_dtypes)}

    self.cur_day[name] = dict.fromkeys(day_data_list, 0)
    self.cur_min[name] = dict.fromkeys(min_data_list, 0)
    self.day_data_func[name] = []
    self.min_data_func[name] = {}

    midnight = datetime.datetime.fromordinal(self.scur_day.toordinal())
    self.cur_min[name]['datetime'] = midnight
    self.cur_min[name]['date'] = self.scur_day
    self.cur_day[name]['date'] = self.scur_day
agent.py 文件源码 项目:pyktrader2 作者: harveywwu 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def mkt_data_sod(self, tday):
    """Reset the per-instrument market data for trading day ``tday``.

    For every instrument the tick list is emptied and the current
    minute/day dictionaries are re-created with all fields zeroed; the
    day's date is ``tday`` and the minute's datetime is midnight of
    ``tday``.
    """
    for inst in self.instruments:
        self.tick_data[inst] = []
        self.cur_min[inst] = dict.fromkeys(min_data_list, 0)
        self.cur_day[inst] = dict.fromkeys(day_data_list, 0)
        self.cur_day[inst]['date'] = tday
        midnight = datetime.datetime.fromordinal(tday.toordinal())
        self.cur_min[inst]['datetime'] = midnight
agent.py 文件源码 项目:pyktrader2 作者: harveywwu 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def day_switch(self, event):
    """Roll the agent over to the trading day carried by ``event``.

    Nothing happens when the event's date is not later than the current
    day. Otherwise the end-of-day routine is run if it has not run yet,
    counters are reset, the parent class' ``mkt_data_sod`` is called for
    the new day and ``run_eod`` is scheduled for 15:20:00 of that day.
    """
    newday = event.dict['date']
    if newday <= self.scur_day:
        return

    message = 'switching the trading day from %s to %s, reset tick_id=%s to 0' % (self.scur_day, newday, self.tick_id)
    self.logger.info(message)

    # Finish the old day first if its end-of-day run is still outstanding.
    if not self.eod_flag:
        self.run_eod()

    self.scur_day = newday
    self.tick_id = 0
    self.timer_count = 0
    super(Agent, self).mkt_data_sod(newday)
    self.eod_flag = False

    eod_clock = datetime.time(15, 20, 0)
    self.put_command(datetime.datetime.combine(newday, eod_clock),
                     self.run_eod)
agent.py 文件源码 项目:pyktrader2 作者: harveywwu 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def update_instrument(self, tick):
    """Copy the quote carried by ``tick`` onto the stored instrument record.

    For a 'CZCE' instrument whose ``last_update`` equals the tick id, and
    whose volume or top-of-book sizes differ from the tick, a tick id
    with last digit below 5 is moved forward by 5 and the timestamp by
    500 ms. Ask/bid prices on the tick are clamped to the tick's limits.

    Returns False (after prices and limits were stored) when the mid
    price lies outside the limit band; otherwise sizes, open interest and,
    if the volume increased, last price/volume are stored, dependent
    spreads are updated and True is returned.
    """
    inst = tick.instID
    curr_tick = tick.tick_id
    record = self.instruments[inst]

    same_slot = record.exchange == 'CZCE' and record.last_update == tick.tick_id
    if same_slot and (record.volume < tick.volume
                      or record.ask_vol1 != tick.askVol1
                      or record.bid_vol1 != tick.bidVol1) \
            and tick.tick_id % 10 < 5:
        tick.tick_id += 5
        tick.timestamp = tick.timestamp + datetime.timedelta(milliseconds=500)

    # Note: the id captured before any shift above is what gets recorded.
    self.tick_id = max(curr_tick, self.tick_id)
    record.up_limit = tick.upLimit
    record.down_limit = tick.downLimit
    tick.askPrice1 = min(tick.askPrice1, tick.upLimit)
    tick.bidPrice1 = max(tick.bidPrice1, tick.downLimit)
    record.last_update = curr_tick
    record.bid_price1 = tick.bidPrice1
    record.ask_price1 = tick.askPrice1

    mid_price = (tick.askPrice1 + tick.bidPrice1) / 2.0
    record.mid_price = mid_price
    if mid_price > tick.upLimit or mid_price < tick.downLimit:
        return False

    record.bid_vol1 = tick.bidVol1
    record.ask_vol1 = tick.askVol1
    record.open_interest = tick.openInterest

    previous_volume = record.volume
    if tick.volume > previous_volume:
        record.price = tick.price
        record.volume = tick.volume
        record.last_traded = curr_tick

    if inst in self.inst2spread:
        for spd_key in self.inst2spread[inst]:
            self.spread_data[spd_key].update()
    return True
agent.py 文件源码 项目:pyktrader2 作者: harveywwu 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def run_gway_service(self, gway, service, args):
    """Queue a call to a named service method on a registered gateway.

    ``gway`` is looked up in ``self.gateways``; if the gateway has an
    attribute called ``service`` it is scheduled through
    ``self.put_command`` with the current time and ``args``. An unknown
    gateway or service only prints a message.
    """
    if gway not in self.gateways:
        print("no such a gateway %s" % gway)
        return

    gateway = self.gateways[gway]
    if not hasattr(gateway, service):
        # The original template read "% for", which %-formatting parses
        # as a float conversion and so raised TypeError for a string
        # service name instead of printing this message.
        print("no such service = %s for %s" % (service, gway))
        return

    ts = datetime.datetime.now()
    self.put_command(ts, getattr(gateway, service), args)
tradeagent.py 文件源码 项目:pyktrader2 作者: harveywwu 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def add_instrument(self, name):
    """Register instrument ``name`` with empty market-data structures.

    Creates the tick list, the day and one-minute record arrays, zeroed
    current day/minute dictionaries and empty callback containers, then
    sets the dates from ``self.scur_day`` (the minute datetime being
    midnight of that day).
    """
    self.tick_data[name] = []

    day_fields = [(field, dtype_map[field]) for field in day_data_list]
    self.day_data[name] = data_handler.DynamicRecArray(dtype=day_fields)
    minute_fields = [(field, dtype_map[field]) for field in min_data_list]
    self.min_data[name] = {1: data_handler.DynamicRecArray(dtype=minute_fields)}

    self.cur_day[name] = {item: 0 for item in day_data_list}
    self.cur_min[name] = {item: 0 for item in min_data_list}
    self.day_data_func[name] = []
    self.min_data_func[name] = {}

    day_start = datetime.datetime.fromordinal(self.scur_day.toordinal())
    self.cur_min[name]['datetime'] = day_start
    self.cur_min[name]['date'] = self.scur_day
    self.cur_day[name]['date'] = self.scur_day
tradeagent.py 文件源码 项目:pyktrader2 作者: harveywwu 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def mkt_data_sod(self, tday):
    """Start-of-day reset of tick, minute and day data for every instrument.

    Each instrument gets an empty tick list and freshly zeroed current
    minute/day dictionaries; the day's date is ``tday`` and the minute's
    datetime is midnight of ``tday``.
    """
    for inst in self.instruments:
        self.tick_data[inst] = []
        self.cur_min[inst] = {item: 0 for item in min_data_list}
        self.cur_day[inst] = {item: 0 for item in day_data_list}
        self.cur_day[inst]['date'] = tday
        day_start = datetime.datetime.fromordinal(tday.toordinal())
        self.cur_min[inst]['datetime'] = day_start


问题


面经


文章

微信
公众号

扫码关注公众号