def get_app():
redis = await aioredis.create_redis(('localhost', 6379,), db=1)
app = web.Application()
app['redis'] = redis
aiohttp_jinja2.setup(app, loader=jinja2.FileSystemLoader('templates/'),
context_processors=(static_processor,))
app.router.add_route('GET', '/', handlers.index)
app.router.add_route('GET', '/login', handlers.login_task)
app.router.add_route('POST', '/login', handlers.login)
app.router.add_static('/static', 'static')
async def close_redis(app):
app['redis'].close()
app.on_shutdown.append(close_redis)
return app
python类create_redis()的实例源码
def run_bot():
if BotConfig.check():
loop = asyncio.get_event_loop()
logger.info('creating redis connection')
redis_conn = loop.run_until_complete(aioredis.create_redis(('localhost', 6379), encoding="utf-8"))
logger.info('adding signal handlers')
add_signal_handlers(loop, redis_conn)
logger.info('creating tasks: bot and background coros')
create_tasks(loop, redis_conn)
try:
logger.info('starting event loop ')
loop.run_forever()
finally:
loop.close()
def test_send_message(self):
async def func():
SUB = await aioredis.create_redis((os.environ.get(
"REDIS_HOST",
"127.0.0.1"
), 6379))
res = await SUB.psubscribe("enibar-*")
subscriber = res[0]
api.redis.send_message('enibar-test', 'test')
await subscriber.wait_message()
reply = await subscriber.get_json()
self.assertEqual(reply, (b'enibar-test', 'test'))
task = asyncio.ensure_future(func())
self.loop.run_until_complete(task)
def store_interface(config, interface, key=None, exist='SET_IF_NOT_EXIST'):
redis = yield from aioredis.create_redis((config.redis_host, config.redis_port))
pipe = redis.pipeline()
interface_id = interface['network_interface_id']
# Only store picked interface data if using default key (not fixed key from instance)
if not key:
key = KEY_ENI + interface_id
pipe.set(key=KEY_ENI + interface_id, value=pickle.dumps(interface), expire=int(config.redis_ttl))
# Store intermediate key lookups so that we can find metadata given only an IP address
if 'association' in interface:
pipe.set(key=KEY_IP + interface['association']['public_ip'], value=key, expire=int(config.redis_ttl), exist=exist)
for address in interface.get('private_ip_addresses', []):
pipe.set(key=KEY_IP + address['private_ip_address'], value=key, expire=int(config.redis_ttl), exist=exist)
yield from pipe.execute()
redis.close()
yield from redis.wait_closed()
def __init__(self, io_loop: asyncio.AbstractEventLoop = None):
super().__init__()
self.io_loop = io_loop or asyncio.get_event_loop()
self.sub_client = self.io_loop.run_until_complete(
aioredis.create_redis((config.get('REDIS', 'host', fallback='localhost'),
config.getint('REDIS', 'port', fallback=6379)),
db=config.getint('REDIS', 'db', fallback=1)))
self.redis_client = redis.StrictRedis(
host=config.get('REDIS', 'host', fallback='localhost'),
db=config.getint('REDIS', 'db', fallback=1), decode_responses=True)
self.initialized = False
self.sub_tasks = list()
self.sub_channels = list()
self.channel_router = dict()
self.crontab_router = defaultdict(dict)
self.datetime = None
self.time = None
self.loop_time = None
def setup(self):
self.redis = await aioredis.create_redis((config.REDIS_HOST, config.REDIS_PORT,), encoding='utf8')
self.__processed_status = await self.get_processed_ids()
self.client = aiohttp.ClientSession()
extension_manager = extension.ExtensionManager(namespace='ofensivaria.bot.commands',
invoke_on_load=True,
invoke_args=(self, self.redis, self.client))
commands = extension_manager.map(self.__extension_manager_callback)
self.commands = [obj for name, obj in sorted(commands)]
prepare_tasks = [c.prepare() for c in self.commands]
await asyncio.gather(*prepare_tasks)
self.__setup = True
def connect(self, endpoint: str, auth: Optional[str] = None) -> None: # type: ignore
endpoint = validate_endpoint(endpoint)
auth = validate_auth(auth)
self._auth = auth
# print('*** redis connecting to ', endpoint, flush=True)
if self._connected:
raise ConnectionError('Already connected.')
if not endpoint.startswith('redis://'):
raise ValueError('Expected endpoint to begin with "redis://".'
'Got: {!r}'.format(endpoint))
host, port = endpoint.replace('redis://', '').split(':') # todo: handle exception
self._subscriber = await aioredis.create_redis((host, port))
self._publisher = await aioredis.create_redis((host, port))
if auth:
await self._subscriber.auth(auth)
await self._publisher.auth(auth)
else:
print('*** WARNING: Redis connection has no password.')
self._connected = True
def redis_conn(loop):
"""
yield fixture which creates a redis connection, and flushes redis before the test.
Note: redis is not flushed after the test both for performance and to allow later debugging.
"""
async def _get_conn():
conn = await create_redis(('localhost', 6379), loop=loop)
await conn.flushall()
return conn
conn = loop.run_until_complete(_get_conn())
conn.loop = loop
yield conn
conn.close()
try:
loop.run_until_complete(conn.wait_closed())
except RuntimeError:
pass
def prepare(self):
"""
Called by the backend to prepare SocketShark (i.e. initialize Redis
connection and the receiver class)
"""
redis_receiver = Receiver(loop=asyncio.get_event_loop())
redis_settings = self.config['REDIS']
try:
self.redis = await aioredis.create_redis((
redis_settings['host'], redis_settings['port']))
except (OSError, aioredis.RedisError):
self.log.exception('could not connect to redis')
raise
# Some features (e.g. pinging) don't work on old Redis versions.
info = await self.redis.info('server')
version_info = info['server']['redis_version'].split('.')
major, minor = int(version_info[0]), int(version_info[1])
if not (major > 3 or major == 3 and minor >= 2):
msg = 'Redis version must be at least 3.2'
self.log.exception(msg, version_info=version_info)
raise Exception(msg)
self._redis_connection_handler_task = asyncio.ensure_future(
self._redis_connection_handler())
self.service_receiver = ServiceReceiver(self, redis_receiver)
def __init__(self,
dsn: str = "redis://127.0.0.1:6379/0",
prefix: str = "aiotasks",
loop=None):
super().__init__(loop=loop, prefix=prefix)
_, password, host, port, db = parse_dsn(dsn,
default_port=6379,
default_db=0)
db = int(db)
# if not port:
# port = 6379
#
# port = int(port)
# try:
# db = int(db)
#
# if not db:
# db = 0
# except ValueError:
# db = 0
self._redis_pub = self._loop_subscribers.run_until_complete(
aioredis.create_redis(address=(host, port),
db=db,
password=password,
loop=self._loop_subscribers))
self._redis_sub = self._loop_subscribers.run_until_complete(
aioredis.create_redis(address=(host, port),
db=db,
password=password,
loop=self._loop_subscribers))
def __init__(self,
dsn: str = "redis://127.0.0.1:6379/0",
prefix: str = "aiotasks",
concurrency: int = 5,
loop=None):
super().__init__(loop=loop, prefix=prefix, concurrency=concurrency)
_, password, host, port, db = parse_dsn(dsn,
default_port=6379,
default_db=0)
db = int(db)
# if not port:
# port = 6379
#
# port = int(port)
# try:
# db = int(db)
# if not db:
# db = 0
# except ValueError:
# db = 0
self._redis_consumer = self._loop_delay. \
run_until_complete(aioredis.create_redis(address=(host, port),
db=db,
password=password,
loop=self._loop_delay))
self._redis_poller = self._loop_delay. \
run_until_complete(aioredis.create_redis(address=(host, port),
db=db,
password=password,
loop=self._loop_delay))
def on_connect(self):
"""Initialize Redis connection when bot loads"""
self.redis = await aioredis.create_redis(('localhost', 6379))
def get_async_redis(self, loop=None):
"""Creates an asynchronous Redis connection.
Parameters
----------
loop = Optional[asyncio.AbstractEventLoop]
The loop used for the asynchronous Redis connection.
"""
if self.loop is not None and loop is None:
loop = self.loop
return await aioredis.create_redis(
'redis://{}:{}'.format(cache_config['HOST'], cache_config['PORT']),
db=cache_config['DB'], password=cache_config['PASSWORD'], loop=loop)
def redis(variables, loop):
coro = create_redis(
(variables['redis']['host'], variables['redis']['port']),
db=variables['redis']['db'],
loop=loop
)
return loop.run_until_complete(coro)
def get_async_redis():
""" initialize an asyncronous redis connection
"""
global ASYNCREDIS
if ASYNCREDIS is None or ASYNCREDIS.closed: # pragma: no branch
address = REDIS_PUBSUB["address"]
db = REDIS_PUBSUB["db"]
password = REDIS_PUBSUB["password"]
ASYNCREDIS = yield from aioredis.create_redis(address, db=db, password=password)
return ASYNCREDIS
def redis_async(request, event_loop):
conn = event_loop.run_until_complete(aioredis.create_redis(('localhost', 6379), encoding="utf-8", db=10))
def redis_async_cleanup():
conn.close()
request.addfinalizer(redis_async_cleanup)
return conn
def redis(variables, loop):
coro = create_redis(
(variables['redis']['host'], variables['redis']['port']),
db=variables['redis']['db'],
loop=loop
)
return loop.run_until_complete(coro)
def _publish(self, data):
if self.pub is None:
self.pub = await aioredis.create_redis((self.host, self.port),
db=self.db)
return await self.pub.publish(self.channel, pickle.dumps(data))
def _listen(self):
if self.sub is None:
self.sub = await aioredis.create_redis((self.host, self.port),
db=self.db)
self.ch = (await self.sub.subscribe(self.channel))[0]
while True:
return await self.ch.get()
def install_redis_handle(app):
global SUB
while True:
SUB = await aioredis.create_redis((settings.REDIS_HOST, 6379), password=settings.REDIS_PASSWORD)
res = await SUB.psubscribe("enibar-*")
subscriber = res[0]
while await subscriber.wait_message():
reply = await subscriber.get_json()
await app.redis_handle(reply[0].decode(), reply[1])
await asyncio.sleep(1)
def connect_databases(self):
# connect to postgres
self.pgpool = await asyncpg.create_pool(**self.cfg['db']['postgres'])
# connect to redis
self.redis = await aioredis.create_redis(
(self.cfg['db']['redis'], 6379), loop=self.loop
)
def init_connections(sanic, loop):
"""Bind the database and Redis client to Sanic's event loop."""
global redis
redis_server = environ.get('REDIS_SERVER', None)
if redis_server:
redis = await create_redis((redis_server, 6379), encoding='utf-8', loop=loop)
seed()
def store_instance(config, instance):
redis = yield from aioredis.create_redis((config.redis_host, config.redis_port))
pipe = redis.pipeline()
instance_id = instance['instance_id']
# Store pickled instance data keyed off instance ID
pipe.set(key=KEY_I + instance_id, value=pickle.dumps(instance), expire=int(config.redis_ttl))
# Store intermediate key lookups so that we can find an instance given only its IP address
for interface in instance.get('network_interfaces', []):
yield from store_interface(config, interface, KEY_I + instance_id, None)
yield from pipe.execute()
redis.close()
yield from redis.wait_closed()
def create():
return Database(await aioredis.create_redis(("redis", 6379)))
def test_aioredis(i, loop):
start = time.time()
redis = await aioredis.create_redis((HOST, 6379), loop=loop)
val = None
for i in range(i):
val = await redis.keys('*')
print(time.time() - start)
redis.close()
await redis.wait_closed()
return val
def query(self, query_type: str, **kwargs):
sub_client = None
channel_name1, channel_name2 = None, None
try:
sub_client = await aioredis.create_redis(
(config.get('REDIS', 'host', fallback='localhost'),
config.getint('REDIS', 'port', fallback=6379)),
db=config.getint('REDIS', 'db', fallback=1))
request_id = self.next_id()
kwargs['RequestID'] = request_id
channel_name1 = self.__trade_response_format.format('OnRspQry' + query_type, request_id)
channel_name2 = self.__trade_response_format.format('OnRspError', request_id)
ch1, ch2 = await sub_client.psubscribe(channel_name1, channel_name2)
cb = self.io_loop.create_future()
tasks = [
asyncio.ensure_future(self.query_reader(ch1, cb), loop=self.io_loop),
asyncio.ensure_future(self.query_reader(ch2, cb), loop=self.io_loop),
]
self.redis_client.publish(self.__request_format.format('ReqQry' + query_type), json.dumps(kwargs))
rst = await asyncio.wait_for(cb, HANDLER_TIME_OUT, loop=self.io_loop)
await sub_client.punsubscribe(channel_name1, channel_name2)
sub_client.close()
await asyncio.wait(tasks, loop=self.io_loop)
return rst
except Exception as e:
logger.error('%s failed: %s', query_type, repr(e), exc_info=True)
if sub_client and sub_client.in_pubsub and channel_name1:
await sub_client.unsubscribe(channel_name1, channel_name2)
sub_client.close()
return None
def SubscribeMarketData(self, inst_ids: list):
sub_client = None
channel_name1, channel_name2 = None, None
try:
sub_client = await aioredis.create_redis(
(config.get('REDIS', 'host', fallback='localhost'),
config.getint('REDIS', 'port', fallback=6379)),
db=config.getint('REDIS', 'db', fallback=1))
channel_name1 = self.__market_response_format.format('OnRspSubMarketData', 0)
channel_name2 = self.__market_response_format.format('OnRspError', 0)
ch1, ch2 = await sub_client.psubscribe(channel_name1, channel_name2)
cb = self.io_loop.create_future()
tasks = [
asyncio.ensure_future(self.query_reader(ch1, cb), loop=self.io_loop),
asyncio.ensure_future(self.query_reader(ch2, cb), loop=self.io_loop),
]
self.redis_client.publish(self.__request_format.format('SubscribeMarketData'), json.dumps(inst_ids))
rst = await asyncio.wait_for(cb, HANDLER_TIME_OUT, loop=self.io_loop)
await sub_client.punsubscribe(channel_name1, channel_name2)
sub_client.close()
await asyncio.wait(tasks, loop=self.io_loop)
return rst
except Exception as e:
logger.error('SubscribeMarketData failed: %s', repr(e), exc_info=True)
if sub_client and sub_client.in_pubsub and channel_name1:
await sub_client.unsubscribe(channel_name1, channel_name2)
sub_client.close()
return None
def UnSubscribeMarketData(self, inst_ids: list):
sub_client = None
channel_name1, channel_name2 = None, None
try:
sub_client = await aioredis.create_redis(
(config.get('REDIS', 'host', fallback='localhost'),
config.getint('REDIS', 'port', fallback=6379)),
db=config.getint('REDIS', 'db', fallback=1))
channel_name1 = self.__market_response_format.format('OnRspUnSubMarketData', 0)
channel_name2 = self.__market_response_format.format('OnRspError', 0)
ch1, ch2 = await sub_client.psubscribe(channel_name1, channel_name2)
cb = self.io_loop.create_future()
tasks = [
asyncio.ensure_future(self.query_reader(ch1, cb), loop=self.io_loop),
asyncio.ensure_future(self.query_reader(ch2, cb), loop=self.io_loop),
]
self.redis_client.publish(self.__request_format.format('UnSubscribeMarketData'), json.dumps(inst_ids))
rst = await asyncio.wait_for(cb, HANDLER_TIME_OUT, loop=self.io_loop)
await sub_client.punsubscribe(channel_name1, channel_name2)
sub_client.close()
await asyncio.wait(tasks, loop=self.io_loop)
return rst
except Exception as e:
logger.error('SubscribeMarketData failed: %s', repr(e), exc_info=True)
if sub_client and sub_client.in_pubsub and channel_name1:
await sub_client.unsubscribe(channel_name1, channel_name2)
sub_client.close()
return None
def _initialize_redis_client(self):
return await aioredis.create_redis(
(config["redis"]["host"], config["redis"]["port"]),
loop=asyncio.get_event_loop()
)
def _create_connect():
global _connection
if not _connection:
_connection = await aioredis.create_redis(
(options.options.redis_host, options.options.redis_port),
db=options.options.redis_index
)
return _connection