RabbitMQ 和通道 Java 线程安全

2022-09-03 15:14:48

在本指南 https://www.rabbitmq.com/api-guide.html RabbitMQ guys 指出:

通道和并发注意事项(线程安全)

通道实例不得在线程之间共享。应用程序应优先使用每个线程的通道,而不是在多个线程之间共享相同的通道。虽然通道上的某些操作可以安全地并发调用,但有些操作则不然,并且会导致网络上的帧交错不正确。线程之间的共享通道也会干扰 * 发布者确认。

线程安全非常重要,所以我试图尽可能地勤奋,但问题是:

我有这个应用程序,从兔子接收消息。收到消息后,它会处理该消息,然后在完成后进行 ack。应用程序可以在具有 2 个线程的固定线程池中同时处理 2 个项目。Rabbit 的 QOS 预取设置为 2,因为我不想为应用提供超出其在一段时间内处理能力的量。

现在,我的使用者的句柄交付执行以下操作:

Task run = new Task(JSON.parse(message));    
service.execute(new TestWrapperThread(getChannel(),run,envelope.getDeliveryTag()));

此时,您已经知道 TestWrapperThread 将调用作为最后一个操作执行。channel.basicAck(deliveryTag, false);

根据我对文档的理解,这是不正确的,并且可能有害,因为通道不是线程安全的,这种行为可能会搞砸事情。但是我该怎么做呢?我的意思是,我有一些想法,但它们会使一切变得更加复杂,我想弄清楚它是否真的有必要。

提前致谢


答案 1

我想你只用于你的消费者,而不是其他操作,如发布等。Channel

在你的情况下,唯一的潜在问题在这里:

channel.basicAck(deliveryTag, false);

因为你跨两个线程调用它,顺便说一句,如果你看到java代码,这个操作是安全的:

该类调用:ChannelN.java

public void basicAck(long deliveryTag, boolean multiple)
   throws IOException
{
   transmit(new Basic.Ack(deliveryTag, multiple));
}

请参阅 ChannelN 的 github 代码.java

AMQChannel 中的方法使用:transmit

public void transmit(Method m) throws IOException {
   synchronized (_channelMutex) {
       transmit(new AMQCommand(m));
   }
}

_channelMutex是一个protected final Object _channelMutex = new Object();

请参阅 AMQChannel 的 github 代码.java

编辑

正如您在官方文档中所读到的,“某些”操作是线程安全的,现在还不清楚是哪些操作。我研究了代码,我认为在更多线程上调用ACK没有问题。

希望它有帮助。

编辑2我还要补充尼古拉斯的评论:

请注意,从多个线程使用(basicConsume)和堆叠是Java客户端已经使用的常见 rabbitmq 模式。

因此,您可以安全地使用它。


答案 2