为什么我的 RabbitMQ 频道不断关闭?

2022-09-01 03:34:25

我正在调试一些使用Apache POI从Microsoft Office文档中提取数据的Java代码。有时,它会遇到一个大文档,并且 POI 在内存不足时崩溃。此时,它会尝试将错误发布到 RabbitMQ,以便其他组件可以知道此步骤失败并采取相应的操作。但是,当它尝试发布到队列时,它会得到一个 .com.rabbitmq.client.AlreadyClosedException (clean connection shutdown; reason: Attempt to use closed channel)

下面是错误处理程序代码:

try {
    //Extraction and indexing code
}
catch(Throwable t) {
    // Something went wrong! We'll publish the error and then move on with
    // our lives
    System.out.println("Error received when indexing message: ");
    t.printStackTrace();
    System.out.println();
    String error = PrintExc.format(t);
    message.put("error", error);

    if(mime == null) {
        mime = "application/vnd.unknown";
    }

    message.put("mime", mime);
    publish("IndexFailure", "", MessageProperties.PERSISTENT_BASIC, message);
}

为完整起见,下面是发布方法:

private void publish(String exch, String route, 
    AMQP.BasicProperties props, Map<String, Object> message) throws Exception{
    chan.basicPublish(exch, route, props, 
        JSONValue.toJSONString(message).getBytes());  
}

我在尝试块中找不到任何似乎关闭RabbitMQ通道的代码。是否有任何情况下可以隐式关闭通道?

编辑:我应该注意,已经关闭的异常是由发布内部的调用抛出的。basicPublish


答案 1

AMQP 通道因通道错误而关闭。可能导致通道错误的两种常见情况:

  • 尝试将消息发布到不存在的交易所
  • 尝试发布具有立即标志集的消息,该消息没有具有活动使用者集的队列

我会考虑在你尝试使用的通道上设置一个 ShutdownListener,使用 addShutdownListener() 来捕获关闭事件并查看导致它的原因。


答案 2

在我的情况下,另一个原因是我错误地承认了两次消息。这会导致在第二次确认后在日志中出现类似这样的 RabbitMQ 错误。

=ERROR REPORT==== 11-Dec-2012::09:48:29 ===
connection <0.6792.0>, channel 1 - error:
{amqp_error,precondition_failed,"unknown delivery tag 1",'basic.ack'}

在我删除了重复的确认之后,错误消失了,通道不再关闭,并且已经关闭的异常也消失了。


推荐