Spring @KafkaListener和并发
我正在使用spring boot + spring @KafkaListener。我期望的行为是:我的kafka侦听器读取10个线程中的消息。这样,如果线程之一挂起,则其他消息将继续读取和处理消息。
我定义了
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory)
{
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
factory.getContainerProperties().setMissingTopicsFatal(false);
factory.getContainerProperties().setCommitLogLevel(LogIfLevelEnabled.Level.INFO);
return factory;
}
和spring boot config:
spring.kafka.listener.concurrency=10
我看到所有配置都能正常工作,我在jmx中看到了10个线程:
但是然后我做了这样的测试:
@KafkaListener(topics = {
"${topic.name}" }, clientIdPrefix = "${kafka.client.id.prefix}", idIsGroup = false, id = "${kafka.listener.name}", containerFactory = "kafkaListenerContainerFactory")
public void listen(ConsumerRecord<String, String> record)
{
if(record.getVersion() < 3) {
try {
Thread.sleep(20000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
else
System.out.println("It works!");
}
如果版本<3,则挂起,否则-工作。我发送了1,2和3版本的3条消息。我希望版本1和2的消息会挂起,但是版本3将在侦听器时处理。但是不幸的是,版本3的消息在开始处理之前会等待消息1和2。
也许我的期望不对,这是kafka听众的正确行为。请帮助我处理kafka并发,为什么会这样呢?
-
不是那样的。您至少需要与使用者一样多的分区(由concurrencyspring容器控制)。
此外,一次(一个组)一个使用者(一次使用)只能从一个分区中进行使用,因此,即使您增加了分区,“卡住”的使用者后面相同分区中的记录也不会被其他使用者接收。
-
Spring Security,表单登录和并发会话
2021-01-30 关注 0 浏览139 1答案
-
Spring Security并发控制
2021-01-30 关注 0 浏览74 1答案
-
Spring Security 4阻止并发登录不起作用
2021-02-02 关注 0 浏览131 1答案
-
Spring Security中的最大并发用户数
2021-01-30 关注 0 浏览85 1答案
-
Spring如何处理线程并发问题?
2021-09-17 关注 0 浏览108 1答案
-
Spring如何处理线程并发问题?
2021-10-23 关注 0 浏览130 1答案
-
Spring 如何处理线程并发问题?
2022-01-18 关注 0 浏览86 1答案
-
Spring并发会话控件不起作用,用户可以多次登录
2021-01-29 关注 0 浏览129 1答案
-
Spring AMQP侦听器容器中的并发如何实现?
2021-01-30 关注 0 浏览138 1答案
-
Spring,JPA和Hibernate-如何在没有并发问题的情况下增加计数器
2021-02-01 关注 0 浏览94 1答案