春季卡夫卡 - 如何使用组ID将偏移量重置为最新?

我目前正在使用Spring Integration Kafka进行实时统计。但是,组名使Kafka搜索侦听器未读取的所有先前值。

@Value("${kafka.consumer.group.id}")
private String consumerGroupId;

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(getDefaultProperties());
}

public Map<String, Object> getDefaultProperties() {
    Map<String, Object> properties = new HashMap<>();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

    properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);

    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    return properties;
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {

    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

@Bean
public KafkaMessageListener listener() {
    return new KafkaMessageListener();
}

我想开始最新的偏移量,而不是被旧值所困扰。是否可以重置组的偏移量?


答案 1

因为我没有看到任何这样的例子,所以我要解释一下我在这里是如何做到的。

必须实现一个类的类,该类将允许侦听器在确定分区属性时控制偏移查找。(来源 : https://docs.spring.io/spring-kafka/reference/htmlsingle/#seek@KafkaListenerConsumerSeekAware )

public class KafkaMessageListener implements ConsumerSeekAware {
    @KafkaListener(topics = "your.topic")
    public void listen(byte[] payload) {
        // ...
    }

    @Override
    public void registerSeekCallback(ConsumerSeekCallback callback) {

    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        assignments.forEach((t, o) -> callback.seekToEnd(t.topic(), t.partition()));
    }

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {


    }
}

在这里,在重新平衡时,我们使用给定的回调来查找所有给定主题的最后一个偏移量。感谢Artem Bilan(https://stackoverflow.com/users/2756547/artem-bilan)指导我找到答案。


答案 2

另一种方式,我们总是可以使用最新的消息,而无需提交组偏移量,方法是为KafkaListener注释指定属性值。{"enable.auto.commit:false", "auto.offset.reset:latest"}

@KafkaListener(id = "example-group",
        properties = {"enable.auto.commit:false", "auto.offset.reset:latest"},
        topics = "example")

推荐