def test_yahoo_bars_to_panel_source(self):
env = TradingEnvironment()
finder = AssetFinder(env.engine)
stocks = ['AAPL', 'GE']
env.write_data(equities_identifiers=stocks)
start = pd.datetime(1993, 1, 1, 0, 0, 0, 0, pytz.utc)
end = pd.datetime(2002, 1, 1, 0, 0, 0, 0, pytz.utc)
data = factory.load_bars_from_yahoo(stocks=stocks,
indexes={},
start=start,
end=end)
check_fields = ['sid', 'open', 'high', 'low', 'close',
'volume', 'price']
copy_panel = data.copy()
sids = finder.map_identifier_index_to_sids(
data.items, data.major_axis[0]
)
copy_panel.items = sids
source = DataPanelSource(copy_panel)
for event in source:
for check_field in check_fields:
self.assertIn(check_field, event)
        self.assertIsInstance(event['volume'], integer_types)
        self.assertIn(event['sid'], sids)
def test_load_bars_from_yahoo(self):
stocks = ['AAPL', 'GE']
start = pd.datetime(1993, 1, 1, 0, 0, 0, 0, pytz.utc)
end = pd.datetime(2002, 1, 1, 0, 0, 0, 0, pytz.utc)
data = load_bars_from_yahoo(stocks=stocks, start=start, end=end)
assert data.major_axis[0] == pd.Timestamp('1993-01-04 00:00:00+0000')
assert data.major_axis[-1] == pd.Timestamp('2001-12-31 00:00:00+0000')
for stock in stocks:
assert stock in data.items
for ohlc in ['open', 'high', 'low', 'close', 'volume', 'price']:
assert ohlc in data.minor_axis
np.testing.assert_raises(
AssertionError, load_bars_from_yahoo, stocks=stocks,
start=end, end=start
)
def __init__(self, year, seasons=None, holidays=None):
    if calendar.isleap(year):
        hoy = 8784  # hours in a leap year
    else:
        hoy = 8760  # hours in a common year
self.datapath = os.path.join(os.path.dirname(__file__), 'bdew_data')
self.date_time_index = pd.date_range(
pd.datetime(year, 1, 1, 0), periods=hoy * 4, freq='15Min')
if seasons is None:
self.seasons = {
            'summer1': [5, 15, 9, 14],       # summer: 15.05. to 14.09.
            'transition1': [3, 21, 5, 14],   # transition1: 21.03. to 14.05.
            'transition2': [9, 15, 10, 31],  # transition2: 15.09. to 31.10.
            'winter1': [1, 1, 3, 20],        # winter1: 01.01. to 20.03.
            'winter2': [11, 1, 12, 31],      # winter2: 01.11. to 31.12.
}
else:
self.seasons = seasons
self.year = year
self.slp_frame = self.all_load_profiles(self.date_time_index,
holidays=holidays)
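# Usage sketch (standalone illustration, not part of the class above): how one
# default season entry of the form [start_month, start_day, end_month, end_day]
# maps onto the 15-minute index built in __init__.
import pandas as pd

year = 2019
idx = pd.date_range(pd.Timestamp(year, 1, 1), periods=8760 * 4, freq='15Min')
m1, d1, m2, d2 = [5, 15, 9, 14]  # 'summer1': 15.05. to 14.09.
mask = (idx >= pd.Timestamp(year, m1, d1)) & \
       (idx < pd.Timestamp(year, m2, d2) + pd.Timedelta(days=1))
print(mask.sum())  # number of quarter-hours falling inside summer1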
def run_tick(self, event):  # handle one incoming market tick
tick = event.dict['data']
if self.live_trading:
now_ticknum = get_tick_num(datetime.datetime.now())
cur_ticknum = get_tick_num(tick.timestamp)
        if abs(cur_ticknum - now_ticknum) > self.realtime_tick_diff:
            self.logger.warning('tick timestamp differs from system time by more than %s ticks, inst=%s, ticknum=%s, now_ticknum=%s'
                                % (self.realtime_tick_diff, tick.instID, cur_ticknum, now_ticknum))
if not self.update_instrument(tick):
return
inst = tick.instID
if inst in self.inst2spread:
for key in self.inst2spread[inst]:
self.trade_manager.check_pending_trades(key)
self.trade_manager.check_pending_trades(inst)
self.update_min_bar(tick)
if inst in self.inst2spread:
for key in self.inst2spread[inst]:
self.trade_manager.process_trades(key)
self.trade_manager.process_trades(inst)
gway = self.inst2gateway[inst]
if gway.process_flag:
gway.send_queued_orders()
# test_feature_availability_profiler.py (from the healthcareai-py project by HealthCatalyst)
def setUp(self):
self.df = pd.DataFrame(np.random.randn(1000, 4),
columns=['A', 'B', 'AdmitDTS', 'LastLoadDTS'])
# generate load date
self.df['LastLoadDTS'] = pd.datetime(2015, 5, 20)
# generate datetime objects for admit date
    admit = pd.Series([pd.NaT] * 1000)
delta = pd.datetime(2015, 5, 20) - pd.datetime(2015, 5, 1)
int_delta = (delta.days * 24 * 60 * 60) + delta.seconds
for i in range(1000):
random_second = randrange(int_delta)
admit[i] = pd.datetime(2015, 5, 1) + timedelta(seconds=random_second)
self.df['AdmitDTS'] = admit
# add nulls
a = np.random.rand(1000) > .5
self.df.loc[a, ['A']] = np.nan
a = np.random.rand(1000) > .75
self.df.loc[a, ['B']] = np.nan
def test_write_metrics1():
filename = abspath(join(testdir, 'test_write_metrics1.csv'))
if isfile(filename):
os.remove(filename)
    metrics = pd.DataFrame({'metric1': pd.Series([1.], index=[pd.datetime(2016,1,1)])})
pecos.io.write_metrics(filename, metrics)
assert_true(isfile(filename))
from_file1 = pd.read_csv(filename)
assert_equals(from_file1.shape, (1,2))
# append another date
    metrics = pd.DataFrame({'metric1': pd.Series([2.], index=[pd.datetime(2016,1,2)])})
pecos.io.write_metrics(filename, metrics)
from_file2 = pd.read_csv(filename)
assert_equals(from_file2.shape, (2,2))
# append another metric
    metrics = pd.DataFrame({'metric2': pd.Series([3.], index=[pd.datetime(2016,1,2)])})
    pecos.io.write_metrics(filename, metrics)
    from_file3 = pd.read_csv(filename)
assert_equals(from_file3.shape, (2,3))
def fourier_series(dates, period, series_order):
"""Provides Fourier series components with the specified frequency
and order.
Parameters
----------
dates: pd.Series containing timestamps.
period: Number of days of the period.
series_order: Number of components.
Returns
-------
Matrix with seasonality features.
"""
# convert to days since epoch
t = np.array(
(dates - pd.datetime(1970, 1, 1))
.dt.total_seconds()
        .astype(float)
) / (3600 * 24.)
return np.column_stack([
fun((2.0 * (i + 1) * np.pi * t / period))
for i in range(series_order)
for fun in (np.sin, np.cos)
])
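# Usage sketch (assumes numpy/pandas imported as in the snippet above): with
# series_order=3 the result is a (len(dates), 6) matrix of interleaved
# sin/cos features for the requested period (here roughly one year).
dates = pd.Series(pd.date_range('2016-01-01', periods=10))
feats = fourier_series(dates, period=365.25, series_order=3)
print(feats.shape)  # (10, 6)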
def test_incorrect_time_axis():
x = np.random.randn(3, 3, 1000)
entities = ['entity.{0}'.format(i) for i in range(1000)]
time = ['time.{0}'.format(i) for i in range(3)]
var_names = ['var.{0}'.format(i) for i in range(3)]
p = pd.Panel(x, items=var_names, major_axis=time, minor_axis=entities)
with pytest.raises(ValueError):
PanelData(p)
df = p.swapaxes(1, 2).swapaxes(0, 1).to_frame()
with pytest.raises(ValueError):
PanelData(df)
time = [1, pd.datetime(1960, 1, 1), 'a']
var_names = ['var.{0}'.format(i) for i in range(3)]
p = pd.Panel(x, items=var_names, major_axis=time, minor_axis=entities)
with pytest.raises(ValueError):
PanelData(p)
df = p.swapaxes(1, 2).swapaxes(0, 1).to_frame()
with pytest.raises(ValueError):
PanelData(df)
def read_fgga_txt(ifile) :
fgga_dateparse = lambda x: pd.datetime.utcfromtimestamp(int(x))
fgga_names = ['identifier', 'packet_length', 'timestamp', 'ptp_sync',
'MFM', 'flight_num', 'CPU_Load', 'USB_disk_space', 'ch4',
'co2', 'h2o', 'press_torr', 'temp_c', 'fit_flag',
'rda_usec', 'rdb_usec', 'ch4_ppb', 'co2_ppm',
'MFC_1_absolute_pressure', 'MFC_1_temperature',
'MFC_1volumetic_flow', 'MFC_1mass_flow', 'MFC_1set_point',
'V1', 'V2', 'V3', 'V4', 'restart_FGGA', 'FGGA_Pump',
'CAL_MFC_1Set_Value']
df_fgga = pd.read_csv(ifile,
names=fgga_names,
delimiter=',',
parse_dates=[2],
date_parser=fgga_dateparse,
                          skiprows=100)  # generously skip past the header block
# Using the Valve states for flagging out calibration periods
# TODO: add time buffer around calibration periods
df_fgga.loc[df_fgga['V1'] != 0, 'ch4_ppb'] = np.nan
df_fgga.loc[df_fgga['V2'] != 0, 'co2_ppm'] = np.nan
df_fgga.loc[df_fgga['V2'] != 0, 'ch4_ppb'] = np.nan
return df_fgga
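# Hypothetical usage (the path below is illustrative only):
# df_fgga = read_fgga_txt('/data/flight_b123/fgga_raw.txt')
# df_fgga[['timestamp', 'ch4_ppb', 'co2_ppm']].describe()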
def test_medians_for_gaps(self):
df = pd.DataFrame({
'date': list(pd.date_range(pd.datetime(2015, 6, 15),
pd.datetime(2015, 6, 20))),
'value': range(6)
})
df = df.iloc[[0, 2, 3, 4, 5]]
guac = GuacMl(df, 'value')
guac.make_time_series('date', prediction_length=1)
medians = HistoricalMedians([3], guac.config, guac.logger)
out = medians.execute(guac.data)
self.assertTrue(np.isnan(out.df['value_median_3'].iloc[0]))
self.assertEqual(out.df['value_median_3'].iloc[1], 0)
self.assertEqual(out.df['value_median_3'].iloc[2], 1)
self.assertEqual(out.df['value_median_3'].iloc[3], 2.5)
self.assertEqual(out.df['value_median_3'].iloc[4], 3)
def test_medians_series_and_group_keys_simple(self):
df = pd.DataFrame({
'date':
list(pd.date_range(pd.datetime(2015, 6, 15), pd.datetime(2015, 6, 20))) +
list(pd.date_range(pd.datetime(2015, 6, 15), pd.datetime(2015, 6, 20))),
'series_key': ['a'] * 6 + ['b'] * 6,
'group_key': ['uneven', 'even'] * 6,
'value': range(12)
})
guac = GuacMl(df, 'value')
guac.make_time_series('date', prediction_length=1, series_key_cols='series_key')
medians = HistoricalMedians([2], guac.config, guac.logger, group_keys='group_key')
out = medians.execute(guac.data)
out.df = out.df.sort_values(['series_key', 'group_key', 'date'])
self.assertTrue(np.isnan(out.df['value_median_2_by_group_key'].iloc[0]))
self.assertEqual(out.df['value_median_2_by_group_key'].iloc[1], 1)
self.assertEqual(out.df['value_median_2_by_group_key'].iloc[2], 2)
self.assertTrue(np.isnan(out.df['value_median_2_by_group_key'].iloc[3]))
self.assertEqual(out.df['value_median_2_by_group_key'].iloc[4], 0)
self.assertEqual(out.df['value_median_2_by_group_key'].iloc[5], 1)
self.assertTrue(np.isnan(out.df['value_median_2_by_group_key'].iloc[6]))
def create_simulation_parameters(year=2006, start=None, end=None,
capital_base=float("1.0e5"),
num_days=None,
data_frequency='daily',
emission_rate='daily',
env=None):
if env is None:
# Construct a complete environment with reasonable defaults
env = TradingEnvironment(load=noop_load)
if start is None:
start = datetime(year, 1, 1, tzinfo=pytz.utc)
if end is None:
if num_days:
start_index = env.trading_days.searchsorted(start)
end = env.trading_days[start_index + num_days - 1]
else:
end = datetime(year, 12, 31, tzinfo=pytz.utc)
sim_params = SimulationParameters(
period_start=start,
period_end=end,
capital_base=capital_base,
data_frequency=data_frequency,
emission_rate=emission_rate,
env=env,
)
return sim_params
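# Usage sketch (relies on zipline's test factory environment shown above):
# sim_params = create_simulation_parameters(year=2006, num_days=30)
# covers the first 30 trading days of 2006 with the default 1e5 capital base.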
def create_txn(sid, price, amount, datetime):
txn = Event({
'sid': sid,
'amount': amount,
'dt': datetime,
'price': price,
'type': DATASOURCE_TYPE.TRANSACTION,
'source_id': 'MockTransactionSource'
})
return txn
def create_commission(sid, value, datetime):
txn = Event({
'dt': datetime,
'type': DATASOURCE_TYPE.COMMISSION,
'cost': value,
'sid': sid,
'source_id': 'MockCommissionSource'
})
return txn
def create_test_df_source(sim_params=None, env=None, bars='daily'):
if bars == 'daily':
freq = pd.datetools.BDay()
elif bars == 'minute':
freq = pd.datetools.Minute()
else:
raise ValueError('%s bars not understood.' % bars)
if sim_params and bars == 'daily':
index = sim_params.trading_days
else:
if env is None:
env = TradingEnvironment(load=noop_load)
start = pd.datetime(1990, 1, 3, 0, 0, 0, 0, pytz.utc)
end = pd.datetime(1990, 1, 8, 0, 0, 0, 0, pytz.utc)
days = env.days_in_range(start, end)
if bars == 'daily':
index = days
if bars == 'minute':
index = pd.DatetimeIndex([], freq=freq)
for day in days:
day_index = env.market_minutes_for_day(day)
index = index.append(day_index)
x = np.arange(1, len(index) + 1)
df = pd.DataFrame(x, index=index, columns=[0])
return DataFrameSource(df), df
def create_test_panel_source(sim_params=None, env=None, source_type=None):
start = sim_params.first_open \
if sim_params else pd.datetime(1990, 1, 3, 0, 0, 0, 0, pytz.utc)
end = sim_params.last_close \
if sim_params else pd.datetime(1990, 1, 8, 0, 0, 0, 0, pytz.utc)
if env is None:
env = TradingEnvironment(load=noop_load)
index = env.days_in_range(start, end)
price = np.arange(0, len(index))
volume = np.ones(len(index)) * 1000
arbitrary = np.ones(len(index))
df = pd.DataFrame({'price': price,
'volume': volume,
'arbitrary': arbitrary},
index=index)
if source_type:
df['type'] = source_type
panel = pd.Panel.from_dict({0: df})
return DataPanelSource(panel), panel
def load_from_yahoo(indexes=None,
stocks=None,
start=None,
end=None,
adjusted=True):
"""
Loads price data from Yahoo into a dataframe for each of the indicated
assets. By default, 'price' is taken from Yahoo's 'Adjusted Close',
which removes the impact of splits and dividends. If the argument
'adjusted' is False, then the non-adjusted 'close' field is used instead.
:param indexes: Financial indexes to load.
:type indexes: dict
:param stocks: Stock closing prices to load.
:type stocks: list
:param start: Retrieve prices from start date on.
:type start: datetime
:param end: Retrieve prices until end date.
:type end: datetime
:param adjusted: Adjust the price for splits and dividends.
:type adjusted: bool
"""
data = _load_raw_yahoo_data(indexes, stocks, start, end)
if adjusted:
close_key = 'Adj Close'
else:
close_key = 'Close'
df = pd.DataFrame({key: d[close_key] for key, d in iteritems(data)})
df.index = df.index.tz_localize(pytz.utc)
return df
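# Usage sketch (historical only: the Yahoo endpoint behind _load_raw_yahoo_data
# has since been retired):
# start = pd.datetime(2010, 1, 1, tzinfo=pytz.utc)
# end = pd.datetime(2012, 1, 1, tzinfo=pytz.utc)
# prices = load_from_yahoo(stocks=['AAPL', 'GE'], start=start, end=end)
# prices.head()  # one adjusted-close column per ticker, UTC-localized index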
def setUp(self):
self.sids = range(90)
self.env = TradingEnvironment()
self.env.write_data(equities_identifiers=self.sids)
self.sim_params = factory.create_simulation_parameters(
start=datetime(1990, 1, 1, tzinfo=pytz.utc),
end=datetime(1990, 1, 8, tzinfo=pytz.utc),
env=self.env,
)
def setUp(self):
setup_logger(self)
start = pd.datetime(1990, 1, 3, 0, 0, 0, 0, pytz.utc)
end = pd.datetime(1990, 1, 8, 0, 0, 0, 0, pytz.utc)
self.sim_params = factory.create_simulation_parameters(
start=start, end=end, env=self.env,
)
self.sim_params.emission_rate = 'daily'
self.sim_params.data_frequency = 'minute'
self.source, self.df = \
factory.create_test_df_source(sim_params=self.sim_params,
env=self.env,
bars='minute')
def setUp(self):
setup_logger(self)
self.sim_params = factory.create_simulation_parameters(
start=datetime(1990, 1, 1, tzinfo=pytz.utc),
end=datetime(1990, 1, 8, tzinfo=pytz.utc),
env=self.env
)
self.source, self.df = \
factory.create_test_df_source(self.sim_params, self.env)
def parse_veneer_date(self, txt):
    if hasattr(txt, 'strftime'):
        return txt
    return pd.datetime.strptime(txt, '%m/%d/%Y %H:%M:%S')
def read_sdt(fn):
    ts = pd.read_table(fn, sep=' +', engine='python',
                       names=['Year', 'Month', 'Day', 'Val'])
    ts['Date'] = ts.apply(lambda row: pd.datetime(int(row.Year), int(row.Month),
                                                  int(row.Day)), axis=1)
ts = ts.set_index(ts.Date)
return ts.Val
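# Usage sketch: an .sdt file holds space-separated "Year Month Day Val" rows.
with open('example.sdt', 'w') as f:
    f.write('2015 6 15 3.2\n2015 6 16 4.1\n')
vals = read_sdt('example.sdt')
print(vals['2015-06-15'])  # 3.2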
def add_instrument(self, name):
self.tick_data[name] = []
dtypes = [(field, dtype_map[field]) for field in day_data_list]
self.day_data[name] = data_handler.DynamicRecArray(dtype = dtypes)
dtypes = [(field, dtype_map[field]) for field in min_data_list]
self.min_data[name] = {1: data_handler.DynamicRecArray(dtype = dtypes)}
self.cur_day[name] = dict([(item, 0) for item in day_data_list])
self.cur_min[name] = dict([(item, 0) for item in min_data_list])
self.day_data_func[name] = []
self.min_data_func[name] = {}
self.cur_min[name]['datetime'] = datetime.datetime.fromordinal(self.scur_day.toordinal())
self.cur_min[name]['date'] = self.scur_day
self.cur_day[name]['date'] = self.scur_day
def mkt_data_sod(self, tday):
for inst in self.instruments:
self.tick_data[inst] = []
self.cur_min[inst] = dict([(item, 0) for item in min_data_list])
self.cur_day[inst] = dict([(item, 0) for item in day_data_list])
self.cur_day[inst]['date'] = tday
self.cur_min[inst]['datetime'] = datetime.datetime.fromordinal(tday.toordinal())
def day_switch(self, event):
newday = event.dict['date']
if newday <= self.scur_day:
return
self.logger.info('switching the trading day from %s to %s, reset tick_id=%s to 0' % (self.scur_day, newday, self.tick_id))
if not self.eod_flag:
self.run_eod()
self.scur_day = newday
self.tick_id = 0
self.timer_count = 0
super(Agent, self).mkt_data_sod(newday)
self.eod_flag = False
eod_time = datetime.datetime.combine(newday, datetime.time(15, 20, 0))
self.put_command(eod_time, self.run_eod)
def update_instrument(self, tick):
inst = tick.instID
curr_tick = tick.tick_id
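    # CZCE publishes ticks at 1-second resolution, so two distinct updates can
    # arrive under the same tick_id; when volume or top-of-book sizes changed,
    # shift the repeat by half a second so the two updates stay distinguishable.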
if (self.instruments[inst].exchange == 'CZCE') and (self.instruments[inst].last_update == tick.tick_id) and \
((self.instruments[inst].volume < tick.volume) or (self.instruments[inst].ask_vol1 != tick.askVol1) or \
(self.instruments[inst].bid_vol1 != tick.bidVol1)):
if tick.tick_id % 10 < 5:
tick.tick_id += 5
tick.timestamp = tick.timestamp + datetime.timedelta(milliseconds=500)
self.tick_id = max(curr_tick, self.tick_id)
self.instruments[inst].up_limit = tick.upLimit
self.instruments[inst].down_limit = tick.downLimit
tick.askPrice1 = min(tick.askPrice1, tick.upLimit)
tick.bidPrice1 = max(tick.bidPrice1, tick.downLimit)
self.instruments[inst].last_update = curr_tick
self.instruments[inst].bid_price1 = tick.bidPrice1
self.instruments[inst].ask_price1 = tick.askPrice1
self.instruments[inst].mid_price = (tick.askPrice1 + tick.bidPrice1)/2.0
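    # A mid price outside the daily limit band indicates a corrupt quote;
    # reject the tick so it never reaches the bar and spread updates below.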
if (self.instruments[inst].mid_price > tick.upLimit) or (self.instruments[inst].mid_price < tick.downLimit):
return False
self.instruments[inst].bid_vol1 = tick.bidVol1
self.instruments[inst].ask_vol1 = tick.askVol1
self.instruments[inst].open_interest = tick.openInterest
last_volume = self.instruments[inst].volume
if tick.volume > last_volume:
self.instruments[inst].price = tick.price
self.instruments[inst].volume = tick.volume
self.instruments[inst].last_traded = curr_tick
if inst in self.inst2spread:
for spd_key in self.inst2spread[inst]:
self.spread_data[spd_key].update()
return True
def run_gway_service(self, gway, service, args):
if gway in self.gateways:
gateway = self.gateways[gway]
svc_func = service
if hasattr(gateway, svc_func):
ts = datetime.datetime.now()
self.put_command(ts, getattr(gateway, svc_func), args)
else:
print "no such service = % for %s" % (service, gway)
else:
print "no such a gateway %s" % gway