async def Start(self):
    """Open the Redis connection used by this object.

    Announces start-up through ``utils.prYellow`` and stores the client
    returned by ``aioredis.create_redis`` on ``self.redis``.  The host is
    read from ``utils.secret["Redis"]``, the port is fixed at 6379 and the
    client is created with ``encoding='utf8'``.

    The function is declared ``async`` because its body awaits the
    connection; ``await`` inside a plain ``def`` is a syntax error.
    """
    utils.prYellow("AIOREDIS")
    address = (utils.secret["Redis"], 6379)
    self.redis = await aioredis.create_redis(address, encoding='utf8')
# python类create_redis()的实例源码 (example source code using Python's create_redis())
def create_redis_client(_closable, loop, request):
    """Return a coroutine function wrapping ``aioredis.create_redis``.

    The returned callable forwards its arguments to
    ``aioredis.create_redis``, supplying ``loop`` as the default ``loop``
    keyword, and hands every client it creates to ``_closable`` before
    returning it.

    Args:
        _closable: Callable invoked with each created client (presumably
            registers it for later clean-up; confirm against the caller).
        loop: Event loop used when the caller does not pass ``loop=``.
        request: Unused here; looks like a pytest fixture argument.

    Returns:
        A coroutine function ``f(*args, **kw)`` resolving to the client.
    """
    # Native coroutine: ``asyncio.coroutine`` / ``yield from`` coroutines
    # were deprecated in Python 3.8 and removed in 3.11.
    async def f(*args, **kw):
        kw.setdefault('loop', loop)
        redis = await aioredis.create_redis(*args, **kw)
        _closable(redis)
        return redis

    return f
async def test_order_filter_invalid(self):
    """
    Test invalid message order.

    Subscribes a mock client to ``simple.topic``, publishes one message
    whose ``_order`` value is ``'invalid'`` followed by one message without
    ``_order``, and asserts that only the second message ends up in the
    client's log.
    """
    shark = SocketShark(TEST_CONFIG)
    await shark.prepare()
    client = MockClient(shark)
    session = client.session

    subscription = 'simple.topic'

    await session.on_client_event({
        'event': 'subscribe',
        'subscription': subscription,
    })
    assert client.log.pop() == {
        'event': 'subscribe',
        'subscription': subscription,
        'status': 'ok',
    }

    # Test message from server to client
    redis_settings = TEST_CONFIG['REDIS']
    redis = await aioredis.create_redis((
        redis_settings['host'], redis_settings['port']))
    redis_topic = redis_settings['channel_prefix'] + subscription

    try:
        await redis.publish_json(redis_topic, {
            'subscription': subscription,
            '_order': 'invalid',
            'data': {'foo': 'invalid'},
        })
        await redis.publish_json(redis_topic, {
            'subscription': subscription,
            'data': {'foo': 'bar'},
        })
    finally:
        # Release the publisher connection even if a publish call fails.
        redis.close()

    # Wait for Redis to propagate the messages
    await asyncio.sleep(0.1)

    has_messages = await shark.run_service_receiver(once=True)
    assert has_messages

    assert client.log == [{
        'event': 'message',
        'subscription': subscription,
        'data': {'foo': 'bar'},
    }]

    await shark.shutdown()
