java类kafka.consumer.Consumer的实例源码

AvroOneM2MDataSubscribe.java 文件源码 项目:SDA 阅读 20 收藏 0 点赞 0 评论 0
public void collect() throws Exception{
    Properties properties = new Properties();

    //class name을 user_id, grup_id로 사용함
    properties.put("zookeeper.connect",Utils.ZOOKEEPER_LIST);
    properties.put("group.id",group_id);
    properties.put("zookeeper.session.timeout.ms", "6000");
    properties.put("zookeeper.sync.time.ms", "2000");
    properties.put("auto.commit.enable", "true");
    properties.put("auto.commit.interval.ms", "5000");
    properties.put("fetch.message.max.bytes", "31457280");      // 30MB     
    properties.put("auto.offset.reset", "smallest");
    //properties.put("auto.offset.reset", "largest");

    final ConsumerConnector consumer = 
            Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));

    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();

    topicCountMap.put(TOPIC, NUM_THREADS);

    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =    consumer.createMessageStreams(topicCountMap);

    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(TOPIC);

    ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);

    log.debug("NUM_THREADS : "+NUM_THREADS);

    //for (final KafkaStream<byte[], byte[]> stream : streams) {
    //  executor.execute(new ConsumerT(stream));
    //}

    for (int m = 0; m < NUM_THREADS; m++) {
        executor.execute(new ConsumerT(streams.get(m)));
    }

}
KafkaMqCollect.java 文件源码 项目:light_drtc 阅读 21 收藏 0 点赞 0 评论 0
public void init(){
    Properties props = new Properties();
       props.put("zookeeper.connect", Constants.kfZkServers);
       props.put("group.id", Constants.kfGroupId);
       props.put("auto.offset.reset", Constants.kfAutoOffsetReset);
       props.put("zookeeper.session.timeout.ms", "4000");
       props.put("zookeeper.sync.time.ms", "200");
       props.put("auto.commit.interval.ms", "1000");
       props.put("serializer.class", "kafka.serializer.StringEncoder");
       ConsumerConfig config = new ConsumerConfig(props);
       consumer = Consumer.createJavaConsumerConnector(config);
}
KafkaConsumer.java 文件源码 项目:change-data-capture 阅读 33 收藏 0 点赞 0 评论 0
public KafkaConsumer(final String topic, final String zkConnect, final String groupId, Decoder<Val> decoder){


     consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
                new ConsumerConfig(getConsumerConfig(zkConnect, groupId)));

     Map<String, Integer> topicCountMap = new HashMap(){{
            put(topic, new Integer(1));
     }};
     Map<String, List<KafkaStream<byte[], Val>>> consumerMap = consumer.createMessageStreams(topicCountMap, new DefaultDecoder(null), decoder);
     stream = consumerMap.get(topic).get(0);
}
OldConsumer.java 文件源码 项目:kafka-monitor 阅读 15 收藏 0 点赞 0 评论 0
public OldConsumer(String topic, Properties consumerProperties) {
  _connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProperties));
  Map<String, Integer> topicCountMap = new HashMap<>();
  topicCountMap.put(topic, 1);
  Map<String, List<KafkaStream<String, String>>> kafkaStreams = _connector.createMessageStreams(topicCountMap, new StringDecoder(null), new StringDecoder(null));
  _iter = kafkaStreams.get(topic).get(0).iterator();
}
DemoHighLevelConsumer.java 文件源码 项目:KafkaExample 阅读 15 收藏 0 点赞 0 评论 0
public static void main(String[] args) {
    args = new String[] { "zookeeper0:2181/kafka", "topic1", "group2", "consumer1" };
    if (args == null || args.length != 4) {
        System.err.println("Usage:\n\tjava -jar kafka_consumer.jar ${zookeeper_list} ${topic_name} ${group_name} ${consumer_id}");
        System.exit(1);
    }
    String zk = args[0];
    String topic = args[1];
    String groupid = args[2];
    String consumerid = args[3];
    Properties props = new Properties();
    props.put("zookeeper.connect", zk);
    props.put("group.id", groupid);
    props.put("client.id", "test");
    props.put("consumer.id", consumerid);
    props.put("auto.offset.reset", "largest");
    props.put("auto.commit.enable", "false");
    props.put("auto.commit.interval.ms", "60000");

    ConsumerConfig consumerConfig = new ConsumerConfig(props);
    ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);

    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, 1);
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicCountMap);

    KafkaStream<byte[], byte[]> stream1 = consumerMap.get(topic).get(0);
    ConsumerIterator<byte[], byte[]> interator = stream1.iterator();
    while (interator.hasNext()) {
        MessageAndMetadata<byte[], byte[]> messageAndMetadata = interator.next();
        String message = String.format(
                "Topic:%s, GroupID:%s, Consumer ID:%s, PartitionID:%s, Offset:%s, Message Key:%s, Message Payload: %s",
                messageAndMetadata.topic(), groupid, consumerid, messageAndMetadata.partition(),
                messageAndMetadata.offset(), new String(messageAndMetadata.key()),
                new String(messageAndMetadata.message()));
        System.out.println(message);
        consumerConnector.commitOffsets();
    }
}
KafkaEventConsumer.java 文件源码 项目:EventStreamAnalytics 阅读 16 收藏 0 点赞 0 评论 0
public KafkaEventConsumer(ActorSystem system) {
    logger.debug("Initilizing Consumer");
    hazelcastEventActor = system.actorOf(Props.create(HazelcastEventActor.class), "hazelcastEventHandler");
    mangoDBEventActor = system.actorOf(Props.create(MongoDBEventActor.class), "mongoDbEventHandler");
    Properties props = new Properties();
    props.put("zookeeper.connect", "localhost:2181");
    props.put("group.id", "eventProcessor");
    props.put("client.id", "workerEventProcessor");
    props.put("zookeeper.sync.time.ms", "200");
    props.put("auto.commit.interval.ms", "1000");
    props.put("auto.offset.reset", "smallest");
    ConsumerConfig consumerConfig = new ConsumerConfig(props);
    consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
    logger.debug("Intialized Consumer");
}
KafkaDemoClient.java 文件源码 项目:thingsboard 阅读 26 收藏 0 点赞 0 评论 0
private static ConsumerIterator<String, String> buildConsumer(String topic) {
    Map<String, Integer> topicCountMap = new HashMap<>();
    topicCountMap.put(topic, 1);
    ConsumerConfig consumerConfig = new ConsumerConfig(consumerProperties());
    ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
    Map<String, List<KafkaStream<String, String>>> consumers = consumerConnector.createMessageStreams(topicCountMap, new StringDecoder(null), new StringDecoder(null));
    KafkaStream<String, String> stream = consumers.get(topic).get(0);
    return stream.iterator();
}
KafkaConsumer.java 文件源码 项目:kclient 阅读 29 收藏 0 点赞 0 评论 0
protected void initKafka() {
    if (handler == null) {
        log.error("Exectuor can't be null!");
        throw new RuntimeException("Exectuor can't be null!");
    }

    log.info("Consumer properties:" + properties);
    ConsumerConfig config = new ConsumerConfig(properties);

    isAutoCommitOffset = config.autoCommitEnable();
    log.info("Auto commit: " + isAutoCommitOffset);

    consumerConnector = Consumer.createJavaConsumerConnector(config);

    Map<String, Integer> topics = new HashMap<String, Integer>();
    topics.put(topic, streamNum);
    StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
    StringDecoder valueDecoder = new StringDecoder(
            new VerifiableProperties());
    Map<String, List<KafkaStream<String, String>>> streamsMap = consumerConnector
            .createMessageStreams(topics, keyDecoder, valueDecoder);

    streams = streamsMap.get(topic);
    log.info("Streams:" + streams);

    if (streams == null || streams.isEmpty()) {
        log.error("Streams are empty.");
        throw new IllegalArgumentException("Streams are empty.");
    }

    streamThreadPool = Executors.newFixedThreadPool(streamNum);
}
KafkaConsumerTestBase.java 文件源码 项目:flink 阅读 15 收藏 0 点赞 0 评论 0
/**
 * Read topic to list, only using Kafka code.
 */
