def set_redis_pool(self, redis_pool: Optional[Redis]):
    """Validate ``redis_pool`` and store it on ``self._redis_pool``.

    ``None`` is stored as-is without any validation. A raw
    ``ConnectionsPool`` is wrapped in a ``Redis`` object before the checks.

    Raises:
        InvalidRedisPool: If the value (after wrapping) is not a ``Redis``
            instance, or if it is a ``Redis`` instance whose
            ``_pool_or_conn`` is not a ``ConnectionsPool``.
    """
    # Compare against None explicitly: a plain truthiness test would let a
    # falsy-but-invalid value skip validation and be stored as the pool.
    if redis_pool is not None:
        if isinstance(redis_pool, ConnectionsPool):
            # If they've passed a raw pool then wrap it up in a Redis object.
            # aioredis.create_redis_pool() normally does this for us.
            redis_pool = Redis(redis_pool)

        if not isinstance(redis_pool, Redis):
            raise InvalidRedisPool(
                'Invalid Redis connection provided: {}. If unsure, use aioredis.create_redis_pool() to '
                'create your redis connection.'.format(redis_pool)
            )

        if not isinstance(redis_pool._pool_or_conn, ConnectionsPool):
            raise InvalidRedisPool(
                'The provided redis connection is backed by a single connection, rather than a '
                'pool of connections. This will lead to lightbus deadlocks and is unsupported. '
                'If unsure, use aioredis.create_redis_pool() to create your redis connection.'
            )

    self._redis_pool = redis_pool
# Python: example source code for create_redis_pool()
async def get_redis(self) -> Redis:
    """
    Get the redis pool; if a pool is already initialised it's returned, else one is created.

    The check and the creation both happen while holding ``self._create_pool_lock``.
    """
    async with self._create_pool_lock:
        if self.redis is None:
            self.redis = await self.create_redis_pool()
    return self.redis
def redis(loop):
    """
    Yield fixture: opens a redis pool on localhost:6379, flushes redis, then yields the pool.

    After the test the pool is closed. Redis is not flushed again at that point,
    both for performance and to allow later debugging.
    """
    async def _open_flushed():
        pool = await create_redis_pool(('localhost', 6379), loop=loop)
        await pool.flushall()
        return pool

    async def _shutdown(pool):
        pool.close()
        await pool.wait_closed()

    pool = loop.run_until_complete(_open_flushed())
    yield pool
    loop.run_until_complete(_shutdown(pool))
async def connect():
    """Create an aioredis pool and bind it to the module-level ``connection``.

    The pool targets ``settings.REDIS_HOST`` on port 6379 with ``maxsize=10``
    and authenticates with ``settings.REDIS_PASSWORD``.
    """
    global connection
    connection = await aioredis.create_redis_pool(
        (settings.REDIS_HOST, 6379),
        maxsize=10,
        password=settings.REDIS_PASSWORD,
    )
async def _connect(self):
    """Create the aioredis pool from this object's settings and mark it ready.

    The pool is stored on ``self._pool`` and ``self._ready`` is set once it
    has been created.

    Raises:
        Exception: Any error raised while connecting is logged and re-raised
            unchanged.
    """
    try:
        self._pool = await aioredis.create_redis_pool(
            (self.host(), self.port()),
            db=self.db(),
            minsize=self.minsize(),
            maxsize=self.maxsize()
        )
        self._ready.set()
    except Exception:
        # Log with traceback here, but let the caller decide how to react.
        log.exception("Exception connecting to database!")
        raise
def aioredis_pool():
    """Generator-based coroutine producing a two-connection aioredis pool on localhost:6379.

    The pool is also stored in the module-level ``_aioredis_pool`` and has
    ``ring.func.aioredis`` attached as its ``ring`` attribute. On Python
    older than 3.5 the test is skipped via ``pytest.skip()`` instead.
    """
    import sys

    if sys.version_info < (3, 5):
        pytest.skip()
        return None

    import aioredis

    global _aioredis_pool
    address = ('localhost', 6379)
    _aioredis_pool = yield from aioredis.create_redis_pool(
        address, minsize=2, maxsize=2)
    _aioredis_pool.ring = ring.func.aioredis
    return _aioredis_pool
