java类kafka.consumer.ConsumerIterator的实例源码

KafkaDestinationMultiPartitionPipelineRunIT.java 文件源码 项目:datacollector 阅读 15 收藏 0 点赞 0 评论 0
/**
 * Counts the messages that can currently be read from every stream in {@code kafkaStreams}.
 *
 * <p>A stream is considered exhausted either when its iterator reports no further message or
 * when the iterator throws {@code ConsumerTimeoutException}.
 *
 * @return the total number of messages read across all streams
 */
@Override
protected int getRecordsInTarget() {
  int recordCount = 0;
  for (KafkaStream<byte[], byte[]> stream : kafkaStreams) {
    ConsumerIterator<byte[], byte[]> messages = stream.iterator();
    try {
      // Count first, then advance: the update clause consumes the message just counted.
      for (; messages.hasNext(); messages.next()) {
        recordCount++;
      }
    } catch (kafka.consumer.ConsumerTimeoutException ignored) {
      // The consumer timed out waiting for more data; move on to the next stream.
    }
  }
  return recordCount;
}
DataBrokerTest.java 文件源码 项目:netty-kafka-producer 阅读 19 收藏 0 点赞 0 评论 0
/**
 * Sends one keyed message through a {@code DataKafkaBroker} and verifies that the same payload
 * is read back from the topic.
 *
 * @throws Exception if connecting, sending or consuming fails
 */
@Test
public void test_sendMessage() throws Exception {
    createTopic(topic);

    ProducerProperties properties = new ProducerProperties();
    properties.override(ProducerProperties.NETTY_DEBUG_PIPELINE, true);

    DataKafkaBroker dataChannel = new DataKafkaBroker("localhost", START_PORT, 0, topic, new NioEventLoopGroup(), properties);
    dataChannel.connect().sync();
    try {
        dataChannel.send(freeLaterBuffer("1".getBytes()), 0, freeLaterBuffer(TEST_MESSAGE.getBytes()));

        final KafkaStream<byte[], byte[]> stream = consume(topic).get(0);
        final ConsumerIterator<byte[], byte[]> messages = stream.iterator();

        Assert.assertThat(new String(messages.next().message()), is(TEST_MESSAGE));
    } finally {
        // Disconnect even when the send or the assertion fails, so the channel is not leaked.
        dataChannel.disconnect();
    }
}
ListenerTest.java 文件源码 项目:netty-kafka-producer 阅读 19 收藏 0 点赞 0 评论 0
/**
 * Sends one message with {@code Acknowledgment.WAIT_FOR_NO_ONE} and verifies that the same
 * payload is read back from the topic.
 *
 * @throws Exception if connecting, sending or consuming fails
 */
@Test
public void test_no_acks_send_message() throws Exception {

    String topic = "test_no_acks_send_message";

    createTopic(topic, 1);
    ProducerProperties properties = new ProducerProperties();
    properties.override(ProducerProperties.NETTY_DEBUG_PIPELINE, true);
    properties.override(ProducerProperties.DATA_ACK, Acknowledgment.WAIT_FOR_NO_ONE);
    KafkaProducer producer = new KafkaProducer("localhost", START_PORT, topic, properties);
    producer.connect().sync();
    try {
        KafkaTopic kafkaTopic = producer.topic();

        kafkaTopic.send(null, freeLaterBuffer(TEST_MESSAGE.getBytes()));

        final List<KafkaStream<byte[], byte[]>> consume = consume(topic);
        final KafkaStream<byte[], byte[]> stream = consume.get(0);
        final ConsumerIterator<byte[], byte[]> messages = stream.iterator();
        // assertThat takes (actual, matcher-of-expected); the consumed payload is the actual value.
        Assert.assertThat(new String(messages.next().message()), is(TEST_MESSAGE));
    } finally {
        // Disconnect even when the send or the assertion fails, so the producer is not leaked.
        producer.disconnect().sync();
    }
}
KafkaTopicSingleBrokerTest.java 文件源码 项目:netty-kafka-producer 阅读 15 收藏 0 点赞 0 评论 0
/**
 * Sends three messages to a single topic and verifies that they are consumed in the order
 * in which they were sent.
 *
 * @throws Exception if connecting, sending or consuming fails
 */
