def test_read_only(self):
    """Exercise the client while it reports a read-only connection.

    Stops cluster members 1 and 2, waits up to 6 seconds for the client
    state to become ``KeeperState.CONNECTED_RO``, then checks that a read
    (``get_children``) succeeds and that a write (``create``) raises
    ``NotReadOnlyCallError``.  The listener is removed and both members
    are started again in ``finally`` whether or not the assertions pass.
    """
    from kazoo.exceptions import NotReadOnlyCallError
    from kazoo.protocol.states import KeeperState

    client = self.client
    # Every state handed to the listener; never asserted on, kept only so
    # the transition history can be inspected when debugging a failure.
    states = []
    ev = threading.Event()

    def listen(state):
        states.append(state)
        if client.client_state == KeeperState.CONNECTED_RO:
            ev.set()

    # Register with a plain call instead of ``@client.add_listener``: the
    # decorator form rebinds ``listen`` to whatever ``add_listener``
    # returns (NOTE(review): believed to be None in kazoo -- confirm), in
    # which case ``remove_listener(listen)`` below would not remove the
    # real callback and it would stay attached to the client.
    client.add_listener(listen)
    try:
        self.cluster[1].stop()
        self.cluster[2].stop()

        # Give the client up to 6 seconds to report the read-only state.
        ev.wait(6)
        eq_(ev.is_set(), True)
        eq_(client.client_state, KeeperState.CONNECTED_RO)

        # Test read only command
        eq_(client.get_children('/'), [])

        # Test error with write command
        @raises(NotReadOnlyCallError)
        def testit():
            client.create('/fred')
        testit()

        # Wait for a ping
        time.sleep(15)
    finally:
        # Always detach the listener and bring the stopped members back.
        client.remove_listener(listen)
        self.cluster[1].run()
        self.cluster[2].run()
评论列表
文章目录