def redis_conn(loop):
"""
yield fixture which creates a redis connection, and flushes redis before the test.
Note: redis is not flushed after the test both for performance and to allow later debugging.
"""
async def _get_conn():
conn = await create_redis(('localhost', 6379), loop=loop)
await conn.flushall()
return conn
conn = loop.run_until_complete(_get_conn())
conn.loop = loop
yield conn
conn.close()
try:
loop.run_until_complete(conn.wait_closed())
except RuntimeError:
pass
评论列表
文章目录