KafkaConsumerCallable.java source code

java

Project: pentaho-kafka-consumer
public Object call() throws KettleException {
    try {
        long limit;
        String strData = meta.getLimit();

        limit = getLimit(strData);
        if (limit > 0) {
            step.logDebug("Collecting up to " + limit + " messages");
        } else {
            step.logDebug("Collecting unlimited messages");
        }
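        // Keep reading until the stream ends, the step is canceled,
        // or the limit is reached (a limit <= 0 means no limit)
        while (data.streamIterator.hasNext() && !data.canceled && (limit <= 0 || data.processed < limit)) {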
            MessageAndMetadata<byte[], byte[]> messageAndMetadata = data.streamIterator.next();
            messageReceived(messageAndMetadata.key(), messageAndMetadata.message());
            ++data.processed;
        }
    } catch (ConsumerTimeoutException cte) {
        step.logDebug("Received a consumer timeout after " + data.processed + " messages");
        if (!meta.isStopOnEmptyTopic()) {
            // Because we're not set to stop on empty, this is an abnormal
            // timeout
            throw new KettleException("Unexpected consumer timeout!", cte);
        }
    }
    // Commit the consumed offsets and signal that this step has no more output
    data.consumer.commitOffsets();
    step.setOutputDone();
    return null;
}
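
The loop above combines three stop conditions (end of stream, cancellation, an optional message limit) with a timeout that is treated as normal only when the step is configured to stop on an empty topic. The following is a minimal, self-contained sketch of that pattern without any Kafka or Kettle dependency. `BoundedDrainSketch`, `TimeoutSignal`, `drain`, and `timingOut` are hypothetical names introduced only for illustration; `TimeoutSignal` stands in for `ConsumerTimeoutException`, and `IllegalStateException` stands in for `KettleException`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical illustration; not part of pentaho-kafka-consumer or Kafka.
class BoundedDrainSketch {

    // Stand-in for ConsumerTimeoutException (assumption for this sketch).
    static class TimeoutSignal extends RuntimeException {
        private static final long serialVersionUID = 1L;
    }

    // Collects items until the source ends, the cancel flag is set,
    // or the limit is reached. A limit <= 0 means "no limit".
    static <T> List<T> drain(Iterator<T> source, long limit,
                             AtomicBoolean canceled, boolean stopOnEmpty) {
        List<T> collected = new ArrayList<>();
        try {
            while (source.hasNext() && !canceled.get()
                    && (limit <= 0 || collected.size() < limit)) {
                collected.add(source.next());
            }
        } catch (TimeoutSignal timeout) {
            if (!stopOnEmpty) {
                // Not configured to stop on empty, so the timeout is abnormal.
                throw new IllegalStateException("Unexpected consumer timeout!", timeout);
            }
            // Otherwise the timeout just means there is nothing left to read.
        }
        return collected;
    }

    // Wraps a list in an iterator that throws TimeoutSignal from hasNext()
    // once the list is exhausted, imitating a blocking consumer that times out.
    static <T> Iterator<T> timingOut(List<T> items) {
        Iterator<T> inner = items.iterator();
        return new Iterator<T>() {
            @Override
            public boolean hasNext() {
                if (!inner.hasNext()) {
                    throw new TimeoutSignal();
                }
                return true;
            }

            @Override
            public T next() {
                return inner.next();
            }
        };
    }
}
```

Calling `drain(timingOut(items), 0, new AtomicBoolean(false), true)` returns every item and swallows the timeout, while passing `false` as the last argument turns the same timeout into an exception, mirroring the `isStopOnEmptyTopic()` check in the original method.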