KafkaMessageBus.java 文件源码

java
阅读 21 收藏 0 点赞 0 评论 0

项目:debezium-proto 作者:
@Override
public <KeyType, MessageType> void subscribe(String groupId, TopicFilter topicFilter, int numThreads, Deserializer<KeyType> keyDecoder,
                                             Deserializer<MessageType> messageDecoder, MessageConsumer<KeyType, MessageType> consumer) {
    if (!running) throw new IllegalStateException("Kafka client is no longer running");
    if (numThreads < 1) return;

    // Create the config for this consumer ...
    final boolean autoCommit = true;
    final boolean debug = logger.isDebugEnabled();
    Properties props = new Properties();
    props.putAll(this.consumerConfig);
    props.put("group.id", groupId);

    // Create the consumer and iterate over the streams and create a thread to process each one ...
    ConsumerConnector connector = getOrCreateConnector(props);
    connector.createMessageStreamsByFilter(topicFilter, numThreads, DEFAULT_DECODER, DEFAULT_DECODER)
             .forEach(stream -> {
                 this.executor.get().execute(() -> {
                     final ConsumerIterator<byte[], byte[]> iter = stream.iterator();
                     boolean success = false;
                     while (true) {
                         try {
                             while (running && iter.hasNext()) {
                                 // Determine if we're still running after we've received a message ...
                                 if ( running ) {
                                     MessageAndMetadata<byte[], byte[]> msg = iter.next();
                                     if (debug) {
                                         logger.debug("Consuming next message on topic '{}', partition {}, offset {}",
                                                      msg.topic(), msg.partition(), msg.offset());
                                     }
                                     success = consumer.consume(msg.topic(), msg.partition(), msg.offset(),
                                                                keyDecoder.deserialize(msg.topic(),msg.key()),
                                                                messageDecoder.deserialize(msg.topic(),msg.message()));
                                     logger.debug("Consume message: {}", success);
                                     if (success && autoCommit) {
                                         logger.debug("Committing offsets");
                                         connector.commitOffsets();
                                     }
                                 }
                             }
                         } catch (ConsumerTimeoutException e) {
                             logger.debug("Consumer timed out and continuing");
                             // Keep going ...
                         }
                     }
                 });
             });
}
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号