是什么决定了卡夫卡消费者的偏移量?

我对卡夫卡比较陌生。我已经做了一些实验,但是关于消费者抵消,我不清楚一些事情。根据我到目前为止的理解,当消费者开始时,它将开始读取的偏移量由配置设置决定(如果我错了,请纠正我)。auto.offset.reset

现在假设例如,主题中有10条消息(偏移量为0到9),并且消费者碰巧在它下降之前(或在我杀死消费者之前)消费了其中的5条消息。然后假设我重新启动该消费者流程。我的问题是:

  1. 如果 设置为 ,它是否总是从偏移量 0 开始消耗?auto.offset.resetearliest

  2. 如果 设置为 ,它将从偏移量 5 开始消耗吗?auto.offset.resetlatest

  3. 关于这种情况的行为总是确定性的吗?

如果我的问题中有不清楚,请不要犹豫。


答案 1

它比你描述的要复杂一些。
仅当您的使用者组在某个位置没有提交有效的偏移量时,auto.offset.reset 配置才会启动(现在支持的 2 个偏移量存储是 Kafka 和 Zookeeper),这也取决于您使用的使用者类型。

如果您使用高级java消费者,那么想象以下场景:

  1. 您在使用者组中有一个使用者,该使用者已消费了 5 条消息并死亡。下次你启动这个消费者时,它甚至不会使用该配置,而是从它死亡的地方继续,因为它只会从偏移存储中获取存储的偏移量(我提到的Kafka或ZK)。group1auto.offset.reset

  2. 您在主题中有消息(如您所描述的那样),并且您在新的使用者组中启动了使用者。没有在任何地方存储偏移量,这次配置将决定是从主题的开头()开始还是从主题的结尾(group2auto.offset.resetearliestlatest)

影响偏移值所对应和配置的另一件事是日志保留策略。假设您有一个主题,其中保留期配置为 1 小时。您生成了 5 条消息,然后一小时后又发布了 5 条消息。偏移量仍将与上一个示例中的偏移量相同,但无法保持,因为 Kafka 已经删除了这些消息,因此最早可用的偏移量将是 。earliestlatestlatestearliest05

上面提到的所有内容都与之无关,每次运行它时,它都会决定从哪里开始使用配置。SimpleConsumerauto.offset.reset

如果使用早于 0.9 的 Kafka 版本,则必须将 替换为 ,。earliestlatestsmallestlargest


答案 2

只是一个更新:从Kafka 0.9及以后,Kafka正在使用新的Java版本的消费者,并且auto.offset.reset参数名称已更改;从手册中:

当 Kafka 中没有初始偏移量或服务器上不再存在当前偏移量时(例如,因为该数据已被删除),该怎么办:

最早:自动将偏移量重置为最早的偏移量

最新:自动将偏移重置为最新偏移

none:如果找不到使用者组的先前偏移量,则向使用者引发异常

其他任何内容:向使用者抛出异常。

在检查了接受的答案后,我花了一些时间找到它,所以我认为发布它可能对社区有用。


推荐