如何要求 RabbitMQ 在 Spring 异步 MessageListener 用例中发生业务异常时重试

2022-09-02 12:14:00

我有一个春季AMQP消息监听器正在运行。

public class ConsumerService implements MessageListener {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Override
    public void onMessage(Message message) {
        try {
            testService.process(message); //This process method can throw Business Exception
        } catch (BusinessException e) {
           //Here we can just log the exception. How the retry attempt is made?
        } catch (Exception e) {
           //Here we can just log the exception.  How the retry attempt is made?
        }
    }
}

如您所见,在此过程中可能会出现异常。我想重试,因为 Catch 块中的特定错误。我不能通过异常在消息。如何告诉 RabbitMQ 有异常并重试?


答案 1

由于不允许抛出选中的异常,因此您可以将异常包装在 a 中并重新引发它。onMessage()RuntimeException

try {
    testService.process(message);
} catch (BusinessException e) {
    throw new RuntimeException(e);
}

但请注意,这可能会导致邮件无限期地重新传递。这是如何工作:

RabbitMQ 支持拒绝消息并要求代理重新排队。此处显示。但是 RabbitMQ 本身没有重试策略的机制,例如设置最大重试次数、延迟等。

使用 Spring AMQP 时,默认选项是“拒绝时重新排队”。Spring的SimpleMessageListenerContainer在出现未处理的异常时会默认执行此操作。因此,在您的情况下,您只需要重新抛出捕获的异常。但请注意,如果您无法处理消息并且始终引发异常,这将无限期地重新传递,并将导致无限循环。

您可以通过引发 AmqpRejectAndDontRequeueException 异常来覆盖每条消息的此行为,在这种情况下,消息将不会重新排队。

您还可以完全关闭“拒绝时重新排队”行为,方法是将SimpleMessageListenerContainer

container.setDefaultRequeueRejected(false) 

当消息被拒绝且未重新排队时,如果在 RabbitMQ 中设置了一个 DLQ,则该消息将丢失或传输到 DLQ。

如果您需要具有最大尝试次数,延迟等的重试策略,最简单的方法是设置一个弹簧“无状态”,它将在线程内执行所有重试(使用),而不会在每次重试时拒绝消息(因此每次重试都无需返回RabbitMQ)。重试次数用尽后,默认情况下将记录警告,并使用该消息。如果要发送到 DLQ,则需要一个 RepublishMessageRecoverer 或一个自定义 MessageRecoverer,它拒绝消息而不重新排队(在后一种情况下,您还应该在队列上设置 RabbitMQ DLQ)。默认邮件恢复程序的示例:RetryOperationsInterceptorThread.sleep()

container.setAdviceChain(new Advice[] {
        org.springframework.amqp.rabbit.config.RetryInterceptorBuilder
                .stateless()
                .maxAttempts(5)
                .backOffOptions(1000, 2, 5000)
                .build()
});

这显然有一个缺点,即您将在整个重试期间占用线程。您还可以选择使用“stateful”,它将在每次重试时将消息发送回RabbitMQ,但是延迟仍将在应用程序内实现,并且设置有状态拦截器有点复杂。RetryOperationsInterceptorThread.sleep()

因此,如果您希望在不占用的情况下延迟重试,则需要在 RabbitMQ 队列上使用 TTL 的更复杂的自定义解决方案。如果您不希望指数退避(因此每次重试时延迟不会增加),那就简单一点了。为了实现这样的解决方案,你基本上在rabbitMQ上创建了另一个带有参数的队列:和。然后在主队列上设置 。所以现在当你拒绝并且不重新排队时,RabbitMQ会将其重定向到第二个队列。当 TTL 过期时,它将被重定向到原始队列,从而重新交付给应用程序。因此,现在您需要一个重试侦听器,该侦听器在每次失败后拒绝向 RabbitMQ 发送的消息,并跟踪重试计数。为了避免需要在应用程序中保留状态(因为如果您的应用程序是集群的,则需要复制状态),您可以从 RabbitMQ 设置的标头计算重试计数。在此处查看有关此标头的详细信息。因此,在这一点上,实现自定义拦截器比使用此行为自定义Spring有状态拦截器更容易。Thread"x-message-ttl": <delay time in milliseconds>"x-dead-letter-exchange":"<name of the original queue>""x-dead-letter-exchange":"<name of the queue with the TTL>"x-death

另请查看春季 AMQP 参考中有关重试的部分


答案 2

推荐