def create_redis_pool(_closable, loop):
    """Wrapper around aioredis.create_redis_pool.

    Returns a coroutine function that forwards its arguments to
    ``aioredis.create_redis_pool`` (using ``loop`` unless the caller passes
    its own ``loop`` keyword), hands the created pool to ``_closable`` and
    then returns the pool.
    """
    # Native coroutine instead of ``@asyncio.coroutine`` / ``yield from``:
    # the decorator was removed in Python 3.11.
    async def f(*args, **kw):
        kw.setdefault('loop', loop)
        redis = await aioredis.create_redis_pool(*args, **kw)
        _closable(redis)
        return redis
    return f
def redis_pool(create_redis_pool, server, loop):
    """Create a redis pool for ``server.tcp_address`` and return it.

    The ``create_redis_pool`` coroutine is run to completion on ``loop``.
    """
    pending = create_redis_pool(server.tcp_address, loop=loop)
    return loop.run_until_complete(pending)
def new_redis_pool(create_redis_pool, server, loop):
    """Return a factory for fresh redis pools; useful when several connections are needed.

    Each call to the returned function creates a new pool for
    ``server.tcp_address`` on ``loop``, runs ``flushall()`` on it and
    returns it.
    """
    def make_new():
        pending = create_redis_pool(server.tcp_address, loop=loop)
        pool = loop.run_until_complete(pending)
        loop.run_until_complete(pool.flushall())
        return pool

    return make_new
async def create_pool_lenient(settings: RedisSettings, loop: asyncio.AbstractEventLoop, *,
                              _retry: int=0) -> Redis:
    """
    Create a new redis pool, retrying up to conn_retries times if the connection fails.

    Between attempts the coroutine sleeps for ``settings.conn_retry_delay``. Once
    ``_retry`` reaches ``settings.conn_retries`` the connection error is re-raised.

    :param settings: RedisSettings instance
    :param loop: event loop
    :param _retry: retry attempt, this is set when the method calls itself recursively
    :return: the pool returned by ``aioredis.create_redis_pool``
    """
    addr = settings.host, settings.port
    try:
        pool = await aioredis.create_redis_pool(
            addr, loop=loop, db=settings.database, password=settings.password,
            timeout=settings.conn_timeout
        )
    except (ConnectionError, OSError, aioredis.RedisError, asyncio.TimeoutError) as e:
        if _retry < settings.conn_retries:
            logger.warning('redis connection error %s %s, %d retries remaining...',
                           e.__class__.__name__, e, settings.conn_retries - _retry)
            await asyncio.sleep(settings.conn_retry_delay)
        else:
            raise
    else:
        if _retry > 0:
            # Only worth reporting success if an earlier attempt had failed.
            logger.info('redis connection successful')
        return pool

    # recursively attempt to create the pool outside the except block to avoid
    # "During handling of the above exception..." madness
    return await create_pool_lenient(settings, loop, _retry=_retry + 1)
async def create_redis_pool(self):
    """Create a redis pool from ``self.redis_settings`` on ``self.loop``.

    Delegates to ``create_pool_lenient`` and returns its result.
    """
    # defined here for easy mocking
    return await create_pool_lenient(self.redis_settings, self.loop)
async def get_redis_pool(self) -> Redis:
    """Return the redis pool, creating it on first use.

    When ``self._redis_pool`` is ``None`` a pool is created with
    ``aioredis.create_redis_pool(**self.connection_kwargs)`` and stored on
    ``self._redis_pool``; later calls return the stored pool.
    """
    if self._redis_pool is None:
        self._redis_pool = await aioredis.create_redis_pool(**self.connection_kwargs)
    return self._redis_pool