JMS - 从一个消费者到多个消费者

2022-09-01 18:25:28

我有一个JMS客户端,它正在生成消息并通过JMS队列发送到其唯一的使用者。

我想要的是不止一个消费者收到这些信息。我想到的第一件事是将队列转换为主题,以便当前和新的消费者可以订阅并将相同的消息传递给他们所有人。

这显然将涉及在生产者和消费者方面修改当前客户端代码。

我还想看看其他选项,例如创建第二个队列,这样我就不必修改现有的使用者。我相信这种方法有一些优点,比如(如果我错了,请纠正我)在两个不同的队列而不是一个队列之间平衡负载,这可能会对性能产生积极的影响。

我想获得有关您可能会看到的这些选项和缺点/优点的建议。任何反馈都非常感谢。


答案 1

正如你所说,你有几个选择。

如果将其转换为主题以获得相同的效果,则需要使使用者成为持久的使用者。队列提供的一件事是,如果您的使用者不活跃,则持久性。这取决于您使用的 MQ 系统。

如果要坚持使用队列,将为每个使用者创建一个队列,并创建一个将侦听原始队列的调度程序。

Producer -> Queue_Original <- Dispatcher -> Queue_Consumer_1 <- Consumer_1
                                         -> Queue_Consumer_2 <- Consumer_2
                                         -> Queue_Consumer_3 <- Consumer_3

主题的优点

  • 更易于动态添加新使用者。所有消费者都将在没有任何工作的情况下收到新消息。
  • 您可以创建轮循机制主题,以便Consumer_1收到消息,然后Consumer_2,然后Consumer_3
  • 可以向使用者推送新消息,而不必查询队列,使它们变得被动。

主题的缺点

  • 消息不是持久的,除非您的代理支持此配置。如果使用者脱机并返回,则可能会丢失消息,除非设置了持久性使用者。
  • 很难让Consumer_1和Consumer_2收到消息,但不能Consumer_3。使用调度程序和队列时,调度程序无法将消息放入Consumer_3队列中。

队列的优点

  • 消息是持久的,直到使用者删除它们
  • 调度程序可以通过不将消息放入相应的使用者队列来筛选哪些使用者获取哪些消息。不过,这可以通过过滤器对主题完成。

队列的缺点

  • 需要创建其他队列以支持多个使用者。在动态环境中,这是没有效率的。

在开发消息传递系统时,我更喜欢主题,因为它给了我最大的权力,但是看到您已经在使用队列,它将要求您更改系统的工作方式以实现主题。

多消费者队列系统的设计与实现

Producer -> Queue_Original <- Dispatcher -> Queue_Consumer_1 <- Consumer_1
                                         -> Queue_Consumer_2 <- Consumer_2
                                         -> Queue_Consumer_3 <- Consumer_3

请记住,您还需要注意其他事项,例如问题异常处理,重新连接到连接以及丢失连接时的队列等。这只是为了让您了解如何完成我所描述的操作。

在一个真正的系统中,我可能不会在第一个例外时退出。我会允许系统继续以最佳状态运行并记录错误。在此代码中,如果将消息放入单个使用者队列失败,则整个调度程序将停止。

调度程序.java

/*
 * To change this template, choose Tools | Templates
 * and open the template in the editor.
 */
package stackoverflow_4615895;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.Session;

public class Dispatcher {

    private static long QUEUE_WAIT_TIME = 1000;
    private boolean mStop = false;
    private QueueConnectionFactory mFactory;
    private String mSourceQueueName;
    private String[] mConsumerQueueNames;

    /**
     * Create a dispatcher
     * @param factory
     *      The QueueConnectionFactory in which new connections, session, and consumers
     *      will be created. This is needed to ensure the connection is associated
     *      with the correct thread.
     * @param source
     *
     * @param consumerQueues
     */
    public Dispatcher(
        QueueConnectionFactory factory, 
        String sourceQueue, 
        String[] consumerQueues) {

        mFactory = factory;
        mSourceQueueName = sourceQueue;
        mConsumerQueueNames = consumerQueues;
    }

    public void start() {
        Thread thread = new Thread(new Runnable() {

            public void run() {
                Dispatcher.this.run();
            }
        });
        thread.setName("Queue Dispatcher");
        thread.start();
    }

    public void stop() {
        mStop = true;
    }

    private void run() {

        QueueConnection connection = null;
        MessageProducer producer = null;
        MessageConsumer consumer = null;
        QueueSession session = null;
        try {
            // Setup connection and queues for receiving the messages
            connection = mFactory.createQueueConnection();
            session = connection.createQueueSession(false, Session.DUPS_OK_ACKNOWLEDGE);
            Queue sourceQueue = session.createQueue(mSourceQueueName);
            consumer = session.createConsumer(sourceQueue);

            // Create a null producer allowing us to send messages
            // to any queue.
            producer = session.createProducer(null);

            // Create the destination queues based on the consumer names we
            // were given.
            Queue[] destinationQueues = new Queue[mConsumerQueueNames.length];
            for (int index = 0; index < mConsumerQueueNames.length; ++index) {
                destinationQueues[index] = session.createQueue(mConsumerQueueNames[index]);
            }

            connection.start();

            while (!mStop) {

                // Only wait QUEUE_WAIT_TIME in order to give
                // the dispatcher a chance to see if it should
                // quit
                Message m = consumer.receive(QUEUE_WAIT_TIME);
                if (m == null) {
                    continue;
                }

                // Take the message we received and put
                // it in each of the consumers destination
                // queues for them to process
                for (Queue q : destinationQueues) {
                    producer.send(q, m);
                }
            }

        } catch (JMSException ex) {
            // Do wonderful things here 
        } finally {
            if (producer != null) {
                try {
                    producer.close();
                } catch (JMSException ex) {
                }
            }
            if (consumer != null) {
                try {
                    consumer.close();
                } catch (JMSException ex) {
                }
            }
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException ex) {
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException ex) {
                }
            }
        }
    }
}

主要.java

    QueueConnectionFactory factory = ...;

    Dispatcher dispatcher =
            new Dispatcher(
            factory,
            "Queue_Original",
            new String[]{
                "Consumer_Queue_1",
                "Consumer_Queue_2",
                "Consumer_Queue_3"});
    dispatcher.start();

答案 2

您可能不必修改代码;这取决于你如何写它。

例如,如果您的代码使用 而不是 发送消息,则它将适用于主题和队列。同样,如果您使用 而不是 .MessageProducerQueueSenderMessageConsumerQueueReceiver

从本质上讲,在 JMS 应用程序中,使用非特定接口与 JMS 系统交互是很好的做法,例如 、 、 等。如果是这样的话,那就是“仅仅”的配置问题。MessageProducerMessageConsumerDestination


推荐