如何挑选卡夫卡 transaction.id
我想知道我能不能得到一些帮助来理解Kafka中的交易,特别是我如何使用 transaction.id。下面是上下文:
- 我的 Kafka 应用程序遵循以下模式:使用来自输入主题的消息,处理,发布到输出主题。
- 我没有使用Kafka Streams API。
- 我在单个消费者组中有多个消费者,每个消费者都在自己的轮询线程中。
- 有一个线程池,其中包含执行消息处理并发布到输出主题的工作线程。目前,每个线程都有自己的生产者实例。
- 我正在使用已发布的事务 API 来确保消耗偏移量的更新和发布到输出主题的原子方式发生
到目前为止,我的假设包括:
- 如果我的进程在事务中间崩溃,那么该事务中的任何内容都不会发布,也不会移动任何消耗偏移量。因此,在重新启动时,我只需从原始消耗偏移量再次启动事务即可。
- 对于制片人 transaction.id 来说,重要的是它是独一无二的。因此,我可以在启动时生成一个基于时间戳的id
然后我读了下面的博客:https://www.confluent.io/blog/transactions-apache-kafka/。特别是在“如何选择事务 ID”一节中,它似乎暗示我需要保证每个输入分区都有一个生产者实例。它说:“正确屏蔽僵尸的关键是确保读 - 进程 - 写周期中的输入主题和分区对于给定的 transactional.id 始终相同。它进一步引用了问题示例如下:“例如,在分布式流处理应用程序中,假设主题分区tp0最初由 transactional.id T0处理。如果在稍后的某个时候,它可以映射到另一个具有 transactional.id T1 的生产者,则 T0 和 T1 之间将没有围栏。因此,来自tp0的消息可能会被重新处理,这违反了恰好一次处理保证。
我不太明白为什么会这样。在我看来,只要事务是原子的,我就不应该关心哪个生产者处理来自任何分区的消息。我已经为此挣扎了一天,我想知道是否有人可以告诉我我在这里错过了什么。那么,为什么我不能将工作分配给具有任何 transaction.id 设置的任何生产者实例,只要它是唯一的。为什么他们说,如果您这样做,消息可能会通过事务提供的围栏泄漏。