/**
 * Starts topic tracking. May be called at most once per instance.
 *
 * <p>Registers a listener on {@code topicsByName} that bumps {@code version} on every
 * map change, waits up to 10 seconds for the {@code /brokers} path, and then submits a
 * task to {@code executor} that subscribes to child changes of {@code /brokers/topics}
 * and loads the children currently present under that path.
 *
 * @throws DefectException if this method has already been called
 */
public void start() {
    // Atomically flip the flag so a second caller always fails.
    if (!started.compareAndSet(false, true)) {
        throw new DefectException("Can only be started once!");
    }
    topicsByName.addListener((MapChangeListener<String, KafkaTopic>) change -> version.incrementAndGet());
    // NOTE(review): the result of waitUntilExists is ignored, so start-up continues even
    // if "/brokers" did not appear within the timeout - confirm this is intended.
    zkClient.waitUntilExists("/brokers", TimeUnit.SECONDS, 10);
    executor.submit(() -> {
        // Subscribe before the initial read; presumably so that changes happening
        // between the two calls are not missed.
        zkClient.subscribeChildChanges("/brokers/topics", (parentPath, currentChilds) ->
                // The child listener may be handed null when the parent path is deleted
                // (I0Itec ZkClient contract - assumed to be the client in use).
                // ImmutableSet.copyOf(null) would throw, so treat it as "no topics".
                updateTopics(currentChilds == null
                        ? ImmutableSet.<String>of()
                        : ImmutableSet.copyOf(currentChilds)));
        updateTopics(ImmutableSet.copyOf(zkClient.getChildren("/brokers/topics")));
    });
}
KafkaTopicsTracker.java 文件源码
java
阅读 19
收藏 0
点赞 0
评论 0
项目:kafka-visualizer
作者:
评论列表
文章目录