def test_data_watcher_once(self):
update = threading.Event()
data = [True]
# Make it a non-existent path
self.path += 'f'
dwatcher = self.client.DataWatch(self.path)
@dwatcher
def changed(d, stat):
data.pop()
data.append(d)
update.set()
update.wait(10)
eq_(data, [None])
update.clear()
@raises(KazooException)
def test_it():
@dwatcher
def func(d, stat):
data.pop()
test_it()
评论列表
文章目录