@Test
public void test_producer() throws Exception {

    String topic = "test";
    ProducerProperties properties = new ProducerProperties();
    properties.override(ProducerProperties.NETTY_DEBUG_PIPELINE, true);
    createTopic(topic);

    KafkaProducer producer = new KafkaProducer("localhost", START_PORT, topic, properties);
    producer.connect().sync();
    try {
        KafkaTopic kafkaTopic = producer.topic();

        kafkaTopic.send(null, freeLaterBuffer((TEST_MESSAGE + "01").getBytes()));
        kafkaTopic.send(null, freeLaterBuffer((TEST_MESSAGE + "02").getBytes()));
        kafkaTopic.send(null, freeLaterBuffer((TEST_MESSAGE + "03").getBytes()));

        final KafkaStream<byte[], byte[]> stream = consume(topic).get(0);
        final ConsumerIterator<byte[], byte[]> messages = stream.iterator();

        Assert.assertThat(new String(messages.next().message()), is(TEST_MESSAGE + "01"));
        Assert.assertThat(new String(messages.next().message()), is(TEST_MESSAGE + "02"));
        Assert.assertThat(new String(messages.next().message()), is(TEST_MESSAGE + "03"));
    } finally {
        // Disconnect even when a send or an assertion fails, so the producer is not leaked.
        producer.disconnect().sync();
    }
}
ConsumerWorker.java 文件源码 项目:yuzhouwan 阅读 18 收藏 0 点赞 0 评论 0
/**
 * Consumes messages from {@code kafkaStream} until its iterator reports no more messages,
 * logging each message's payload, partition and offset together with running
 * fail/success/total counters and the elapsed time in seconds.
 *
 * <p>A failure while reading or logging a single message is logged and counted; the loop then
 * continues with the next message.
 */
@Override
public void run() {
    ConsumerIterator<byte[], byte[]> iter = kafkaStream.iterator();
    int total = 0;
    int fail = 0;
    int success = 0;
    long start = System.currentTimeMillis();
    while (iter.hasNext()) {
        try {
            MessageAndMetadata<byte[], byte[]> msg = iter.next();
            _log.info("Thread {}: {}", threadNum, new String(msg.message(), "utf-8"));
            _log.info("partition: {}, offset: {}", msg.partition(), msg.offset());
            success++;
        } catch (Exception e) {
            // Pass the exception as the throwable argument with a real message; the previous
            // "{}" pattern left the log line without any description of what failed.
            _log.error("Failed to process message", e);
            fail++;
        }
        total++;
        _log.info("Count [fail/success/total]: [{}/{}/{}], Time: {}s", fail, success, total,
                (System.currentTimeMillis() - start) / 1000);
    }
}
KafkaLogAppenderTest.java 文件源码 项目:java-kafka 阅读 17 收藏 0 点赞 0 评论 0
/**
 * Logs a single line through {@code logger} and checks the fields of the first record that a
 * freshly created consumer reads from {@code topic}.
 */
@Test
public void testKafkaLogAppender() {
    Properties props = new Properties();
    props.put("zookeeper.connect", zookeeper);
    props.put("group.id", "kafka-log-appender-test");
    props.put("auto.offset.reset", "smallest");
    props.put("schema.registry.url", schemaRegistry);

    // Request one stream for the topic under test.
    Map<String, Integer> streamsPerTopic = new HashMap<String, Integer>();
    streamsPerTopic.put(topic, 1);

    ConsumerConfig consumerConfig = new ConsumerConfig(props);
    ConsumerIterator<String, Object> messages = Consumer.createJavaConsumerConnector(consumerConfig)
            .createMessageStreams(
                    streamsPerTopic,
                    new StringDecoder(null),
                    new KafkaAvroDecoder(new VerifiableProperties(props)))
            .get(topic)
            .get(0)
            .iterator();

    String expectedLine = "I am a test message";
    logger.info(expectedLine);

    GenericRecord record = (GenericRecord) messages.next().message();
    assertEquals(record.get("line").toString(), expectedLine);
    assertEquals(record.get("logtypeid"), KafkaLogAppender.InfoLogTypeId);
    assertNotNull(record.get("source"));

    Map<CharSequence, Object> timings = (Map<CharSequence, Object>) record.get("timings");
    assertEquals(timings.size(), 1);

    Map<CharSequence, Object> tags = (Map<CharSequence, Object>) record.get("tag");
    assertEquals(tags.size(), 2);
}
KafkaSpoutTest.java 文件源码 项目:LogRTA 阅读 21 收藏 0 点赞 0 评论 0
/**
 * Creates a consumer connector, opens one stream for {@code topic} and emits every message
 * read from it through {@code collector}.
 *
 * <p>Each emitted tuple holds the message text, the constant {@code 1} and a formatted
 * timestamp taken when the message was read; the message text is also passed as the second
 * argument of {@code emit}. The method returns only once the stream iterator reports no more
 * messages.
 */
