/**
* 向kafka send
* @param value
*/
private void send(String value) {
// 对value的大小进行判定,当大于某个值认为该日志太大直接丢弃(防止影响到kafka)
if (value.length() > 10000) {
return;
}
final ProducerRecord<byte[], String> record = new ProducerRecord<>(this.topic, this.key, value);
LazySingletonProducer.getInstance(this.config).send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
// TODO: 异常发生如何处理(直接停掉appender)
if (null != e) {
closed = true;
LogLog.error("kafka send error in appender", e);
// 发生异常,kafkaAppender 停止收集,向节点写入数据(监控系统会感知进行报警)
if (flag.get() == true) {
KafkaAppender.this.heartbeatStart();
zkRegister.write(Constants.SLASH + app + Constants.SLASH + host, NodeMode.EPHEMERAL,
String.valueOf(Constants.APP_APPENDER_STOP_KEY + Constants.SEMICOLON + System.currentTimeMillis()) + Constants.SEMICOLON + SysUtil.userDir);
flag.compareAndSet(true, false);
}
}
}
});
}
KafkaAppender.java 文件源码
java
阅读 27
收藏 0
点赞 0
评论 0
项目:SkyEye
作者:
评论列表
文章目录