public Object call() throws KettleException {
try {
long limit;
String strData = meta.getLimit();
limit = getLimit(strData);
if (limit > 0) {
step.logDebug("Collecting up to " + limit + " messages");
} else {
step.logDebug("Collecting unlimited messages");
}
while (data.streamIterator.hasNext() && !data.canceled && (limit <= 0 || data.processed < limit)) {
MessageAndMetadata<byte[], byte[]> messageAndMetadata = data.streamIterator.next();
messageReceived(messageAndMetadata.key(), messageAndMetadata.message());
++data.processed;
}
} catch (ConsumerTimeoutException cte) {
step.logDebug("Received a consumer timeout after " + data.processed + " messages");
if (!meta.isStopOnEmptyTopic()) {
// Because we're not set to stop on empty, this is an abnormal
// timeout
throw new KettleException("Unexpected consumer timeout!", cte);
}
}
// Notify that all messages were read successfully
data.consumer.commitOffsets();
step.setOutputDone();
return null;
}
KafkaConsumerCallable.java 文件源码
java
阅读 19
收藏 0
点赞 0
评论 0
项目:pentaho-kafka-consumer
作者:
评论列表
文章目录