KafkaIngesterConsumer.java 文件源码

java
阅读 25 收藏 0 点赞 0 评论 0

项目:chaperone 作者:
private void init() {
  // register kafka offset lag metrics, one Gauge is for per consumer level granularity
  MetricRegistry registry = Metrics.getRegistry();
  try {
    fetchedMsgCounter = registry.meter("kafkaIngesterConsumer." + this.getName() + "-msgFetchRate");
    failedToIngestCounter = registry.meter("kafkaIngesterConsumer." + this.getName() + "-failedToIngest");
    kafkaOffsetLagGauge =
        registry.register("kafkaIngesterConsumer." + this.getName() + "-kafkaOffsetLag", new JmxAttributeGauge(
            new ObjectName(maxLagMetricName), "Value"));
  } catch (MalformedObjectNameException | IllegalArgumentException e) {
    logger.error("Register failure for metrics of KafkaIngesterConsumer", e);
  }

  TopicFilter topicFilter = new Whitelist(AuditConfig.AUDIT_TOPIC_NAME);
  logger.info("{}: Topic filter is {}", getName(), AuditConfig.AUDIT_TOPIC_NAME);
  this.consumer = Consumer.createJavaConsumerConnector(createConsumerConfig());
  KafkaStream<byte[], byte[]> stream = consumer.createMessageStreamsByFilter(topicFilter, 1).get(0);
  iterator = stream.iterator();
  logger.info("KafkaIngesterConsumer thread {} is initialized successfully", getName());

  if (AuditConfig.INGESTER_ENABLE_DEDUP) {
    deduplicator =
        new Deduplicator(threadId, AuditConfig.INGESTER_REDIS_HOST, AuditConfig.INGESTER_REDIS_PORT,
            AuditConfig.INGESTER_REDIS_KEY_TTL_SEC, AuditConfig.INGESTER_DUP_HOST_PREFIX,
            AuditConfig.INGESTER_HOSTS_WITH_DUP);
    deduplicator.open();
  } else {
    deduplicator = null;
  }
}
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号