def test_insert_hist_data(self):
    """Insert two time-overlapped MarketDataBlocks and verify the result.

    The test resets the database, inserts the blocks built from
    ``testdata_insert_hist_data[0]`` and ``testdata_insert_hist_data[1]``
    into the ``'Stock'`` table through an async engine, then reads the
    table back with a synchronous engine and compares it against the
    expected frame stored in ``testdata_insert_hist_data[2]``.
    """
    self._clear_db()
    init_db(self.db_info)

    # Insert two time-overlapped MarketDataBlocks
    async def run(loop, data):
        engine = await aiosa.create_engine(
            user=self.db_info['user'], db=self.db_info['db'],
            host=self.db_info['host'], password=self.db_info['password'],
            loop=loop, echo=False)
        try:
            await insert_hist_data(engine, 'Stock', data[0])
            await insert_hist_data(engine, 'Stock', data[1])
        finally:
            # Release the pool even when an insertion raises, so a failing
            # test does not leave connections open for the next one.
            engine.close()
            await engine.wait_closed()

    # Execute insertion
    blk0 = MarketDataBlock(testdata_insert_hist_data[0])
    blk1 = MarketDataBlock(testdata_insert_hist_data[1])
    data = [blk0, blk1]
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run(loop, data))

    # Verify insertion
    # Work on a copy: the TickerTime conversion below must not mutate the
    # shared module-level fixture.
    df_source = testdata_insert_hist_data[2].copy()
    engine = create_engine(self.db_conn)
    try:
        conn = engine.connect()
        try:
            metadata = MetaData(engine, reflect=True)
            table = metadata.tables['Stock']
            result = conn.execute(select([table]))
            # self.assertEqual(result.keys(), list(df_source.columns))
            columns = list(result.keys())
            df = pd.DataFrame(result.fetchall(), columns=columns)
        finally:
            conn.close()
    finally:
        engine.dispose()

    _logger.debug(df.TickerTime[0])
    # Localize the values read back to UTC and turn the expected values into
    # pd.Timestamp objects before the two frames are compared.
    df.TickerTime = pd.DatetimeIndex(df.TickerTime).tz_localize('UTC')
    df_source.TickerTime = df_source.TickerTime.apply(pd.Timestamp)
    _logger.debug(df.iloc[0])
    assert_frame_equal(df, df_source)
评论列表
文章目录