def test_child_watcher_once(self):
update = threading.Event()
all_children = ['fred']
cwatch = self.client.ChildrenWatch(self.path)
@cwatch
def changed(children):
while all_children:
all_children.pop()
all_children.extend(children)
update.set()
update.wait(10)
eq_(all_children, [])
update.clear()
@raises(KazooException)
def test_it():
@cwatch
def changed_again(children):
update.set()
test_it()
评论列表
文章目录