KafkaConsumerHelper.java source code

java

Project: fudanweixin
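// Needs: java.util.List, java.util.Properties, java.util.concurrent.ExecutorService,
// kafka.consumer.{Consumer, ConsumerConfig, KafkaStream, Whitelist},
// kafka.serializer.{Decoder, StringDecoder} (the old ZooKeeper-based consumer API).
public synchronized void start() {
    Config conf = Config.getInstance();
    Properties props = new Properties();
    // The old high-level consumer coordinates through ZooKeeper, so the
    // "kafka.servers" setting must hold a ZooKeeper connect string here.
    props.put("zookeeper.connect", conf.get("kafka.servers"));
    props.put("group.id", conf.get("kafka.groupid"));
    // A 400 ms session timeout is short; a long GC pause can expire the session.
    props.put("zookeeper.session.timeout.ms", "400");
    props.put("zookeeper.sync.time.ms", "200");
    props.put("auto.commit.interval.ms", "1000");
    consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

    // Nothing to subscribe to until at least one callback is registered.
    if (callbacks.isEmpty()) {
        return;
    }
    // Whitelist accepts a comma-separated topic list (String.join needs Java 8+).
    String topics = String.join(",", callbacks.keySet());
    // Keys and values are both decoded as strings.
    Decoder<String> sd = new StringDecoder(null);
    int threads = Integer.parseInt(conf.get("kafka.threads"));
    List<KafkaStream<String, String>> streams =
            consumer.createMessageStreamsByFilter(new Whitelist(topics), threads, sd, sd);
    if (streams != null) {
        ExecutorService tph = ThreadPoolHelper.getInstance().getSchPool();
        // One CallbackThread per stream; each blocks while iterating its stream.
        for (KafkaStream<String, String> stream : streams) {
            tph.submit(new CallbackThread(callbacks, stream));
        }
    }
}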
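
The method above joins the registered topic names into one whitelist string and hands each stream to a CallbackThread, whose source is not shown on this page. The following is only a minimal sketch of how such per-topic dispatch might look, assuming the callbacks map is keyed by topic name. TopicDispatchSketch, joinTopics and dispatch are hypothetical names, the callback type is assumed to be java.util.function.Consumer<String>, and no Kafka classes are involved, so it runs on a plain JDK.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical stand-in for the dispatch a CallbackThread might perform;
// the real fudanweixin class is not shown, so every name here is an assumption.
class TopicDispatchSketch {

    // Joins the registered topic names into a comma-separated whitelist string.
    static String joinTopics(Map<String, ?> callbacks) {
        return String.join(",", callbacks.keySet());
    }

    // Routes one (topic, message) pair to the callback registered for that topic.
    // Returns false when no callback is registered, so the caller can log or skip.
    static boolean dispatch(Map<String, Consumer<String>> callbacks,
                            String topic, String message) {
        Consumer<String> callback = callbacks.get(topic);
        if (callback == null) {
            return false;
        }
        callback.accept(message);
        return true;
    }

    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        // LinkedHashMap keeps registration order, which makes the joined string predictable.
        Map<String, Consumer<String>> callbacks = new LinkedHashMap<>();
        callbacks.put("orders", m -> seen.add("orders:" + m));
        callbacks.put("users", m -> seen.add("users:" + m));

        System.out.println(joinTopics(callbacks));
        dispatch(callbacks, "orders", "42");
        dispatch(callbacks, "unknown", "ignored");
        System.out.println(seen);
    }
}
```

In a real consumer loop, the topic and message would come from each record read off the stream rather than from literals as in main.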