public synchronized void start() {
Config conf= Config.getInstance();
Properties props = new Properties();
props.put("zookeeper.connect",conf
.get("kafka.servers"));
props.put("group.id", conf.get("kafka.groupid"));
props.put("zookeeper.session.timeout.ms", "400");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(
props));
String topics = "";
for (String s : callbacks.keySet()) {
topics += "," + s;
}
if (topics.length() > 0) {
topics = topics.substring(1);
Decoder<String> sd = new StringDecoder(null);
List<KafkaStream<String, String>> streams = consumer
.createMessageStreamsByFilter(
new Whitelist(topics),
Integer.parseInt(conf.get(
"kafka.threads")), sd, sd);
if (streams != null) {
ExecutorService tph = ThreadPoolHelper.getInstance()
.getSchPool();
for (KafkaStream<String, String> stream : streams) {
tph.submit(new CallbackThread(callbacks, stream));
}
}
}
}
KafkaConsumerHelper.java 文件源码
java
阅读 18
收藏 0
点赞 0
评论 0
项目:fudanweixin
作者:
评论列表
文章目录