Kafka 流用于添加全局存储的用例

2022-09-02 22:05:17

在 kafka 流中定义拓扑时,可以添加全局状态存储。它将需要一个源主题以及一个 .处理器接收记录,并可以在将它们添加到存储之前从理论上转换它们。但在还原的情况下,记录直接从源主题(changelog)插入到全局状态存储中,跳过在处理器中完成的最终转换。ProcessorSupplier

   +-------------+             +-------------+              +---------------+
   |             |             |             |              |    global     |
   |source topic  ------------->  processor  +-------------->    state      |
   |(changelog)  |             |             |              |    store      |
   +-------------+             +-------------+              +---------------+
          |                                                         ^
          |                                                         |
          +---------------------------------------------------------+
              record directly inserted during restoration

StreamsBuilder#addGlobalStore(StoreBuilder storeBuilder, String topic, Used useded, ProcessorSupplier stateUpdateSupplier)将全局状态存储添加到拓扑中。

根据文档

注意:不应使用处理器将转换后的记录插入到全局状态存储中。此存储使用源主题作为更改日志,在还原期间将直接从源插入记录。应使用此 ProcessorNode 使 StateStore 保持最新。

同时,由于主要错误目前在kafka错误跟踪器上打开:addGlobalStore上提供的KAFKA-7663自定义处理器在从主题恢复状态时不使用,这准确地解释了文档中陈述的内容,但似乎是一个被接受的错误。

我想知道KAFKA-7663是否确实是一个错误。根据文档,它似乎是这样设计的,在这种情况下,我很难理解用例。
有人可以解释这个低级API的主要用例吗?我唯一能想到的是处理副作用,例如,在处理器中执行一些日志操作。

奖励问题:如果源主题充当全局存储的更改日志,当由于保留期已过期而从主题中删除记录时,是否会从全局状态存储中删除该记录?或者,删除是否仅在从更改日志中完全恢复存储后才会在商店中进行。


答案 1

是的,这是一个非常奇怪的小第22条军规,但文档是正确的。全局状态存储的处理器不得对记录执行任何操作,而是将其保存到存储中。

AFAIK,这不是一个哲学问题,只是一个实际问题。原因很简单,就是你观察到的行为...Streams 将输入主题视为存储的更改日志主题,因此在还原期间绕过处理器(以及反序列化)。

状态还原绕过任何处理的原因是,通常更改日志中的数据与存储中的数据相同,因此对它执行任何新操作实际上都是错误的。此外,只需将字节从网络上取下并将它们批量写入状态存储即可提高效率。我之所以说“通常”,是因为在这种情况下,输入主题与正常的更改日志主题并不完全一样,因为它在存储放置期间不会收到其写入。

就其价值而言,我也很难理解用例。看起来,我们应该:

  1. 完全摆脱该处理器,并始终将二进制数据从网络上转储到存储中,就像恢复一样。
  2. 重新设计全局存储,以允许在全局存储之前进行任意转换。我们可以:
    • 继续使用输入主题,并在还原期间反序列化和调用处理器,或者
    • 为全局存储添加一个真正的更改日志,这样我们就可以轮询输入主题,应用一些转换,然后写入全局存储全局存储更改日志。然后,我们可以使用更改日志(而不是输入)进行还原和复制。

顺便说一句,如果你想要后一种行为,你可以通过应用你的转换,然后用于制造一个“changelog”主题来近似它。然后,您将创建全局存储以从您的而不是输入中读取。to(my-global-changelog)my-global-changelog

因此,为了给您一个直接的答案,KAFKA-7663不是一个错误。我将对提议将其转换为功能请求的票证进行评论。

额外答案:充当状态存储的更改日志的主题不得配置保留期。实际上,这意味着您应该通过启用压缩来防止无限增长,并禁用日志保留。

在实践中,旧数据从保留期中掉落并被丢弃并不是一个“事件”,消费者无法知道它是否/何时发生。因此,无法从状态存储中删除数据以响应此非事件。正如你所描述的那样,它会发生...记录将无限期地存放在全球商店中。如果/当实例被替换时,新实例将从输入中恢复,并且(显然)仅接收当时主题中存在的记录。因此,Streams 集群作为一个整体最终会得到一个不一致的全局状态视图。这就是您应该禁用保留的原因。

从存储中“删除”旧数据的正确方法是在输入主题中为所需键编写一个逻辑删除。然后,这将正确传播到集群的所有成员,在恢复期间正确应用,并由代理正确压缩。

我希望这一切都有帮助。当然,请在工单上插话,帮助我们将API塑造得更直观!


答案 2

目前,似乎没有办法监听KGlobalTable上的更改。

您可以使用全局存储和自定义处理器获得类似的结果。

我在这里偶然发现了这一点 如何获得有关GlobalKTable状态存储更新的通知?

我并不是说这是一个很好的用例,但作为一种解决方法,它可能会有所帮助。


推荐