/**
 * Starts tracking the broker ids registered under {@code /brokers/ids} in ZooKeeper.
 *
 * <p>Registers a listener on {@code brokersById} that increments {@code version} on every map
 * change, waits up to 10 seconds for {@code /brokers} to exist, and then submits a task to
 * {@code executor} that subscribes to child changes of {@code /brokers/ids} and calls
 * {@code updateBrokers} once with the children currently present.
 *
 * @throws DefectException if this method has already been called
 */
public void start() {
    if (!started.compareAndSet(false, true)) {
        throw new DefectException("Can only be started once!");
    }
    brokersById.addListener((MapChangeListener<String, KafkaBroker>) change -> version.incrementAndGet());
    // NOTE(review): the result of waitUntilExists is not inspected, so startup continues
    // whether or not the wait succeeded — confirm this best-effort behaviour is intended.
    zkClient.waitUntilExists("/brokers", TimeUnit.SECONDS, 10);
    // NOTE(review): the value returned by submit is discarded; a failure inside the task
    // is presumably not surfaced to the caller — confirm against the executor's type.
    executor.submit(() -> {
        // The subscription is registered before the initial read below.
        zkClient.subscribeChildChanges("/brokers/ids", (parentPath, currentChilds) -> {
            if (currentChilds == null) {
                // The ZkClient child-listener contract passes null when the parent node has
                // been deleted (assuming zkClient is an I0Itec ZkClient); copyOf(null) would
                // throw, so report "no brokers" instead.
                updateBrokers(ImmutableSet.of());
            } else {
                updateBrokers(ImmutableSet.copyOf(currentChilds));
            }
        });
        updateBrokers(ImmutableSet.copyOf(zkClient.getChildren("/brokers/ids")));
    });
}
KafkaBrokersTracker.java 文件源码
java
阅读 19
收藏 0
点赞 0
评论 0
项目:kafka-visualizer
作者:
评论列表
文章目录