避免apache kafka消费者中重复消息的有效策略如何从 Kafka 获取一次消息?

2022-08-31 14:56:50

我已经学习apache kafka一个月了。然而,我现在被困在一个点上。我的用例是,我有两个或多个消费者进程在不同的机器上运行。我运行了一些测试,其中我在kafka服务器中发布了10,000条消息。然后在处理这些消息时,我杀死了其中一个使用者进程并重新启动它。使用者在文件中写入已处理的消息。因此,在消费完成后,文件显示超过10k条消息。所以有些消息被复制了。

在消费者过程中,我禁用了自动提交。消费者手动提交批量偏移量。因此,例如,如果将100条消息写入文件,则消费者提交偏移量。当单个使用者进程正在运行时,它崩溃并以这种方式恢复重复。但是,当多个使用者正在运行时,其中一个使用者崩溃并恢复,它会将重复的消息写入文件。

是否有任何有效的策略来避免这些重复的消息?


答案 1

简短的回答是,不。

您正在寻找的是一次精确处理的。虽然它通常看起来可行,但永远不应该依赖它,因为总有警告。

即使为了尝试防止重复,您也需要使用简单的消费者。此方法的工作原理是针对每个使用者,当从某个分区使用消息时,将分区和已用消息的偏移量写入磁盘。当使用者在发生故障后重新启动时,从磁盘读取每个分区的最后消耗的偏移量。

但即使使用这种模式,消费者也无法保证在失败后不会重新处理消息。如果使用者使用消息,然后在将偏移量刷新到磁盘之前失败,该怎么办?如果在处理消息之前写入磁盘,那么如果在实际处理消息之前写入偏移量然后失败,该怎么办?即使您在每条消息之后向ZooKeeper提交偏移量,也会存在同样的问题。

但是,在某些情况下,恰好一次处理更容易实现,但仅适用于某些用例。这只需要将偏移量存储在与单位应用程序输出相同的位置。例如,如果您编写一个对消息进行计数的使用者,则通过将上次计数的偏移量与每个计数一起存储,可以保证偏移量与使用者的状态同时存储。当然,为了保证一次处理,这需要你只使用一条消息,并为每条消息更新一次状态,这对于大多数Kafka消费者应用程序来说是完全不切实际的。就其本质而言,Kafka 出于性能原因分批处理使用消息。

通常,如果您只是将其设计为幂等,那么您的时间将花费得更多,并且您的应用程序将更加可靠。


答案 2

这就是Kafka FAQ在“恰好一次”主题上要说的:

如何从 Kafka 获取一次消息?

恰好一次语义有两个部分:避免数据生产期间的重复和避免数据消耗期间的重复。

在数据生产过程中,有两种方法可以精确地获得一次语义:

  • 每个分区使用一个写入器,每次收到网络错误时,请检查该分区中的最后一条消息,以查看上次写入是否成功
  • 在消息中包含主键(UUID 或其他内容),并在使用者上进行重复数据删除。

如果您执行其中一项操作,则 Kafka 托管的日志将无重复。但是,没有重复的阅读也取决于消费者的一些合作。如果使用者定期检查其位置,则如果它失败并重新启动,它将从检查点位置重新启动。因此,如果数据输出和检查点不是以原子方式编写的,那么这里也有可能获得重复项。此问题特定于您的存储系统。例如,如果您使用的是数据库,则可以在事务中一起提交这些内容。LinkedIn编写的HDFS加载器Camus为Hadoop加载做了类似的事情。另一种不需要事务的替代方法是存储偏移量和加载的数据,并使用主题/分区/偏移量组合进行重复数据删除。

我认为有两个改进可以使这变得容易得多:

  • 生产者幂等性可以通过在服务器上选择性地集成对此的支持来自动完成,并且成本要低得多。
  • 现有的高级使用者不会公开很多更细粒度的偏移控制(例如,重置您的位置)。我们将很快为此而努力

推荐