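@Override
public void run() {
    LOG.info( "Consuming thread started" );
    try {
        ConsumerIterator<byte[], byte[]> it = _stream.iterator();
        // By default hasNext() blocks until a message arrives or the stream is shut down
        while ( it.hasNext() ) {
            long start = System.currentTimeMillis();
            byte[] message = it.next().message();
            // Decode only when debug logging is enabled; UTF-8 payloads are an assumption
            if ( LOG.isDebugEnabled() ) {
                LOG.debug( "message received: {}", new String( message, java.nio.charset.StandardCharsets.UTF_8 ) );
            }
            _handler.onMessage( message );
            // Report per-message handling time in milliseconds, keyed by topic
            long time = System.currentTimeMillis() - start;
            KruxStdLib.STATSD.time( "message_received." + _topic, time );
        }
    } catch ( Exception e ) {
        // Any exception ends the loop, so this thread stops consuming
        if ( e instanceof InterruptedException ) {
            LOG.warn( "Consumer group threads interrupted, shutting down" );
            // Restore the interrupt flag for callers further up the stack
            Thread.currentThread().interrupt();
        } else {
            LOG.error( "no longer fetching messages", e );
        }
    }
}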
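
The method above depends on Kafka, a logger and StatsD, so it cannot be run on its own. A minimal sketch of the same iterate, dispatch and time pattern using only the JDK might look like the following. Every name in it (`ConsumeLoopSketch`, `timingsMillis`, the `Iterator` and `Consumer` stand-ins) is hypothetical and not part of java-kafka-client-libs.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for ConsumerThread: no Kafka, logger or StatsD involved.
class ConsumeLoopSketch implements Runnable {
    private final Iterator<byte[]> messages;            // stands in for _stream.iterator()
    private final Consumer<byte[]> handler;             // stands in for _handler.onMessage
    final List<Long> timingsMillis = new ArrayList<>(); // stands in for STATSD.time

    ConsumeLoopSketch(Iterator<byte[]> messages, Consumer<byte[]> handler) {
        this.messages = messages;
        this.handler = handler;
    }

    @Override
    public void run() {
        try {
            while (messages.hasNext()) {
                long start = System.nanoTime();
                handler.accept(messages.next());
                // Record one handling time per message, in milliseconds
                timingsMillis.add((System.nanoTime() - start) / 1_000_000);
            }
        } catch (RuntimeException e) {
            // As in the method above, a failure ends the loop entirely
            System.err.println("no longer fetching messages: " + e);
        }
    }

    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        List<byte[]> input = Arrays.asList(
                "a".getBytes(StandardCharsets.UTF_8),
                "b".getBytes(StandardCharsets.UTF_8));
        ConsumeLoopSketch loop = new ConsumeLoopSketch(
                input.iterator(),
                m -> seen.add(new String(m, StandardCharsets.UTF_8)));
        loop.run();
        System.out.println(seen + " handled, " + loop.timingsMillis.size() + " timings recorded");
    }
}
```

Because the try/catch wraps the whole loop, one failing message stops all further consumption in both the sketch and the original method. Moving the try/catch inside the loop would keep the thread alive after a handler error, at the cost of silently skipping the failed message.
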
ConsumerThread.java source file
Project: java-kafka-client-libs