def ReqOrderInsert(self, **kwargs):
"""
InstrumentID ??
VolumeTotalOriginal ??
LimitPrice ??
StopPrice ???
Direction ??
CombOffsetFlag ?,?,??
ContingentCondition ????
TimeCondition ????
"""
sub_client = None
channel_name1, channel_name2 = None, None
try:
sub_client = await aioredis.create_redis(
(config.get('REDIS', 'host', fallback='localhost'),
config.getint('REDIS', 'port', fallback=6379)),
db=config.getint('REDIS', 'db', fallback=1))
request_id = self.next_id()
order_ref = self.next_order_ref()
kwargs['nRequestId'] = request_id
kwargs['OrderRef'] = order_ref
channel_name1 = self.__trade_response_format.format('OnRtnOrder', order_ref)
channel_name2 = self.__trade_response_format.format('OnRspError', request_id)
channel_name3 = self.__trade_response_format.format('OnRspOrderInsert', 0)
ch1, ch2, ch3 = await sub_client.psubscribe(channel_name1, channel_name2, channel_name3)
cb = self.io_loop.create_future()
tasks = [
asyncio.ensure_future(self.query_reader(ch1, cb), loop=self.io_loop),
asyncio.ensure_future(self.query_reader(ch2, cb), loop=self.io_loop),
asyncio.ensure_future(self.query_reader(ch3, cb), loop=self.io_loop),
]
self.redis_client.publish(self.__request_format.format('ReqOrderInsert'), json.dumps(kwargs))
rst = await asyncio.wait_for(cb, HANDLER_TIME_OUT, loop=self.io_loop)
await sub_client.punsubscribe(channel_name1, channel_name2, channel_name3)
sub_client.close()
await asyncio.wait(tasks, loop=self.io_loop)
logger.info('ReqOrderInsert, rst: %s', rst)
return rst
except Exception as e:
logger.error('ReqOrderInsert failed: %s', repr(e), exc_info=True)
if sub_client and sub_client.in_pubsub and channel_name1:
await sub_client.unsubscribe(channel_name1, channel_name2)
sub_client.close()
return None
评论列表
文章目录