如何挑选卡夫卡 transaction.id

2022-09-01 20:29:25

我想知道我能不能得到一些帮助来理解Kafka中的交易,特别是我如何使用 transaction.id。下面是上下文:

  1. 我的 Kafka 应用程序遵循以下模式:使用来自输入主题的消息,处理,发布到输出主题。
  2. 我没有使用Kafka Streams API。
  3. 我在单个消费者组中有多个消费者,每个消费者都在自己的轮询线程中。
  4. 有一个线程池,其中包含执行消息处理并发布到输出主题的工作线程。目前,每个线程都有自己的生产者实例。
  5. 我正在使用已发布的事务 API 来确保消耗偏移量的更新和发布到输出主题的原子方式发生

到目前为止,我的假设包括:

  1. 如果我的进程在事务中间崩溃,那么该事务中的任何内容都不会发布,也不会移动任何消耗偏移量。因此,在重新启动时,我只需从原始消耗偏移量再次启动事务即可。
  2. 对于制片人 transaction.id 来说,重要的是它是独一无二的。因此,我可以在启动时生成一个基于时间戳的id

然后我读了下面的博客:https://www.confluent.io/blog/transactions-apache-kafka/。特别是在“如何选择事务 ID”一节中,它似乎暗示我需要保证每个输入分区都有一个生产者实例。它说:“正确屏蔽僵尸的关键是确保读 - 进程 - 写周期中的输入主题和分区对于给定的 transactional.id 始终相同。它进一步引用了问题示例如下:“例如,在分布式流处理应用程序中,假设主题分区tp0最初由 transactional.id T0处理。如果在稍后的某个时候,它可以映射到另一个具有 transactional.id T1 的生产者,则 T0 和 T1 之间将没有围栏。因此,来自tp0的消息可能会被重新处理,这违反了恰好一次处理保证。

我不太明白为什么会这样。在我看来,只要事务是原子的,我就不应该关心哪个生产者处理来自任何分区的消息。我已经为此挣扎了一天,我想知道是否有人可以告诉我我在这里错过了什么。那么,为什么我不能将工作分配给具有任何 transaction.id 设置的任何生产者实例,只要它是唯一的。为什么他们说,如果您这样做,消息可能会通过事务提供的围栏泄漏。


答案 1

考虑消费者组人口不断变化(新消费者上线或下线)的情况,或者故障场景导致使用者组内主题分区分配的重新平衡。

现在假设之前已为使用者分配了分区 。这个消费者很高兴地离开并处理消息,发布新消息等(标准的消费 - 转换 - 发布模式)。发生重新平衡事件,导致被毫不客气地(始终希望使用该单词)从 中撤销并分配给 。从 的角度来看,它可能仍然有积压的消息需要处理,并且忘记了重新分配。你最终会陷入这样一种情况,即在很短的时间内,两者都可能认为它们都是“拥有”的,并且会采取相应的行动,在传出的主题中创建重复的消息,更糟糕的是,这些重复项可能会出现无序。C0P0P0C0C1C0C0C1P0

启用原始博客所指的“围栏”。作为重新分配的一部分,新生产者将按照递增的纪元编号行事,而现有生产者仍将使用旧纪元。击剑是微不足道的;在纪元失效的地方放置消息。transactional.id

