async def get_redis(address=None, loop=None, recreate=False) -> aioredis.Redis:
    """Return the module-wide reconnecting Redis client, creating it on demand.

    Declared ``async`` because the client is created with ``await``.

    :param address: Redis URL; falls back to ``settings.CONFIG['redis']`` when falsy.
    :param loop: event loop forwarded to ``aioredis.create_reconnecting_redis``.
    :param recreate: when true, build a new client even if one is already cached.
    :return: the client stored in the module-level ``_redis``.
    """
    global _redis
    if _redis and not recreate:
        return _redis

    # Only resolve and parse the address when a client actually has to be built.
    kwargs = utils.parse_redis_url(address or settings.CONFIG['redis'])
    # Collapse the parsed host/port into the single ``address`` tuple passed to aioredis.
    kwargs['address'] = kwargs.pop('host'), kwargs.pop('port')
    _redis = await aioredis.create_reconnecting_redis(loop=loop, **kwargs)
    return _redis
# Example source code for the Python class Redis()
def set_redis_pool(self, redis_pool: Optional[Redis]):
    """Validate ``redis_pool`` and store it on ``self._redis_pool``.

    A falsy value is stored unchanged without any validation. A raw
    ``ConnectionsPool`` is wrapped in a ``Redis`` object before being stored.

    :param redis_pool: a ``Redis`` object backed by a connection pool, a raw
        ``ConnectionsPool``, or a falsy value.
    :raises InvalidRedisPool: if the value is neither of the accepted types, or
        if the ``Redis`` object is not backed by a ``ConnectionsPool``.
    """
    if redis_pool:
        candidate = redis_pool
        if isinstance(candidate, ConnectionsPool):
            # A raw pool was passed in, so wrap it in a Redis object ourselves.
            # aioredis.create_redis_pool() normally does this for us.
            candidate = Redis(candidate)

        if not isinstance(candidate, Redis):
            raise InvalidRedisPool(
                'Invalid Redis connection provided: {}. If unsure, use aioredis.create_redis_pool() to '
                'create your redis connection.'.format(candidate)
            )

        backing = candidate._pool_or_conn
        if not isinstance(backing, ConnectionsPool):
            raise InvalidRedisPool(
                'The provided redis connection is backed by a single connection, rather than a '
                'pool of connections. This will lead to lightbus deadlocks and is unsupported. '
                'If unsure, use aioredis.create_redis_pool() to create your redis connection.'
            )
        redis_pool = candidate

    self._redis_pool = redis_pool
async def call_rpc(self, rpc_message: RpcMessage):
    """Enqueue ``rpc_message`` onto the Redis stream belonging to its API.

    The stream key is ``'<api_name>:stream'`` and the entry's fields are
    ``rpc_message.to_dict()``. Declared ``async`` because the body awaits both
    the pool and the ``xadd`` call.

    :param rpc_message: the RPC message to enqueue.
    """
    stream = '{}:stream'.format(rpc_message.api_name)
    logger.debug(
        LBullets(
            L("Enqueuing message {} in Redis stream {}", Bold(rpc_message), Bold(stream)),
            items=rpc_message.to_dict()
        )
    )

    pool = await self.get_redis_pool()
    with await pool as redis:
        # Timing starts once a connection has been obtained from the pool.
        start_time = time.time()
        # TODO: MAXLEN
        await redis.xadd(stream=stream, fields=rpc_message.to_dict())

    logger.info(L(
        "Enqueued message {} in Redis in {} stream {}",
        Bold(rpc_message), human_time(time.time() - start_time), Bold(stream)
    ))
async def send_result(self, rpc_message: RpcMessage, result_message: ResultMessage, return_path: str):
    """Push the result of an RPC onto the Redis list named by ``return_path``.

    The encoded result is pushed with ``lpush`` and the key is given a
    60 second expiry; both commands are sent in a single pipeline. Declared
    ``async`` because the body awaits the pool and the pipeline execution.

    :param rpc_message: the originating RPC message (not read by this method).
    :param result_message: message whose ``result`` attribute is encoded and sent.
    :param return_path: return path, converted to a Redis key by ``_parse_return_path``.
    """
    logger.debug(L(
        "Sending result {} into Redis using return path {}",
        Bold(result_message), Bold(return_path)
    ))
    redis_key = self._parse_return_path(return_path)

    pool = await self.get_redis_pool()
    with await pool as redis:
        start_time = time.time()
        p = redis.pipeline()
        p.lpush(redis_key, redis_encode(result_message.result))
        # TODO: Make result expiry configurable
        p.expire(redis_key, timeout=60)
        await p.execute()

    logger.debug(L(
        "? Sent result {} into Redis in {} using return path {}",
        Bold(result_message), human_time(time.time() - start_time), Bold(return_path)
    ))
