def set_redis_pool(self, redis_pool: Optional[Redis]):
    """Validate ``redis_pool`` and store it on ``self._redis_pool``.

    ``None`` is stored as-is, without validation. A raw ``ConnectionsPool``
    is first wrapped in a ``Redis`` object. Any other value must be a
    ``Redis`` instance whose ``_pool_or_conn`` is a ``ConnectionsPool``.

    Args:
        redis_pool: ``None``, a ``ConnectionsPool``, or a ``Redis`` object.

    Raises:
        InvalidRedisPool: If ``redis_pool`` is not ``None`` and is neither a
            ``ConnectionsPool`` nor a ``Redis`` instance, or if it is a
            ``Redis`` instance that is not backed by a ``ConnectionsPool``.
    """
    # Compare against None explicitly. A plain truthiness test would let
    # falsy-but-invalid values (0, "", an empty container, ...) skip the
    # validation below and be stored as the pool.
    if redis_pool is not None:
        if isinstance(redis_pool, ConnectionsPool):
            # If they've passed a raw pool then wrap it up in a Redis object.
            # aioredis.create_redis_pool() normally does this for us.
            redis_pool = Redis(redis_pool)

        if not isinstance(redis_pool, Redis):
            raise InvalidRedisPool(
                'Invalid Redis connection provided: {}. If unsure, use aioredis.create_redis_pool() to '
                'create your redis connection.'.format(redis_pool)
            )

        # The Redis wrapper must sit on top of a pool, not a single connection.
        if not isinstance(redis_pool._pool_or_conn, ConnectionsPool):
            raise InvalidRedisPool(
                'The provided redis connection is backed by a single connection, rather than a '
                'pool of connections. This will lead to lightbus deadlocks and is unsupported. '
                'If unsure, use aioredis.create_redis_pool() to create your redis connection.'
            )

    self._redis_pool = redis_pool
评论列表
文章目录