/**
 * Subscribes the given consumer to all topics matching the supplied filter, as a member of the named
 * consumer group.
 * <p>
 * A connector is obtained via {@code getOrCreateConnector} using this bus' consumer configuration with
 * {@code group.id} overridden by {@code groupId}. One task per message stream returned by the connector is
 * submitted to this bus' executor. Each task reads raw {@code byte[]} keys and messages, decodes them with
 * the supplied deserializers, and passes them to {@code consumer}. Offsets are committed on the connector
 * after every message for which the consumer returns {@code true}.
 * <p>
 * Each task ends when this bus is no longer running or when its stream iterator reports that it has no more
 * messages. A {@code ConsumerTimeoutException} is not treated as the end of the stream: the task logs it and
 * keeps polling.
 *
 * @param groupId the consumer group identifier, stored as the {@code group.id} property
 * @param topicFilter the filter selecting the topics to consume
 * @param numThreads the number of streams (and therefore tasks) to request; nothing is done if less than 1
 * @param keyDecoder the deserializer applied to each raw message key
 * @param messageDecoder the deserializer applied to each raw message payload
 * @param consumer the callback invoked for every decoded message; its boolean result controls whether
 *            offsets are committed
 * @throws IllegalStateException if this bus is no longer running
 */
@Override
public <KeyType, MessageType> void subscribe(String groupId, TopicFilter topicFilter, int numThreads,
                                             Deserializer<KeyType> keyDecoder,
                                             Deserializer<MessageType> messageDecoder,
                                             MessageConsumer<KeyType, MessageType> consumer) {
    if (!running) throw new IllegalStateException("Kafka client is no longer running");
    if (numThreads < 1) return;

    // Create the config for this consumer ...
    final boolean autoCommit = true;
    final boolean debug = logger.isDebugEnabled();
    Properties props = new Properties();
    props.putAll(this.consumerConfig);
    props.put("group.id", groupId);

    // Create the consumer and iterate over the streams and create a task to process each one ...
    ConsumerConnector connector = getOrCreateConnector(props);
    connector.createMessageStreamsByFilter(topicFilter, numThreads, DEFAULT_DECODER, DEFAULT_DECODER)
             .forEach(stream -> {
                 this.executor.get().execute(() -> {
                     final ConsumerIterator<byte[], byte[]> iter = stream.iterator();
                     // Loop only while the bus is running, so that the task terminates (and releases its
                     // executor thread) on shutdown instead of spinning forever.
                     while (running) {
                         try {
                             if (!iter.hasNext()) {
                                 // The iterator has no more messages (presumably because the connector
                                 // was shut down), so there is nothing left for this task to do ...
                                 break;
                             }
                             // hasNext() may have waited for a message, so determine if we're still
                             // running before we consume it ...
                             if (!running) break;

                             MessageAndMetadata<byte[], byte[]> msg = iter.next();
                             if (debug) {
                                 logger.debug("Consuming next message on topic '{}', partition {}, offset {}",
                                              msg.topic(), msg.partition(), msg.offset());
                             }
                             final boolean success = consumer.consume(msg.topic(), msg.partition(), msg.offset(),
                                                                      keyDecoder.deserialize(msg.topic(), msg.key()),
                                                                      messageDecoder.deserialize(msg.topic(), msg.message()));
                             logger.debug("Consume message: {}", success);
                             if (success && autoCommit) {
                                 logger.debug("Committing offsets");
                                 connector.commitOffsets();
                             }
                         } catch (ConsumerTimeoutException e) {
                             // No message arrived in time; keep going and re-check 'running' ...
                             logger.debug("Consumer timed out and continuing");
                         }
                     }
                 });
             });
}
KafkaMessageBus.java 文件源码
java
阅读 21
收藏 0
点赞 0
评论 0
项目:debezium-proto
作者:
评论列表
文章目录