Kafka 使用者异常和偏移量提交
2022-09-03 02:07:14
我一直在尝试为Spring Kafka做一些POC工作。具体来说,我想尝试在Kafka中使用消息时处理错误的最佳实践。
我想知道是否有人能够帮助:
- 分享有关 Kafka 消费者在发生故障时应执行的操作的最佳实践
- 帮助我了解 AckMode Record 的工作原理,以及如何防止在侦听器方法中引发异常时提交到 Kafka 偏移队列。
下面给出了 2 的代码示例:
鉴于AckMode设置为RECORD,根据文档:
当侦听器在处理记录后返回时提交偏移量。
我本来以为如果侦听器方法引发异常,偏移量不会增加。但是,当我使用下面的代码/配置/命令组合测试它时,情况并非如此。偏移量仍会更新,并且继续处理下一条消息。
我的配置:
private Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.1:9092");
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
@Bean
ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.RECORD);
return factory;
}
我的代码:
@Component
public class KafkaMessageListener{
@KafkaListener(topicPartitions = {@TopicPartition( topic = "my-replicated-topic", partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0", relativeToCurrent = "true"))})
public void onReplicatedTopicMessage(ConsumerRecord<Integer, String> data) throws InterruptedException {
throw new RuntimeException("Oops!");
}
用于验证偏移的命令:
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-group
我使用的是kafka_2.12-0.10.2.0和org.springframework.kafka:spring-kafka:1.1.3.RELEASE