private static List<MessageAndMetadata<byte[], byte[]>> readTopicToList(String topicName, ConsumerConfig config, final int stopAfter) {
    ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(config);
    // we request only one stream per consumer instance. Kafka will make sure that each consumer group
    // will see each message only once.
    Map<String, Integer> topicCountMap = Collections.singletonMap(topicName, 1);
    Map<String, List<KafkaStream<byte[], byte[]>>> streams = consumerConnector.createMessageStreams(topicCountMap);
    if (streams.size() != 1) {
        throw new RuntimeException("Expected only one message stream but got " + streams.size());
    }
    List<KafkaStream<byte[], byte[]>> kafkaStreams = streams.get(topicName);
    if (kafkaStreams == null) {
        throw new RuntimeException("Requested stream not available. Available streams: " + streams.toString());
    }
    if (kafkaStreams.size() != 1) {
        throw new RuntimeException("Requested 1 stream from Kafka, bot got " + kafkaStreams.size() + " streams");
    }
    LOG.info("Opening Consumer instance for topic '{}' on group '{}'", topicName, config.groupId());
    ConsumerIterator<byte[], byte[]> iteratorToRead = kafkaStreams.get(0).iterator();

    List<MessageAndMetadata<byte[], byte[]>> result = new ArrayList<>();
    int read = 0;
    while (iteratorToRead.hasNext()) {
        read++;
        result.add(iteratorToRead.next());
        if (read == stopAfter) {
            LOG.info("Read " + read + " elements");
            return result;
        }
    }
    return result;
}
KafkaConsumerTestBase.java 文件源码 项目:flink 阅读 17 收藏 0 点赞 0 评论 0
/**
 * Read topic to list, only using Kafka code.
 */
private static List<MessageAndMetadata<byte[], byte[]>> readTopicToList(String topicName, ConsumerConfig config, final int stopAfter) {
    ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(config);
    // we request only one stream per consumer instance. Kafka will make sure that each consumer group
    // will see each message only once.
    Map<String,Integer> topicCountMap = Collections.singletonMap(topicName, 1);
    Map<String, List<KafkaStream<byte[], byte[]>>> streams = consumerConnector.createMessageStreams(topicCountMap);
    if (streams.size() != 1) {
        throw new RuntimeException("Expected only one message stream but got "+streams.size());
    }
    List<KafkaStream<byte[], byte[]>> kafkaStreams = streams.get(topicName);
    if (kafkaStreams == null) {
        throw new RuntimeException("Requested stream not available. Available streams: "+streams.toString());
    }
    if (kafkaStreams.size() != 1) {
        throw new RuntimeException("Requested 1 stream from Kafka, bot got "+kafkaStreams.size()+" streams");
    }
    LOG.info("Opening Consumer instance for topic '{}' on group '{}'", topicName, config.groupId());
    ConsumerIterator<byte[], byte[]> iteratorToRead = kafkaStreams.get(0).iterator();

    List<MessageAndMetadata<byte[], byte[]>> result = new ArrayList<>();
    int read = 0;
    while(iteratorToRead.hasNext()) {
        read++;
        result.add(iteratorToRead.next());
        if (read == stopAfter) {
            LOG.info("Read "+read+" elements");
            return result;
        }
    }
    return result;
}


问题


面经


文章

微信
公众号

扫码关注公众号