KafkaDistributed.java 文件源码

java
阅读 26 收藏 0 点赞 0 评论 0

项目:jlogstash-input-plugin 作者:
public void run() {
    try {
        while(true){
            ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
            while (it.hasNext()) {
                String m = null;
                try {
                    m = new String(it.next().message(),
                            this.kafkaInput.encoding);
                    Map<String, Object> event = this.decoder
                            .decode(m);
                    if(zkDistributed==null){
                        this.kafkaInput.process(event);
                    }else{
                        zkDistributed.route(event);
                    }
                } catch (Exception e) {
                    logger.error("process event:{} failed:{}",m,ExceptionUtil.getErrorMessage(e));
                }
            }
        }
    } catch (Exception t) {
        logger.error("kakfa Consumer fetch is error:{}",ExceptionUtil.getErrorMessage(t));
    }
}
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号