Kafka给出了:“小组成员在实际进入消费小组之前需要拥有有效的成员ID”

2022-09-02 10:42:53

我正在使用Kafka来使用Java中的消息。我想通过在本地框中多次启动同一应用来进行测试。当我启动时,我第一次能够开始使用来自该主题的消息。当我启动第二个时,我得到:

Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group

并且不要从该主题获取任何消息。如果我试图开始更多的人,我会遇到同样的问题。

我用于Kafka的配置是

spring:
  kafka:
    bootstrap-servers: kafka:9092
    consumer:
      auto-offset-reset: earliest
      key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
      properties:
        spring.deserializer.key.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
        spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
        spring.json.use.type.headers: false
    listener:
      missing-topics-fatal: false

我有两个主题

@Configuration
public class KafkaTopics {
    @Bean("alertsTopic")
    public NewTopic alertsTopic() {

        return TopicBuilder.name("XXX.alerts")
            .compact()
            .build();
    }

    @Bean("serversTopic")
    public NewTopic serversTopic() {

        return TopicBuilder.name("XXX.servers")
            .compact()
            .build();
    }

}

以及不同类文件中的两个侦听器。

@KafkaListener(topics = SERVERS_KAFKA_TOPIC, id = "#{T(java.util.UUID).randomUUID().toString()}",
    properties = {
        "spring.json.key.default.type=java.lang.String",
        "spring.json.value.default.type=com.devhaus.learningjungle.db.kafka.ServerInfo"
    })
public void registerServer(
    @Payload(required = false) ServerInfo serverInfo
) 

@KafkaListener(topics = ALERTS_KAFKA_TOPIC,
    id = "#{T(java.util.UUID).randomUUID().toString()}",
    properties = {
        "spring.json.key.default.type=com.devhaus.learningjungle.db.kafka.AlertOnKafkaKey",
        "spring.json.value.default.type=com.devhaus.learningjungle.db.kafka.AlertOnKafka"
    })
public void processAlert(
    @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) AlertOnKafkaKey key,
    @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partitionId,
    @Header(KafkaHeaders.OFFSET) long offset,
    @Payload(required = false) AlertOnKafka alert)

答案 1

从我的分析。这是正常行为,您可以更改日志级别以将其排除。

这样做的原因是,如果服务器检测到客户端可以支持,则会将该错误返回给客户端。KIP-394中指出了这一点。member.id

然后,客户端将使用生成的成员 ID 重新连接到服务器。


答案 2

我遇到了同样的问题,也阻止了我的消费者订阅某些主题。

我认为 member.id 可能类似于 client-id(在此处使用 Camel),这可能以某种方式与消费者组相关。

对我来说修复它的是一个更改,这次是我的消费服务的客户端ID不变,保持相同的消费者组不变。


推荐