Kafka 使用者如何从多个分配的分区中消费

2022-09-02 01:47:24

tl;博士;我试图理解分配了多个分区的单个使用者如何处理访问分区的消耗记录。

例如:

  • 在移动到下一个分区之前完全处理单个分区。
  • 每次处理每个分区中的可用记录块。
  • 处理来自第一个可用分区的一批 N 条记录
  • 在轮循机制轮换中处理来自分区的一批 N 条记录

我找到了 或 分配者的配置,但这只决定了如何为使用者分配分区,而不是从分配到的分区中消耗它的方式。partition.assignment.strategyRangedRoundRobin

我开始挖掘KafkaConsumer源代码,#poll()将我引导到#pollForFetches()#pollForFetches(),然后引导我进入fetcher#fetchedRecords()fetcher#sendFetches()

这只会导致我尝试将整个 Fetcher 类放在一起,也许只是为时已晚,或者我只是没有深入挖掘,但我很难弄清楚消费者将如何处理多个分配的分区。

背景

处理由 Kafka Streams 支持的数据管道。

在此管道中的多个阶段,当记录由不同的 Kafka Streams 应用程序处理时,流将连接到由外部数据源提供的压缩主题源,这些数据源提供所需的数据,这些数据将在记录中增强,然后再继续处理的下一阶段。

在此过程中,有几个死信主题,其中记录无法与会增强记录的外部数据源匹配。这可能是因为数据尚不可用(事件或广告系列尚未上线),或者它是不良数据,永远不会匹配。

目标是在发布新的增强数据时重新发布死信主题的记录,以便我们可以匹配死信主题中以前不匹配的记录,以便更新它们并将其发送到下游进行其他处理。

记录在多次尝试中可能无法匹配,并且死信主题中可能有多个副本,因此我们只想重新处理现有记录(在应用程序启动时的最新偏移量之前)以及自上次运行应用程序以来(在以前保存的使用者组偏移量之后)发送到死信主题的记录。

它可以很好地工作,因为我的使用者筛选出应用程序启动后到达的任何记录,并且我的生产者通过将偏移量作为发布事务的一部分提交偏移量来管理我的使用者组偏移量。

但是我想确保我最终会从所有分区中消耗,因为我遇到了一个奇怪的边缘情况,其中未匹配的记录被重新处理并落在与死信主题中相同的分区中,只是被消费者过滤掉。虽然它没有获得新批次的记录进行处理,但也有一些分区尚未重新处理。

任何帮助理解单个使用者如何处理多个分配的分区将不胜感激。


答案 1

你走在正确的轨道上,因为大多数逻辑都在那里。Fetcher

首先,正如消费者Javadoc所提到的:

如果为使用者分配了多个分区以从中获取数据,它将尝试同时从所有这些分区使用,从而有效地为这些分区提供相同的使用优先级。

可以想象,在实践中,有几件事需要考虑。

  • 每次使用者尝试获取新记录时,它都会排除它已经有记录等待的分区(从以前的读取中)。已具有正在处理的读取请求的分区也被排除在外。

  • 提取记录时,使用者在提取请求中指定 和。代理使用这些数据分别确定总共和每个分区要返回的数据量。这同样适用于所有分区。fetch.max.bytesmax.partition.fetch.bytes

使用这两种方法,默认情况下,使用者尝试公平地使用所有分区。如果不是这种情况,请更改或通常会有所帮助。fetch.max.bytesmax.partition.fetch.bytes

如果要将某些分区优先于其他分区,则需要使用 pause()resume() 来手动控制消耗流。


答案 2

推荐