async def receive_result(self, rpc_message: RpcMessage, return_path: str) -> ResultMessage:
    """Wait for and return the result of an RPC from the list named by ``return_path``.

    Performs a blocking pop (``blpop``) with ``timeout=5`` on the key parsed
    from ``return_path`` and returns the value after ``redis_decode``.
    Declared ``async`` because the body awaits the pool and the ``blpop`` call.

    :param rpc_message: the RPC message being waited on (used for logging only).
    :param return_path: return path, converted to a Redis key by ``_parse_return_path``.
    :return: the decoded result popped from Redis.
    """
    logger.info(L("? Awaiting Redis result for RPC message: {}", Bold(rpc_message)))
    redis_key = self._parse_return_path(return_path)

    pool = await self.get_redis_pool()
    with await pool as redis:
        start_time = time.time()
        # TODO: Make timeout configurable
        # NOTE(review): blpop presumably yields None when the timeout expires, in
        # which case this unpacking raises TypeError - confirm and handle explicitly.
        _, result = await redis.blpop(redis_key, timeout=5)
        result = redis_decode(result)

    logger.info(L(
        "? Received Redis result in {} for RPC message {}: {}",
        human_time(time.time() - start_time), rpc_message, Bold(result)
    ))
    return result
async def get_redis(self) -> Redis:
    """
    Get the redis pool; if a pool is already initialised it's returned, else one is created.

    Creation happens while holding ``self._create_pool_lock`` and the new pool is
    stored on ``self.redis``. Declared ``async`` because the body uses
    ``async with`` and ``await``.

    :return: the pool stored on ``self.redis``.
    """
    async with self._create_pool_lock:
        if self.redis is None:
            self.redis = await self.create_redis_pool()
    return self.redis
def __init__(self, connection: aioredis.Redis):
    """Keep a reference to the given Redis connection on the instance.

    :param connection: stored unchanged as ``self.connection``.
    """
    self.connection = connection
def conn(func):
    """Decorate a coroutine method so it always receives a ``_conn`` keyword argument.

    If the caller supplies ``_conn`` it is passed straight through. Otherwise a
    connection is taken from ``await self._get_pool()`` for the duration of the
    call and, unless ``AIOREDIS_BEFORE_ONE`` is set, wrapped in ``aioredis.Redis``.
    """
    @functools.wraps(func)
    async def wrapper(self, *args, _conn=None, **kwargs):
        # Caller already holds a connection: reuse it.
        if _conn is not None:
            return await func(self, *args, _conn=_conn, **kwargs)

        pool = await self._get_pool()
        with await pool as acquired:
            _conn = acquired if AIOREDIS_BEFORE_ONE else aioredis.Redis(acquired)
            return await func(self, *args, _conn=_conn, **kwargs)

    return wrapper
async def acquire_conn(self):
    """Acquire a connection from the pool and return it.

    ``self._get_pool()`` is awaited first and the connection is then acquired
    from ``self._pool``. Unless ``AIOREDIS_BEFORE_ONE`` is set, the raw
    connection is wrapped in ``aioredis.Redis``. Declared ``async`` because the
    body awaits both calls.

    :return: the acquired connection (wrapped or raw as described above).
    """
    await self._get_pool()
    conn = await self._pool.acquire()
    if not AIOREDIS_BEFORE_ONE:
        conn = aioredis.Redis(conn)
    return conn
def __init__(self, queue=None):
    """Build a queue backed by an in-memory list behind a mocked ``aioredis.Redis``.

    The mocked commands operate on ``self.queue``: ``rpush`` appends, ``rpop``
    pops from the end, ``lpush`` inserts at index 0 and ``llen`` returns the length.

    :param queue: list to use as backing storage; a new list is created only
        when this is ``None``, so a caller-supplied empty list is kept and shared.
    """
    # ``queue or list()`` would discard a caller-supplied empty list.
    self.queue = queue if queue is not None else []
    redis = mock.MagicMock(aioredis.Redis)
    redis.rpush.side_effect = utils.make_coro(result=lambda key, item: self.queue.append(item))
    redis.rpop.side_effect = utils.make_coro(result=lambda key: self.queue.pop())
    redis.lpush.side_effect = utils.make_coro(result=lambda key, item: self.queue.insert(0, item))
    redis.llen.side_effect = utils.make_coro(result=lambda key: len(self.queue))
    super(MockQueue, self).__init__(redis, None)
def __init__(self, redis, key):
    """
    Initialise the base class, then remember the client and the key.

    :param aioredis.Redis redis: stored as ``self.redis``
    :param key: stored as ``self.key``
    """
    super(RedisQueue, self).__init__()
    self.redis, self.key = redis, key
