test_api_redis.py 文件源码

python
阅读 23 收藏 0 点赞 0 评论 0

项目:Enibar 作者: ENIB 项目源码 文件源码
def test_send_message(self):
        async def func():
            SUB = await aioredis.create_redis((os.environ.get(
                "REDIS_HOST",
                "127.0.0.1"
            ), 6379))
            res = await SUB.psubscribe("enibar-*")
            subscriber = res[0]

            api.redis.send_message('enibar-test', 'test')

            await subscriber.wait_message()
            reply = await subscriber.get_json()
            self.assertEqual(reply, (b'enibar-test', 'test'))

        task = asyncio.ensure_future(func())
        self.loop.run_until_complete(task)
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号