def redis(loop):
    """
    Yield fixture that hands the test a freshly flushed redis connection pool.

    A pool is opened against ``localhost:6379`` on the supplied event loop and
    ``flushall`` is issued before the pool is yielded. Once the test is done
    the pool is closed and the fixture waits for the close to complete.

    Note: redis is intentionally left unflushed on teardown, both to keep the
    fixture fast and so leftover data can be inspected when debugging.
    """
    async def _connect_and_flush():
        pool = await create_redis_pool(('localhost', 6379), loop=loop)
        # Wipe all data so each test starts from a clean state.
        await pool.flushall()
        return pool

    async def _shutdown(pool):
        # close() only begins the shutdown; wait_closed() awaits its completion.
        pool.close()
        await pool.wait_closed()

    connection = loop.run_until_complete(_connect_and_flush())
    yield connection
    loop.run_until_complete(_shutdown(connection))
# 评论列表 (comment list)
# 文章目录 (article table of contents)