def __init__(self, *,
             redis: Redis,
             max_concurrent_tasks: int=50,
             shutdown_delay: float=6,
             timeout_seconds: int=60,
             burst_mode: bool=True,
             raise_task_exception: bool=False,
             semaphore_timeout: float=60) -> None:
    """
    Store the configuration and set up the per-instance bookkeeping.

    :param redis: redis pool to get connection from to pop items from list, also used to optionally
        re-enqueue pending jobs on termination
    :param max_concurrent_tasks: maximum number of jobs which can be executed at the same time by the event loop
    :param shutdown_delay: number of seconds to wait for tasks to finish; values below 0.1 are raised to 0.1
    :param timeout_seconds: maximum duration of a job, after that the job will be cancelled by the event loop
    :param burst_mode: break the iter loop as soon as no more jobs are available by adding a sentinel quit queue
    :param raise_task_exception: whether or not to raise an exception which occurs in a processed task
    :param semaphore_timeout: stored as ``self.semaphore_timeout``; presumably the number of seconds to
        wait for the task semaphore - confirm against its usage
    """
    # Connection and the event loop it is bound to.
    self.redis = redis
    self.loop = redis._pool_or_conn._loop

    # Concurrency limits.
    self.max_concurrent_tasks = max_concurrent_tasks
    self.task_semaphore = asyncio.Semaphore(value=max_concurrent_tasks, loop=self.loop)

    # Plain configuration values.
    self.shutdown_delay = max(shutdown_delay, 0.1)
    self.timeout_seconds = timeout_seconds
    self.burst_mode = burst_mode
    self.raise_task_exception = raise_task_exception

    # Runtime state.
    self.pending_tasks: Set[asyncio.futures.Future] = set()
    self.task_exception: Exception = None
    self.semaphore_timeout = semaphore_timeout
    self.jobs_complete = self.jobs_failed = self.jobs_timed_out = 0
    self.running = False
    self._finish_lock = asyncio.Lock(loop=self.loop)
async def create_pool_lenient(settings: RedisSettings, loop: asyncio.AbstractEventLoop, *,
                              _retry: int=0) -> Redis:
    """
    Create a new redis pool, retrying up to conn_retries times if the connection fails.

    Declared ``async`` because the body awaits pool creation, the retry delay and
    its own recursive call.

    :param settings: RedisSettings instance
    :param loop: event loop
    :param _retry: retry attempt, this is set when the method calls itself recursively
    :return: the pool returned by ``aioredis.create_redis_pool``
    :raises: the last connection error once ``settings.conn_retries`` attempts have been used up
    """
    addr = settings.host, settings.port
    try:
        pool = await aioredis.create_redis_pool(
            addr, loop=loop, db=settings.database, password=settings.password,
            timeout=settings.conn_timeout
        )
    except (ConnectionError, OSError, aioredis.RedisError, asyncio.TimeoutError) as e:
        if _retry < settings.conn_retries:
            logger.warning('redis connection error %s %s, %d retries remaining...',
                           e.__class__.__name__, e, settings.conn_retries - _retry)
            await asyncio.sleep(settings.conn_retry_delay)
        else:
            raise
    else:
        if _retry > 0:
            logger.info('redis connection successful')
        return pool

    # recursively attempt to create the pool outside the except block to avoid
    # "During handling of the above exception..." madness
    return await create_pool_lenient(settings, loop, _retry=_retry + 1)
def __init__(self, *,
             loop: asyncio.AbstractEventLoop=None,
             redis_settings: RedisSettings=None,
             existing_redis: Redis=None) -> None:
    """
    Resolve the loop and redis settings, keep any existing pool and create the pool lock.

    :param loop: asyncio loop to use for the redis pool
    :param redis_settings: connection settings to use for the pool
    :param existing_redis: existing pool, if set no new pool is created, instead this one is used
    """
    # Falling back to an attribute already present on the instance looks odd, but it lets the
    # mixin work with subclasses which initialise loop or redis_settings before calling
    # super().__init__ and don't pass those parameters through in kwargs.
    if not loop:
        loop = getattr(self, 'loop', None) or asyncio.get_event_loop()
    self.loop = loop

    if not redis_settings:
        redis_settings = getattr(self, 'redis_settings', None) or RedisSettings()
    self.redis_settings = redis_settings

    self.redis = existing_redis
    self._create_pool_lock = asyncio.Lock(loop=self.loop)
async def get_redis_pool(self) -> Redis:
    """Return the Redis pool, creating it on first use.

    When ``self._redis_pool`` is ``None`` a pool is created with
    ``aioredis.create_redis_pool(**self.connection_kwargs)`` and cached on the
    instance. Declared ``async`` because pool creation is awaited.

    :return: the pool stored on ``self._redis_pool``.
    """
    if self._redis_pool is None:
        self._redis_pool = await aioredis.create_redis_pool(**self.connection_kwargs)
    return self._redis_pool