ConsumerThread.java 文件源码

java
阅读 22 收藏 0 点赞 0 评论 0

项目:java-kafka-client-libs 作者:
@Override
public void run() {
    LOG.info( "Consuming thread started" );

    try {
        ConsumerIterator<byte[], byte[]> it = _stream.iterator();
        while ( it.hasNext() ) {
            long start = System.currentTimeMillis();
            byte[] message = it.next().message();
            LOG.debug( "message received: {}", ( new String( message ) ) );

            _handler.onMessage( message );

            long time = System.currentTimeMillis() - start;
            KruxStdLib.STATSD.time( "message_received." + _topic, time );

        }
    } catch ( Exception e ) {
        if ( e instanceof InterruptedException ) {
            LOG.warn( "Consumer group threads interrupted, shutting down" );
        } else {
            LOG.error( "no longer fetching messages", e );
        }
    }
}
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号