kafka消费者自动提交如何工作?

我正在读这个

自动提交 提交偏移的最简单方法是允许使用者为您执行此操作。如果配置 enable.auto.commit=true,则使用者将每五秒提交一次客户端从 poll() 接收到的最大偏移量。五秒间隔是默认设置,可通过设置 auto.commit.interval.ms 进行控制。就像消费者中的其他所有内容一样,自动提交由轮询循环驱动。每当您轮询时,使用者都会检查是否是时候提交,如果是,它将提交在上次轮询中返回的偏移量。

也许问题是我的英语不好,但我不完全理解这个描述。

假设我使用默认间隔的自动提交 - 5秒,每7秒进行一次轮询。在这种情况下,每5秒或每7秒发生一次提交?

你能澄清一下是否每3秒进行一次民意调查的行为吗?提交会每 5 秒或每 6 秒发生一次吗?
我读过这个

自动提交:可以将 auto.commit 设置为 true,并使用以毫秒为单位的值设置 auto.commit.interval.ms 属性。启用此功能后,Kafka 使用者将提交为响应其 poll() 调用而收到的最后一条消息的偏移量。poll() 调用在设置 auto.commit.interval.ms 在后台发出。

这与答案相矛盾。

你能详细解释一下这些东西吗?

假设我有这样的图表:

0 秒 - 轮询
4 秒 - 轮询
8 秒 - 轮询

何时提交抵消,何时提交?


答案 1

在每次轮询中都会调用自动提交检查,并检查经过的时间是否大于配置的时间。如果是这样,则提交偏移量。

如果提交间隔为 5 秒,轮询在 7 秒内发生,则提交将仅在 7 秒后发生。


答案 2

它会在轮询完成后尝试尽快自动提交。您可以查看使用者协调器的源代码,它具有在类级别定义的一组本地字段,以了解是否启用了自动提交,间隔是什么以及执行自动提交的下一个截止日期是什么。

https://github.com/apache/kafka/blob/10cd98cc894b88c5d1e24fc54c66361ad9914df2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L625

轮询中执行调用以执行存储 https://github.com/apache/kafka/blob/10cd98cc894b88c5d1e24fc54c66361ad9914df2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L279

例如,轮询每7秒执行一次,自动提交设置为5:

0 - 投票,+ 将截止日期设置为第 5 秒

7 - 投票 + 由于截止时间提交,更新截止时间到 7+5=12

14 - 投票 + 由于截止时间而提交,更新截止时间到 12+5=17

但是,如果轮询设置为每 3 秒一次,并且自动提交设置为 5:

0 - 投票,+ 将截止日期设置为第 5 秒

3 - 轮询,无提交

6 - 投票 + 由于截止时间而提交,更新截止时间为 6+5=11