kafka消费者能否在从一个主题轮询所有消息之前过滤消息?

据说消费者只能阅读整个主题。没有运气对经纪人进行评估以过滤消息。

这意味着我们必须使用/接收来自主题的所有消息,并在客户端过滤它们。

这太过分了。我想知道我们是否可以根据已经传递给代理的东西来过滤和接收特定类型的消息,例如msg密钥或其他东西。

从方法,Consumer.poll(超时),似乎没有额外的事情可以做。


答案 1

不,使用消费者,您不能只接收来自主题的一些消息。使用者按顺序获取所有消息。

如果不想筛选使用者中的消息,可以使用流作业。例如,Streams 将从您的主题中读取,并且仅将使用者感兴趣的消息推送到另一个主题。然后,使用者可以订阅此新主题。


答案 2

每个 Kafka 主题都应包含逻辑上相似的消息,只是为了保持主题。现在,有时您可能会遇到一个主题,例如水果,其中包含水果的不同属性(可能是json格式)。您可能有生产者推送的不同水果消息,但希望您的一个消费者组仅处理苹果。理想情况下,您可能已经使用带有单个水果名称的主题名称,但是由于某种原因(可能主题太多),让我们假设这是一项徒劳的努力。在这种情况下,您可以重写 Kafka 中的默认分区方案以忽略该键并执行随机分区,然后通过生产者中的分区程序.class属性传递自定义分区程序类,该属性将水果名称放在 msg 键中。这是必需的,因为默认情况下,如果在发送消息时放置密钥,它将始终转到同一分区,这可能会导致分区不平衡。

这背后的想法是,有时如果您的Kafka msg值是一个复杂的对象(json,avro-record等),那么根据键过滤记录可能比解析整个值并提取所需字段更快。我现在没有任何数据来支持这种方法的性能优势。这只是一种直觉。