Kafka.java 文件源码

java
阅读 25 收藏 0 点赞 0 评论 0

项目:jlogstash-input-plugin 作者:
public void run() {
    try {
        while(true){
            ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
            while (it.hasNext()) {
                String m = null;
                try {
                    m = new String(it.next().message(),
                            this.kafkaInput.encoding);
                    Map<String, Object> event = this.decoder
                            .decode(m);
                    if (event!=null&&event.size()>0){
                        this.kafkaInput.process(event);
                    } 
                } catch (Exception e) {
                    logger.error("process event:{} failed:{}",m,e.getCause());
                }
            }
        }
    } catch (Exception t) {
        logger.error("kakfa Consumer fetch is error:{}",t.getCause());
    }
}
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号