conftest.py 文件源码

python
阅读 27 收藏 0 点赞 0 评论 0

项目:arq 作者: samuelcolvin 项目源码 文件源码
def redis(loop):
    """
    yield fixture which creates a redis connection, and flushes redis before the test.

    Note: redis is not flushed after the test both for performance and to allow later debugging.
    """
    async def _create_redis():
        r = await create_redis_pool(('localhost', 6379), loop=loop)
        await r.flushall()
        return r

    async def _close(r):
        r.close()
        await r.wait_closed()

    redis_ = loop.run_until_complete(_create_redis())
    yield redis_
    loop.run_until_complete(_close(redis_))
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号