KafkaConsumer Java API subscribe() vs assign()

2022-09-02 21:27:23

我是Kafka Java API的新手,我正在努力使用来自特定Kafka主题的记录。

我知道我可以使用方法从主题开始轮询记录。Kafka还提供了方法,如果我想从主题的选定分区开始轮询记录。subscribe()assign()

我想知道这是否是两者之间的唯一区别?


答案 1

是的,因为组中的每个使用者将动态分配给订阅方法中提供的主题列表的分区,并且每个分区都可以由该组中的一个使用者线程使用。这是通过平衡使用者组中所有成员之间的分区来实现的,以便将每个分区仅分配给组中的一个使用者来实现。subscribegroup.id

assign将手动将分区列表分配给此使用者。并且此方法不使用使用者的组管理功能(其中不需要group.id)

主要区别在于会通过动态分区分配和消费者组协调来松动控制器assign(Collection)

使用者也可以使用 assign(Collection) 手动分配特定分区(类似于较旧的“简单”使用者)。在这种情况下,将禁用动态分区分配和使用者组协调。

订阅

public void subscribe(java.util.Collection<java.lang.String> topics)

订阅方法 订阅给定的主题列表以获取动态分配的分区。如果给定的主题列表为空,则将其视为unsubscribe().

作为组管理的一部分,使用者将跟踪属于特定组的使用者列表,如果以下事件之一触发, 将触发重新平衡操作 -

Number of partitions change for any of the subscribed list of topics
Topic is created or deleted
An existing member of the consumer group dies
A new member is added to an existing consumer group via the join API

分配

public void assign(java.util.Collection<TopicPartition> partitions)

分配方法手动将分区列表分配给此使用者。如果给定的主题分区列表为空,则将其视为与取消订阅()相同的方式。

通过此方法手动分配主题不使用使用者的组管理功能。因此,当组成员身份或集群和主题元数据发生更改时,不会触发重新平衡操作。


答案 2

我想专门为没有.此属性没有默认值(给定没有框架恶作剧 - KafkaClient lib + Java)。这不是官方的,但他们通常被称为免费消费者免费使用者不订阅主题,因此需要分配主题分区。group.id

如上所述,自动分区分配,重新平衡,偏移持久性,分区排他性,消费者心跳和故障检测/活跃性(所有与消费者群体一起赠送的东西)的概念与这些免费消费者一起被抛出窗外。因此,由客户端(您)来跟踪应用程序与kafka相关的任何状态,其中包括跟踪偏移量(例如Map)。这是因为自由消费者不会将其偏移量提交到Kafka,并且通常使用您自己的存储机制。


推荐