Dead Letter queue (DLQ) for Kafka with spring-kafka
在 Spring Boot 2.0 应用程序中实现死信队列 (DLQ) 概念的最佳方法是什么,使用 spring-kafka 2.1.x 将所有失败的消息发送到 @KafkaListener某个预定义的 Kafka DLQ 主题,而不会丢失单个消息?
所以消耗的卡夫卡记录是:
- 已成功处理,
- 处理失败,并发送到 DLQ 主题,
- 处理失败,未发送到DLQ主题(由于意外问题),因此将被监听器再次消耗。
我尝试使用错误处理程序的自定义实现创建侦听器容器,该容器使用 KafkaTemplate 将记录处理失败到 DLQ 主题。使用禁用的自动提交和记录 AckMode。
spring.kafka.enable-auto-ack=false
spring.kafka.listener.ack-mode=RECORD
@Configuration
public class KafkaConfig {
@Bean
ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = ...
...
factory.getContainerProperties().setErrorHandler(dlqErrorHandler);
return factory;
}
}
@Component
public class DlqErrorHandler implements ErrorHandler {
@Autowired
private KafkaTemplate<Object, Object> kafkaTemplate;
@Value("${dlqTopic}")
private String dlqTopic;
@Override
public void handle(Exception thrownException, ConsumerRecord<?, ?> record) {
log.error("Error, sending to DLQ...");
kafkaTemplate.send(dlqTopic, record.key(), record.value());
}
}
似乎此实现并不能保证项目 #3。如果在DlqErrorHandler记录中引发异常,则监听器不会再次使用。
使用事务侦听器容器是否有帮助?
factory.getContainerProperties().setTransactionManager(kafkaTransactionManager);
有没有方便的方法来使用Spring Kafka实现DLQ概念?
更新 2018/03/28
多亏了Gary Russell的回答,我能够通过实现DlqErrorHandler来实现所需的行为,如下所示
@Configuration
public class KafkaConfig {
@Bean
ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = ...
...
factory.getContainerProperties().setAckOnError(false);
factory.getContainerProperties().setErrorHandler(dlqErrorHandler);
return factory;
}
}
@Component
public class DlqErrorHandler implements ContainerAwareErrorHandler {
...
@Override
public void handle(Exception thrownException, list<ConsumerRecord<?, ?> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
Consumerrecord<?, ? record = records.get(0);
try {
kafkaTemplate.send("dlqTopic", record.key, record.value());
consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset() + 1);
// Other records may be from other partitions, so seek to current offset for other partitions too
// ...
} catch (Exception e) {
consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset());
// Other records may be from other partitions, so seek to current offset for other partitions too
// ...
throw new KafkaException("Seek to current after exception", thrownException);
}
}
}
这样,如果消费者轮询返回 3 条记录(1、2、3),而第 2 条记录无法处理:
- 1 将被处理
- 2 将无法处理并发送到 DLQ
- 3 多亏了消费者寻求录音.offset() + 1,它将被传送给听众
如果发送到 DLQ 失败,使用者会向 record.offset() 寻求记录,并且该记录将被重新传递给侦听器(并且发送到 DLQ 可能会被停用)。
更新 2021/04/30
自Spring Kafka 2.7.0以来,本机支持非阻塞重试和死信主题。
请参阅示例:https://github.com/evgeniy-khist/spring-kafka-non-blocking-retries-and-dlt
重试通常应为非阻塞(在单独的主题中完成)并延迟: