如何在 rabbitmq 中池化频道?

2022-09-04 20:43:28

我一直在尝试在线程之间共享连接,并且仅在线程创建时打开通道,但是在进行了更多的研究之后,我想我也想尝试。我怎样才能在 rabbitmq 上执行此操作?或者这是一个我可以普遍应用的一般想法?我的目标是生成X线程,然后让它们不必打开新通道(这需要在客户端和服务器之间建立轮循机制)。connection pooling

由于线程是它们自己的类,我不确定我是否需要将池放在生成线程的类本身中,或者它们去哪里?我还有多种类型的线程,我想在它们之间共享这些连接(而不仅仅是一个)。这可能吗?

为了给您一个大致的概念,以下是在rabbymq中如何评估连接/通道:

ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();  //I want to share several of these between threads

答案 1

您所需要的只是线程可以从中提取的对象池。Channel

Apache共享资源实际上已经有了一个可以使用的通用代码。ObjectPool

该接口的javadoc可以在这里找到:http://commons.apache.org/pool/api-1.6/org/apache/commons/pool/ObjectPool.html

可以在此处找到其预构建实现之一的javadoc:http://commons.apache.org/pool/api-1.6/org/apache/commons/pool/impl/GenericObjectPool.html

可以在此处找到使用它的教程:http://commons.apache.org/pool/examples.html

如果这对于您的简单需求来说过于复杂,那么您真正需要做的就是编写一个管理一组对象的类,允许线程签出它们并将它们返回到池中,并进行适当的同步以防止两个线程获得相同的对象。ChannelChannel


答案 2

您也可以使用 ThreadLocal 对象,以防您使用通道。

RabbitMQ建议您为每个线程使用通道,因此这将是一个完美的匹配。

示例代码:

private final ThreadLocal<Channel> channels = new ThreadLocal<>();
...
Channel channel = channels.get();
 if (channel == null){
        channel = connection.createChannel();
        channels.set(channel);
    }

无需关闭通道,因为当连接关闭时,应用程序将关闭这些通道。

但是,如果您要大量创建新线程,则此解决方案可能不太适合您,因为这将分配许多永远不会关闭的新通道。但是,如果你做这样的事情,你可能做错了什么。