async def ReqOrderInsert(self, **kwargs):
    """Publish an order-insert request and wait for the first response.

    A dedicated Redis connection pattern-subscribes to three response
    channels (``OnRtnOrder`` keyed by the order reference, ``OnRspError``
    keyed by the request id, and ``OnRspOrderInsert`` keyed by ``0``).  The
    request is then published as JSON and the call waits up to
    ``HANDLER_TIME_OUT`` for ``self.query_reader`` to resolve the shared
    future.

    Keyword arguments are forwarded verbatim in the request payload, with
    ``nRequestId`` and ``OrderRef`` filled in here.  The original docstring
    listed the following keys; their descriptions were lost to an encoding
    error, so their meanings should be confirmed against the trading API:
    ``InstrumentID``, ``VolumeTotalOriginal``, ``LimitPrice``,
    ``StopPrice``, ``Direction``, ``CombOffsetFlag``,
    ``ContingentCondition``, ``TimeCondition``.

    Returns:
        The value the response future was resolved with, or ``None`` when
        anything fails (including a timeout); the failure is logged.
    """
    sub_client = None
    channel_names = ()
    try:
        sub_client = await aioredis.create_redis(
            (config.get('REDIS', 'host', fallback='localhost'),
             config.getint('REDIS', 'port', fallback=6379)),
            db=config.getint('REDIS', 'db', fallback=1))
        request_id = self.next_id()
        order_ref = self.next_order_ref()
        kwargs['nRequestId'] = request_id
        kwargs['OrderRef'] = order_ref
        channel_names = (
            self.__trade_response_format.format('OnRtnOrder', order_ref),
            self.__trade_response_format.format('OnRspError', request_id),
            self.__trade_response_format.format('OnRspOrderInsert', 0),
        )
        ch1, ch2, ch3 = await sub_client.psubscribe(*channel_names)
        # One future shared by all readers: whichever channel answers first
        # resolves it.
        cb = self.io_loop.create_future()
        tasks = [
            asyncio.ensure_future(self.query_reader(ch1, cb), loop=self.io_loop),
            asyncio.ensure_future(self.query_reader(ch2, cb), loop=self.io_loop),
            asyncio.ensure_future(self.query_reader(ch3, cb), loop=self.io_loop),
        ]
        self.redis_client.publish(self.__request_format.format('ReqOrderInsert'), json.dumps(kwargs))
        rst = await asyncio.wait_for(cb, HANDLER_TIME_OUT, loop=self.io_loop)
        await sub_client.punsubscribe(*channel_names)
        sub_client.close()
        await asyncio.wait(tasks, loop=self.io_loop)
        logger.info('ReqOrderInsert, rst: %s', rst)
        return rst
    except Exception as e:
        logger.error('ReqOrderInsert failed: %s', repr(e), exc_info=True)
        if sub_client:
            # The channels were registered with psubscribe, so all three
            # patterns must be released with punsubscribe.
            if sub_client.in_pubsub and channel_names:
                await sub_client.punsubscribe(*channel_names)
            # Close the connection even if the failure happened before the
            # subscription was established.
            sub_client.close()
        return None
async def init(loop):
    """Build the web application and start listening for connections.

    Sets up the middlewares (Redis-backed sessions, error and maintenance
    handlers), attaches a ``RedisFilter``-wrapped client as ``app.redis``,
    seeds the ``User`` and ``Profile`` records when they are absent,
    configures security and templates, and finally creates the server on
    ``config.server['host']`` / ``config.server['port']``.

    Args:
        loop: Event loop used for the application, the Redis client and the
            listening server.

    Returns:
        A ``(server, handler, app)`` tuple.
    """
    # Session pool and main client share one address, so the configured
    # port is honoured by both instead of being hard-coded for the pool.
    redis_address = (config.redis_ip, config.redis['port'])

    # Middlewares
    session_pool = await aioredis.create_pool(redis_address)
    middlewares = [
        session_middleware(RedisStorage(session_pool, cookie_name='w')),
        error_middleware,
        maintain_middleware,
    ]

    # init server
    app = web.Application(loop=loop,
                          middlewares=middlewares)

    redis = await aioredis.create_redis(redis_address, loop=loop)
    app.redis = RedisFilter(redis)

    # Register admin account
    if await app.redis.get('User') is None:
        await app.redis.set('SecretKey', os.urandom(16), many=False)
        config.admin['password'] = await encrypt(app.redis, config.admin['password'])
        config.admin['permission'] = 0x0f
        await app.redis.set('User', config.admin, many=False)

    # Init Profile
    if await app.redis.get('Profile') is None:
        await app.redis.set('Profile', {
            'name': config.rss['author'],
            'link_desc': '',
            'text': ''
        }, many=False)

    # Security
    setup_security(app,
                   SessionIdentityPolicy(),
                   RedisAuthorizationPolicy(redis))

    await compass(app.router)
    CONST.CATEGORY = await app.redis.get('Categories') or []

    aiohttp_jinja2.setup(app, loader=jinja2.FileSystemLoader(config.template_addr))

    _handler = app.make_handler(
        access_log=logger,
        access_log_format=formatters
    )
    _srv = await loop.create_server(_handler, config.server['host'], config.server['port'])
    print('Server started at http://%s:%s...' % (config.server['host'], config.server['port']))
    return _srv, _handler, app