private SimpleConsumer createSimpleConsumer(Integer brokerId) {
try {
String brokerInfo = zkClient.readData(ZkUtils.BrokerIdsPath() + "/" + brokerId, true);
if (brokerInfo == null) {
log.error("Broker clientId %d does not exist", brokerId);
return null;
}
Map<String, Object> map = Resources.jsonMapper.readValue(
brokerInfo, new TypeReference<Map<String, Object>>() {
}
);
String host = (String) map.get("host");
Integer port = (Integer) map.get("port");
return new SimpleConsumer(host, port, 10000, 100000, "KafkaConsumerInfos");
} catch (Exception e) {
log.error(e, "Could not parse broker[%d] info", brokerId);
return null;
}
}
KafkaInfos.java 文件源码
java
阅读 24
收藏 0
点赞 0
评论 0
项目:DCMonitor
作者:
评论列表
文章目录