是否可以使用 Kafka Streams 访问消息标头?

2022-09-01 19:22:02

通过在 Kafka 0.11 中的记录(ProducerRecordConsumerRecord)中添加 Headers,在使用 Kafka Streams 处理主题时,是否可以获取这些标头?当调用像 on a 这样的方法时,它会提供 记录的 和 的参数,但我无法看到访问 .如果我们能越过s就好了。mapKStreamkeyvalueheadersmapConsumerRecord

前任。

KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> stream = kStreamBuilder.stream("some-topic");
stream
    .map((key, value) ->  ... ) // can I get access to headers in methods like map, filter, aggregate, etc?
    ... 

像这样的东西会起作用:

KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> stream = kStreamBuilder.stream("some-topic");
stream
    .map((record) -> {
        record.headers();
        record.key();
        record.value();
    })
    ...

答案 1

自版本 2.0.0 起,可以访问记录标头(有关详细信息,请参阅 KIP-244)。

您可以通过处理器 API(即,通过 、 或 ) 通过给定的“上下文”对象访问记录元数据(参见 https://docs.confluent.io/current/streams/developer-guide/processor-api.html#accessing-processor-context)。transform()transformValues()process()

更新

从 2.7.0 版本开始,处理器 API 得到了改进(参见 KIP-478),添加了一个新的类型安全类,而不是方法。对于这种情况,标头(和记录元数据)可通过类访问。api.Processorprocess(Record)process(K, V)Record

这个新功能在DSL的PAPI方法中尚不可用(例如,和兄弟姐妹)。KStream#process()KStream#transform()

+++++

在 2.0 之前,上下文仅公开主题、分区、偏移量和时间戳---但不包括流在这些旧版本中读取时实际上丢弃的标头。

但是,元数据在 DSL 级别不可用。但是,通过KIP-159扩展DSL的工作也在进行中。


答案 2

推荐