def test_send_message(self):
    """Check that a message sent via ``api.redis.send_message`` is received.

    A dedicated Redis connection pattern-subscribes to ``enibar-*``; the
    test then calls ``api.redis.send_message('enibar-test', 'test')`` and
    asserts that the JSON-decoded reply read from the subscription equals
    ``(b'enibar-test', 'test')``.

    The Redis host is taken from the ``REDIS_HOST`` environment variable
    (default ``127.0.0.1``), port 6379.
    """
    async def func():
        host = os.environ.get("REDIS_HOST", "127.0.0.1")
        SUB = await aioredis.create_redis((host, 6379))
        try:
            # psubscribe returns one channel object per pattern; only one
            # pattern is requested, so take the first.
            res = await SUB.psubscribe("enibar-*")
            subscriber = res[0]
            api.redis.send_message('enibar-test', 'test')
            # Block until something is queued on the channel before reading.
            await subscriber.wait_message()
            reply = await subscriber.get_json()
            self.assertEqual(reply, (b'enibar-test', 'test'))
        finally:
            # Always release the connection, even when the assertion fails,
            # so the socket and its reader task do not leak between tests.
            SUB.close()
            await SUB.wait_closed()

    # Hand the coroutine straight to the test's loop so the task is created
    # on the same loop that runs it.
    self.loop.run_until_complete(func())
评论列表
文章目录