public void activate() {
    consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, 1);

    System.out.println("*********Results********topic:" + topic);

    Map<String, List<KafkaStream<byte[], byte[]>>> streamMap = consumer.createMessageStreams(topicCountMap);
    KafkaStream<byte[], byte[]> stream = streamMap.get(topic).get(0);
    ConsumerIterator<byte[], byte[]> it = stream.iterator();

    // The pattern never changes, so build the formatter once instead of once per message.
    SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd日 HH:mm:ss SSS");
    while (it.hasNext()) {
        // NOTE(review): decodes with the platform default charset - confirm the producer's encoding.
        String value = new String(it.next().message());
        String receivedAt = formatter.format(new Date());
        System.out.println("storm接收到来自kafka的消息--->" + value);
        collector.emit(new Values(value, 1, receivedAt), value);
    }
}
ConsumerThread.java 文件源码 项目:java-kafka-client-libs 阅读 22 收藏 0 点赞 0 评论 0
/**
 * Reads messages from {@code _stream} and hands each one to {@code _handler}, reporting the
 * per-message handling time to StatsD under {@code "message_received." + _topic}.
 *
 * <p>Any exception ends the loop. An {@link InterruptedException} is logged as a shutdown and
 * the thread's interrupt status is restored; every other exception is logged as an error.
 */
@Override
public void run() {
    LOG.info( "Consuming thread started" );

    try {
        ConsumerIterator<byte[], byte[]> it = _stream.iterator();
        while ( it.hasNext() ) {
            long start = System.currentTimeMillis();
            byte[] message = it.next().message();
            LOG.debug( "message received: {}", ( new String( message ) ) );

            _handler.onMessage( message );

            long time = System.currentTimeMillis() - start;
            KruxStdLib.STATSD.time( "message_received." + _topic, time );

        }
    } catch ( Exception e ) {
        if ( e instanceof InterruptedException ) {
            LOG.warn( "Consumer group threads interrupted, shutting down" );
            // Catching InterruptedException clears the flag; set it again so code further up
            // the stack (e.g. an executor) can still observe the interruption.
            Thread.currentThread().interrupt();
        } else {
            LOG.error( "no longer fetching messages", e );
        }
    }
}
SimpleHLConsumer.java 文件源码 项目:java.study 阅读 19 收藏 0 点赞 0 评论 0
/**
 * Consumes every message of {@code topic}, printing each one to standard output and appending
 * it as a line to the file {@code logX.txt}.
 *
 * <p>The consumer is shut down and the file is closed when the method exits, whether it
 * finishes normally or with an exception.
 *
 * @throws Exception if the file cannot be opened or written, or if consuming fails
 */
public void testConsumer() throws Exception {
    String fileName = "logX.txt";
    BufferedWriter out = new BufferedWriter(new FileWriter(fileName));
    try {
        Map<String, Integer> topicCount = new HashMap<String, Integer>();
        topicCount.put(topic, 1);

        Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);
        List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
        Charset utf8 = Charset.forName("UTF-8");
        // Use the parameterized stream type so the iterator is obtained without a raw-type conversion.
        for (final KafkaStream<byte[], byte[]> stream : streams) {
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            while (it.hasNext()) {
                String msg = new String(it.next().message(), utf8);
                System.out.println("Message from Single Topic: " + msg);
                out.write(msg, 0, msg.length());
                out.write('\n');
                // Flush per message so the file is up to date while the consumer keeps running.
                out.flush();
            }
        }
    } finally {
        // Release both resources on every exit path; the nested finally guarantees the file is
        // closed even if shutting the consumer down throws.
        try {
            if (consumer != null) {
                consumer.shutdown();
            }
        } finally {
            out.close();
        }
    }
}
KafkaSpout.java 文件源码 项目:monasca-thresh 阅读 17 收藏 0 点赞 0 评论 0
/**
 * Reader loop: while {@code shouldContinue} is set, takes the next message from the first
 * stream, publishes it in {@code this.message} and then blocks until that field is null again
 * or {@code shouldContinue} is cleared.
 *
 * <p>On exit the thread's name is logged and {@code readerThread} is set to null.
 */
@Override
public void run() {
  while (this.shouldContinue) {
    // The iterator is requested from the first stream on every pass of the outer loop.
    final ConsumerIterator<byte[], byte[]> it = streams.get(0).iterator();
    if (it.hasNext()) {
      final byte[] message = it.next().message();
      synchronized (this) {
        // Publish the message under this object's monitor.
        this.message = message;
        // Wake up getMessage() if it is waiting
        if (this.waiting) {
          notify();
        }
        // Hold this message until the field is null again or the loop is told to stop.
        // NOTE(review): presumably getMessage() nulls the field and notifies - confirm.
        while (this.message != null && this.shouldContinue)
          try {
            wait();
          } catch (InterruptedException e) {
            // The interruption is only logged; the enclosing while re-checks its condition.
            logger.info("Wait interrupted", e);
          }
      }
    }
  }
  logger.info("readerThread {} exited", this.readerThread.getName());
  this.readerThread = null;
}


问题


面经


文章

微信
公众号

扫码关注公众号