InFluxAvroConsumer.java 文件源码

java
阅读 23 收藏 0 点赞 0 评论 0

项目:kafka-consumer 作者:
/**
   * To avoid too many try-catches this is separate..
   */
  public void runrun() {
    log.info("Waiting to consume data");
    ConsumerIterator<byte[], byte[]> it = stream.iterator();
    //loop through all messages in the stream
    while (it.hasNext()) {
      byte[] msg = it.next().message();
      if (msg.length < 2) {
        //valid messages are longer than 2 bytes as the first one is schema id
        //once upon time some libraries (pypro) would start with a short message to try if the kafka topic was alive. this is what topic polling refers to.
        log.info("ignoring short msg, assuming topic polling");
        continue;
      }
//      log.trace("Thread " + id + ":: " + Arrays.toString(msg));
      process(msg);
    }
    log.info("Shutting down consumer Thread: " + id);
  }
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号