处理 Kafka 流中的异常

2022-09-01 19:14:39

已经经历了多个帖子,但其中大多数是相关的处理坏消息,而不是处理它们时的异常处理。

我想知道如何处理流应用程序收到的消息,并且在处理消息时出现异常?例外情况可能是由于多种原因造成的,例如网络故障,运行时异常等,

  • 有人能建议什么是正确的方法吗?我应该使用吗?还是有更好的方法?setUncaughtExceptionHandler
  • 如何处理重试?

答案 1

这取决于你想如何处理生产者端的例外。如果会在生产者上抛出异常(例如,由于网络故障或kafka代理已经死亡),则默认情况将死亡。对于 kafka-streams 版本 1.1.0,您可以通过实现如下方式来覆盖默认行为:ProductionExceptionHandler

public class CustomProductionExceptionHandler implements ProductionExceptionHandler {

    @Override
    public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
        log.error("Kafka message marked as processed although it failed. Message: [{}], destination topic: [{}]",  new String(record.value()), record.topic(), exception);
        return ProductionExceptionHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(final Map<String, ?> configs) {
    }

}

从 handle 方法,如果您不希望流在异常时死亡,则可以返回,在返回时返回,以防您想要流停止(FAIL 是默认的)。并且您需要在流配置中指定此类:CONTINUEFAIL

default.production.exception.handler=com.example.CustomProductionExceptionHandler

还要注意,它只处理生产者上的异常,并且在使用流方法等处理消息期间不会处理异常,您需要用try / catch块包装这些方法逻辑(将所有方法逻辑放入test块中,以确保您将处理所有异常情况):ProductionExceptionHandlermapValues(..)filter(..)branch(..)

.filter((key, value) -> { try {..} catch (Exception e) {..} })

据我所知,我们不需要显式处理消费者端的异常,因为kafka流稍后会自动重试(因为在使用和处理消息之前,偏移量不会改变);例如,如果kafka代理在一段时间内无法访问,您将从kafka流中获得异常,并且当中断时,kafka流将消耗所有消息。因此,在这种情况下,我们将只是延迟,没有任何损坏/丢失。

您将无法更改默认行为,例如,有了它,您只能将错误记录或消息发送到失败主题中。setUncaughtExceptionHandlerProductionExceptionHandler


kafka-streams 2.8.0 起的更新

因为,您可以使用 方法自动替换失败的流线程(由未捕获的异常引起的)。有关更多详细信息,请查看 Kafka 流特定未捕获的异常处理程序kafka-streams2.8.0KafkaStreamsvoid setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler eh);StreamThreadExceptionResponse.REPLACE_THREAD

kafkaStreams.setUncaughtExceptionHandler(ex -> {
    log.error("Kafka-Streams uncaught exception occurred. Stream will be replaced with new thread", ex);
    return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
});

答案 2

为了处理消费者端的异常,

1) 您可以使用以下属性在生产者中添加默认异常处理程序。

"default.deserialization.exception.handler" = "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler";

基本上,apache提供了三个异常处理程序类作为

1)LogAndContiuneExceptionHandler,您可以将其作为

props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, 
           LogAndContinueExceptionHandler.class);

2) LogAndFailExceptionHandler

props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, 
           LogAndFailExceptionHandler.class);

3) LogAndSkipOnInvalidTimestamp

props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, 
           LogAndSkipOnInvalidTimestamp.class);

对于自定义异常处理,

1)您可以实现反序列化ExceptionHandler接口并覆盖handle()方法。

2)或者您可以扩展上述类。


推荐