何时使用 ConcurrentKafkaListenerContainerFactory?

我是kafka的新手,我浏览了文档,但我什么也听不懂。有人可以解释一下何时使用这个类吗?我已经使用了该类,但我看到在我当前的项目中被使用。请解释它的作用。ConcurrentKafkaListenerContainerFactoryKafkaconsumerConcurrentKafkaListenerContainerFactory


答案 1

Kafka 使用者不是线程安全的。所有网络 I/O 都发生在进行调用的应用程序的线程中。用户有责任确保多线程访问正确同步。不同步的访问将导致ConcurrentModificationException.

如果为使用者分配了多个分区以从中获取数据,它将尝试同时从所有这些分区使用,从而有效地为这些分区提供相同的使用优先级。但是,在某些情况下,使用者可能希望首先专注于以全速从分配分区的某些子集中提取,并且仅在这些分区具有很少或没有数据可使用时才开始提取其他分区。

斯普林卡夫卡

ConcurrentKafkaListenerContainerFactory用于为带注释的方法创建容器@KafkaListener

春天有两个卡夫卡MessageListenerContainer

KafkaMessageListenerContainer
ConcurrentMessageListenerContainer

接收来自单个线程上的所有主题或分区的所有消息。委托给一个或多个实例以提供多线程消耗。KafkaMessageListenerContainerConcurrentMessageListenerContainerKafkaMessageListenerContainer

使用 ConcurrentMessageListenerContainer

@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
                    kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(3);
    factory.getContainerProperties().setPollTimeout(3000);
    return factory;
  }

它具有并发属性。例如, container.setConcurrency(3) 创建三个实例。KafkaMessageListenerContainer

如果您提供了六个实例,并且并发性为3;每个容器有两个分区。对于五个 TopicPartition 实例,两个容器获取两个分区,第三个容器获取一个分区。如果并发性大于 TopicPartition 的数量,则会向下调整并发性,以便每个容器获得一个分区。TopicPartition

这是一个清晰的例子,里面有文档 在这里


答案 2

Kafka Consumer API 不是线程安全的。ConcurrentKafkaListenerContainerFactory api 提供了使用 Kafka Consumer API 的并发方式,同时设置了其他 kafka 使用者属性。


推荐