private void init() {
// register kafka offset lag metrics, one Gauge is for per consumer level granularity
MetricRegistry registry = Metrics.getRegistry();
try {
fetchedMsgCounter = registry.meter("kafkaIngesterConsumer." + this.getName() + "-msgFetchRate");
failedToIngestCounter = registry.meter("kafkaIngesterConsumer." + this.getName() + "-failedToIngest");
kafkaOffsetLagGauge =
registry.register("kafkaIngesterConsumer." + this.getName() + "-kafkaOffsetLag", new JmxAttributeGauge(
new ObjectName(maxLagMetricName), "Value"));
} catch (MalformedObjectNameException | IllegalArgumentException e) {
logger.error("Register failure for metrics of KafkaIngesterConsumer", e);
}
TopicFilter topicFilter = new Whitelist(AuditConfig.AUDIT_TOPIC_NAME);
logger.info("{}: Topic filter is {}", getName(), AuditConfig.AUDIT_TOPIC_NAME);
this.consumer = Consumer.createJavaConsumerConnector(createConsumerConfig());
KafkaStream<byte[], byte[]> stream = consumer.createMessageStreamsByFilter(topicFilter, 1).get(0);
iterator = stream.iterator();
logger.info("KafkaIngesterConsumer thread {} is initialized successfully", getName());
if (AuditConfig.INGESTER_ENABLE_DEDUP) {
deduplicator =
new Deduplicator(threadId, AuditConfig.INGESTER_REDIS_HOST, AuditConfig.INGESTER_REDIS_PORT,
AuditConfig.INGESTER_REDIS_KEY_TTL_SEC, AuditConfig.INGESTER_DUP_HOST_PREFIX,
AuditConfig.INGESTER_HOSTS_WITH_DUP);
deduplicator.open();
} else {
deduplicator = null;
}
}
KafkaIngesterConsumer.java 文件源码
java
阅读 25
收藏 0
点赞 0
评论 0
项目:chaperone
作者:
评论列表
文章目录