Kafka交易有一些陷阱:

  • 入站和出站主题必须位于同一集群上,事务才能正常工作。
  • 的命名对于制作人的“移交”至关重要,即使您不关心僵尸围栏。新生产者的出现将促使为失效的生产者整理任何孤立的动态交易,因此要求ID在生产者会话之间稳定/可重复。不要为此使用随机ID;这不仅会导致未完成的事务(在模式下阻止每个使用者),而且还会在事务协调器(在代理上运行)上累积额外的状态。默认情况下,此状态将持续 7 天,因此您不希望一时兴起生成任意命名的事务生产者。transactional.idREAD_COMMITTED
  • 理想情况下,反映了入站主题分区的组合。(当然,除非你有一个单分区主题。在实践中,这意味着为分配给使用者的每个分区创建一个新的事务生产者。(请记住,在使用-遍历-发布方案中,创建者也是使用者,使用者分区分配将随每个重新平衡事件而变化。看看 spring-kafka 实现,它懒惰地为每个入站分区创建一个新的生产者。(关于这种方法的安全性,以及是否应该在分区重新分配时清理生产者,这是有话要说的,但这是另一回事。transactional.id
  • 击剑机制仅在卡夫卡一级运作。换句话说,它将失落的生产者与卡夫卡隔离开来,但与世界其他地区隔绝。这意味着,如果创建者还必须更新某些外部状态(在数据库、缓存等中)作为使用-转换-发布周期的一部分,则应用程序有责任在分区重新分配时将自己与数据库隔离开来,或者以其他方式确保更新的幂等性。

只是为了完整起见,值得指出的是,这不是实现击剑的唯一方法。Kafka 消费者 API 确实为用户提供了注册的能力,这为被取代的消费者提供了一种最后的机会,在将分区重新分配给新消费者之前,可以排出任何未完成的积压工作(或摆脱积压工作)。回调被阻塞;当它返回时,假设处理程序已将自己与本地隔离开来;然后,只有这样,新的消费者才会恢复处理。ConsumerRebalanceListener


答案 2

您提到的博客文章包含您要查找的所有信息,尽管它相当密集。

上述文章中的“为什么交易?”部分。

使用为至少一次交付语义配置的 vanilla Kafka 生产者和使用者,流处理应用程序可能会通过以下方式丢失一次处理语义:

  1. 由于内部重试,这可能会导致消息 B 的重复写入。这个问题由幂等的生产者解决,并不是本文其余部分的重点。producer.send()

  2. 我们可能会重新处理输入消息 A,导致重复的 B 消息被写入输出,这违反了恰好一次的处理语义。如果流处理应用程序在写入 B 后但在将 A 标记为已使用之前崩溃,则可能会发生重新处理。因此,当它恢复时,它将再次消耗A并再次写入B,从而导致重复。

  3. 最后,在分布式环境中,应用程序将崩溃,或者更糟的是,暂时失去与系统其余部分的连接。通常,会自动启动新实例以替换被视为丢失的实例。通过这个过程,我们可能会有多个实例处理相同的输入主题并写入相同的输出主题,从而导致重复的输出并违反恰好一次的处理语义。我们称之为“僵尸实例”问题。[着重号后加]

来自同一篇文章中的事务语义部分。

僵尸围栏

我们通过要求为每个事务生产者分配一个名为 .这用于在进程重新启动期间标识同一创建器实例。[着重号后加]transactional.id

API 要求事务生产者的第一个操作应该是在 Kafka 集群中显式注册它。当它这样做时,Kafka经纪人检查给定的未结交易并完成它们。它还会增加与 关联的纪元。纪元是为每个 存储的内部元数据片段。transactional.idtransactional.idtransactional.idtransactional.id

一旦时代被撞击,任何具有相同 transactional.id 和较旧时代的生产者都被视为僵尸并被隔离开来,即这些生产者的未来交易写入将被拒绝。[着重号后加]

以及同一篇文章中的“数据流”部分。

A:生产者和交易协调器的互动

执行事务时,创建者在以下几点向事务协调器发出请求:

  1. initTransactions API 向协调器注册一个 transactional.id。此时,协调器关闭与该 transactional.id 的任何未决交易,并颠簸纪元以隔离僵尸。每个生产者会话仅发生这种情况一次。[着重号后加]

  2. 当创建者即将在事务中首次将数据发送到分区时,将首先向协调器注册该分区。

  3. 当应用程序调用 或 时,将向协调器发送一个请求,以开始两阶段提交协议。commitTransactionabortTransaction

希望这有帮助!


推荐