test_watch.py 文件源码

python
阅读 22 收藏 0 点赞 0 评论 0

项目:aioetcd3 作者: gaopeiliang 项目源码 文件源码
async def test_watch_exception(self):
    """Exercise two watchers on ``/foo`` while the server list is swapped.

    Two watch tasks are started on the same key:

    * ``watch_1`` opens a default watch scope and expects iterating it to
      end with ``RpcError`` after the first (create) event.
    * ``watch_2`` opens the scope with ``always_reconnect=True`` and expects
      to see the create, modify and delete events in that order.

    The driver part of the test writes ``/foo``, points the client at an
    endpoint that is presumably unreachable (NOTE(review): nothing is
    expected to listen on 127.0.0.1:49999 -- confirm), waits, restores the
    real endpoints, then modifies and deletes the key.

    This must be a coroutine function because it awaits futures, client
    calls and the watcher tasks directly; a plain ``def`` containing
    ``await`` is rejected by the compiler.
    """
    loop = asyncio.get_event_loop()

    # f1/f3 signal "watch scope is open"; f2/f4 signal "first event seen".
    f1 = loop.create_future()
    f2 = loop.create_future()

    async def watch_1():
        i = 0
        async with self.client.watch_scope('/foo') as response:
            f1.set_result(None)
            with self.assertRaises(RpcError):
                async for event in response:
                    i += 1
                    if i == 1:
                        self.assertEqual(event.type, EVENT_TYPE_CREATE)
                        self.assertEqual(event.key, b'/foo')
                        self.assertEqual(event.value, b'foo')
                        f2.set_result(None)
                    elif i == 2:
                        # A second event means the stream did not fail as
                        # expected; ValueError is not an RpcError, so it
                        # is not accepted by assertRaises above.
                        raise ValueError("Not raised")

    f3 = loop.create_future()
    f4 = loop.create_future()

    async def watch_2():
        i = 0
        async with self.client.watch_scope('/foo', always_reconnect=True) as response:
            f3.set_result(None)
            async for event in response:
                i += 1
                if i == 1:
                    self.assertEqual(event.type, EVENT_TYPE_CREATE)
                    self.assertEqual(event.key, b'/foo')
                    self.assertEqual(event.value, b'foo')
                    f4.set_result(None)
                elif i == 2:
                    self.assertEqual(event.type, EVENT_TYPE_MODIFY)
                    self.assertEqual(event.key, b'/foo')
                    self.assertEqual(event.value, b'foo1')
                elif i == 3:
                    self.assertEqual(event.type, EVENT_TYPE_DELETE)
                    self.assertEqual(event.key, b'/foo')
                    # A delete event has no value, so only the type and
                    # key are checked before leaving the loop.
                    break

    t1 = asyncio.ensure_future(watch_1())
    t2 = asyncio.ensure_future(watch_2())

    # Wait until both watch scopes are open before writing the key.
    await f1
    await f3
    await self.client.put('/foo', 'foo')

    # Wait until both watchers have checked the create event.
    await f2
    await f4

    # Swap in the fake endpoint, give the watchers time to react, then
    # restore the real server list.
    fake_endpoints = 'ipv4:///127.0.0.1:49999'
    self.client.update_server_list(fake_endpoints)
    await asyncio.sleep(2)
    self.client.update_server_list(self.endpoints)

    await self.client.put('/foo', 'foo1')
    await self.client.delete('/foo')

    # Awaiting the tasks re-raises any assertion failure from the watchers.
    await t1
    await t2
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号