java类kafka.consumer.ConsumerConfig的实例源码

SecSpout.java 文件源码 项目:CadalWorkspace 阅读 15 收藏 0 点赞 0 评论 0
public void KafkaInit(){
    Properties props = new Properties();
    props.put("zookeeper.connect", "10.15.62.75:2181,10.15.62.76:2181,10.15.62.77:2181");
    props.put("group.id", "CadalSec");

    ConsumerConfig consumerConfig = new ConsumerConfig(props);
    ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);

    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put("Read-common", new Integer(1));// 第二个参数是指用几个流,多个流是为了并行处理。

    Map<String, List<KafkaStream<byte[],byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);

    KafkaStream<byte[], byte[]> stream = consumerMap.get("Read-common").get(0);// 这里只有一个流,所以得get(0)就可以了。
    this.it = stream.iterator();

}
RecBookRecPageSpout.java 文件源码 项目:CadalWorkspace 阅读 15 收藏 0 点赞 0 评论 0
private void KafkaInit(){
    Properties props = new Properties();
    props.put("zookeeper.connect", "10.15.62.75:2181,10.15.62.76:2181,10.15.62.77:2181");
    props.put("group.id", "RecRecPage");

    ConsumerConfig consumerConfig = new ConsumerConfig(props);
    ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);

    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put("Rec-recPage", new Integer(1));// 第二个参数是指用几个流,多个流是为了并行处理。

    Map<String, List<KafkaStream<byte[],byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);

    KafkaStream<byte[], byte[]> stream = consumerMap.get("Rec-recPage").get(0);// 这里只有一个流,所以得get(0)就可以了。
    this.it = stream.iterator();
}
RecTagRecPageSpout.java 文件源码 项目:CadalWorkspace 阅读 15 收藏 0 点赞 0 评论 0
private void KafkaInit(){
    Properties props = new Properties();
    props.put("zookeeper.connect", "10.15.62.75:2181,10.15.62.76:2181,10.15.62.77:2181");
    props.put("group.id", "RecTagRecPage");

    ConsumerConfig consumerConfig = new ConsumerConfig(props);
    ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);

    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put("Rec-recPageTagTag", new Integer(1));// 第二个参数是指用几个流,多个流是为了并行处理。

    Map<String, List<KafkaStream<byte[],byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);

    KafkaStream<byte[], byte[]> stream = consumerMap.get("Rec-recPageTagTag").get(0);// 这里只有一个流,所以得get(0)就可以了。
    this.it = stream.iterator();
}
RecBookPersonalPageSpout.java 文件源码 项目:CadalWorkspace 阅读 16 收藏 0 点赞 0 评论 0
private void KafkaInit(){
    Properties props = new Properties();
    props.put("zookeeper.connect", "10.15.62.75:2181,10.15.62.76:2181,10.15.62.77:2181");
    props.put("group.id", "RecPersonalPage");

    ConsumerConfig consumerConfig = new ConsumerConfig(props);
    ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);

    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put("Rec-personalPage", new Integer(1));// 第二个参数是指用几个流,多个流是为了并行处理。

    Map<String, List<KafkaStream<byte[],byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);

    KafkaStream<byte[], byte[]> stream = consumerMap.get("Rec-personalPage").get(0);// 这里只有一个流,所以得get(0)就可以了。
    this.it = stream.iterator();
}
RecTagBookSpout.java 文件源码 项目:CadalWorkspace 阅读 16 收藏 0 点赞 0 评论 0
private void KafkaInit(){
    Properties props = new Properties();
    props.put("zookeeper.connect", "10.15.62.75:2181,10.15.62.76:2181,10.15.62.77:2181");
    props.put("group.id", "RecTagBook");

    ConsumerConfig consumerConfig = new ConsumerConfig(props);
    ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);

    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put("Rec-recPageTagBook", new Integer(1));// 第二个参数是指用几个流,多个流是为了并行处理。

    Map<String, List<KafkaStream<byte[],byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);

    KafkaStream<byte[], byte[]> stream = consumerMap.get("Rec-recPageTagBook").get(0);// 这里只有一个流,所以得get(0)就可以了。
    this.it = stream.iterator();
}
RecPersonalPageUserSpout.java 文件源码 项目:CadalWorkspace 阅读 15 收藏 0 点赞 0 评论 0
private void KafkaInit(){
    Properties props = new Properties();
    props.put("zookeeper.connect", "10.15.62.75:2181,10.15.62.76:2181,10.15.62.77:2181");
    props.put("group.id", "RecPersonalPageUser");

    ConsumerConfig consumerConfig = new ConsumerConfig(props);
    ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);

    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put("Rec-personalPageUser", new Integer(1));// 第二个参数是指用几个流,多个流是为了并行处理。

    Map<String, List<KafkaStream<byte[],byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);

    KafkaStream<byte[], byte[]> stream = consumerMap.get("Rec-personalPageUser").get(0);// 这里只有一个流,所以得get(0)就可以了。
    this.it = stream.iterator();
}
RecBookHomePageSpout.java 文件源码 项目:CadalWorkspace 阅读 15 收藏 0 点赞 0 评论 0
private void KafkaInit(){
    Properties props = new Properties();
    props.put("zookeeper.connect", "10.15.62.75:2181,10.15.62.76:2181,10.15.62.77:2181");
    props.put("group.id", "RecHomePage");

    ConsumerConfig consumerConfig = new ConsumerConfig(props);
    ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);

    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put("Rec-homePage", new Integer(1));// 第二个参数是指用几个流,多个流是为了并行处理。

    Map<String, List<KafkaStream<byte[],byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);

    KafkaStream<byte[], byte[]> stream = consumerMap.get("Rec-homePage").get(0);// 这里只有一个流,所以得get(0)就可以了。
    this.it = stream.iterator();
}
SearchClickSpout.java 文件源码 项目:CadalWorkspace 阅读 16 收藏 0 点赞 0 评论 0
private void KafkaInit(){
    Properties props = new Properties();
    props.put("zookeeper.connect", "10.15.62.75:2181,10.15.62.76:2181,10.15.62.77:2181");
    props.put("group.id", "SearchClick");

    ConsumerConfig consumerConfig = new ConsumerConfig(props);
    ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);

    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put("Search-click", new Integer(1));// 第二个参数是指用几个流,多个流是为了并行处理。

    Map<String, List<KafkaStream<byte[],byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);

    KafkaStream<byte[], byte[]> stream = consumerMap.get("Search-click").get(0);// 这里只有一个流,所以得get(0)就可以了。
    this.it = stream.iterator();
}
SearchTermSpout.java 文件源码 项目:CadalWorkspace 阅读 15 收藏 0 点赞 0 评论 0
private void KafkaInit(){
    Properties props = new Properties();
    props.put("zookeeper.connect", "10.15.62.75:2181,10.15.62.76:2181,10.15.62.77:2181");
    props.put("group.id", "SearchTerm");

    ConsumerConfig consumerConfig = new ConsumerConfig(props);
    ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);

    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put("Search-query", new Integer(1));// 第二个参数是指用几个流,多个流是为了并行处理。

    Map<String, List<KafkaStream<byte[],byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);

    KafkaStream<byte[], byte[]> stream = consumerMap.get("Search-query").get(0);// 这里只有一个流,所以得get(0)就可以了。
    this.it = stream.iterator();
}
PersonalReplySpout.java 文件源码 项目:CadalWorkspace 阅读 15 收藏 0 点赞 0 评论 0
private void KafkaInit(){
    Properties props = new Properties();
    props.put("zookeeper.connect", "10.15.62.75:2181,10.15.62.76:2181,10.15.62.77:2181");
    props.put("group.id", "PersonalReply");

    ConsumerConfig consumerConfig = new ConsumerConfig(props);
    ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);

    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put("Personal-reply", new Integer(1));// 第二个参数是指用几个流,多个流是为了并行处理。

    Map<String, List<KafkaStream<byte[],byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);

    KafkaStream<byte[], byte[]> stream = consumerMap.get("Personal-reply").get(0);// 这里只有一个流,所以得get(0)就可以了。
    this.it = stream.iterator();
}


问题


面经


文章

微信
公众号

扫码关注公众号