KafkaAppender.java 文件源码

java
阅读 27 收藏 0 点赞 0 评论 0

项目:SkyEye 作者:
/**
 * 向kafka send
 * @param value
 */
private void send(String value) {
    // 对value的大小进行判定,当大于某个值认为该日志太大直接丢弃(防止影响到kafka)
    if (value.length() > 10000) {
        return;
    }

    final ProducerRecord<byte[], String> record = new ProducerRecord<>(this.topic, this.key, value);
    LazySingletonProducer.getInstance(this.config).send(record, new Callback() {
        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
            // TODO: 异常发生如何处理(直接停掉appender)
            if (null != e) {
                closed = true;
                LogLog.error("kafka send error in appender", e);
                // 发生异常,kafkaAppender 停止收集,向节点写入数据(监控系统会感知进行报警)
                if (flag.get() == true) {
                    KafkaAppender.this.heartbeatStart();
                    zkRegister.write(Constants.SLASH + app + Constants.SLASH + host, NodeMode.EPHEMERAL,
                            String.valueOf(Constants.APP_APPENDER_STOP_KEY + Constants.SEMICOLON + System.currentTimeMillis()) + Constants.SEMICOLON + SysUtil.userDir);
                    flag.compareAndSet(true, false);
                }
            }
        }
    });
}
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号