async def test_watch_exception(self):
    """Check how watches react when the client's server list is swapped.

    Two watches are opened on ``/foo``: a default one and one created with
    ``always_reconnect=True``. Once both have observed the initial create
    event, the client is pointed at ``fake_endpoints`` for two seconds and
    then back at ``self.endpoints``, after which the key is modified and
    deleted.

    Expected outcome, as asserted below:

    * the default watch ends with ``RpcError`` instead of yielding a
      second event;
    * the ``always_reconnect=True`` watch goes on to yield the modify and
      delete events.

    This must be a coroutine function because it awaits futures, client
    calls and tasks directly in its own body.
    """
    loop = asyncio.get_event_loop()

    # Futures the watcher tasks resolve to report their progress to the
    # main flow: "<watch>_ready" once the watch scope is open,
    # "<watch>_created" once the first (create) event has been verified.
    watch_1_ready = loop.create_future()
    watch_1_created = loop.create_future()
    watch_2_ready = loop.create_future()
    watch_2_created = loop.create_future()

    async def watch_1():
        seen = 0
        async with self.client.watch_scope('/foo') as response:
            watch_1_ready.set_result(None)
            with self.assertRaises(RpcError):
                async for event in response:
                    seen += 1
                    if seen == 1:
                        self.assertEqual(event.type, EVENT_TYPE_CREATE)
                        self.assertEqual(event.key, b'/foo')
                        self.assertEqual(event.value, b'foo')
                        watch_1_created.set_result(None)
                    elif seen == 2:
                        # Reaching a second event means no RpcError was
                        # raised; ValueError is not caught by assertRaises
                        # above, so it surfaces as a test failure.
                        raise ValueError("Not raised")

    async def watch_2():
        seen = 0
        async with self.client.watch_scope(
                '/foo', always_reconnect=True) as response:
            watch_2_ready.set_result(None)
            async for event in response:
                seen += 1
                if seen == 1:
                    self.assertEqual(event.type, EVENT_TYPE_CREATE)
                    self.assertEqual(event.key, b'/foo')
                    self.assertEqual(event.value, b'foo')
                    watch_2_created.set_result(None)
                elif seen == 2:
                    self.assertEqual(event.type, EVENT_TYPE_MODIFY)
                    self.assertEqual(event.key, b'/foo')
                    self.assertEqual(event.value, b'foo1')
                elif seen == 3:
                    self.assertEqual(event.type, EVENT_TYPE_DELETE)
                    self.assertEqual(event.key, b'/foo')
                    # A delete event has no value, so event.value is not
                    # checked here.
                    break

    t1 = asyncio.ensure_future(watch_1())
    t2 = asyncio.ensure_future(watch_2())

    # Only write the key once both watch scopes are open.
    await watch_1_ready
    await watch_2_ready
    await self.client.put('/foo', 'foo')

    # Only swap the server list once both watchers saw the create event.
    await watch_1_created
    await watch_2_created

    # NOTE(review): presumably nothing listens on this port, which is what
    # breaks the existing watch streams -- confirm against the test setup.
    fake_endpoints = 'ipv4:///127.0.0.1:49999'
    self.client.update_server_list(fake_endpoints)
    await asyncio.sleep(2)
    self.client.update_server_list(self.endpoints)

    await self.client.put('/foo', 'foo1')
    await self.client.delete('/foo')

    # Awaiting the tasks re-raises any assertion failure from the watchers.
    await t1
    await t2
# Comment list
# Table of contents