正如你所说,你有几个选择。
如果将其转换为主题以获得相同的效果,则需要使使用者成为持久的使用者。队列提供的一件事是,如果您的使用者不活跃,则持久性。这取决于您使用的 MQ 系统。
如果要坚持使用队列,将为每个使用者创建一个队列,并创建一个将侦听原始队列的调度程序。
Producer -> Queue_Original <- Dispatcher -> Queue_Consumer_1 <- Consumer_1
-> Queue_Consumer_2 <- Consumer_2
-> Queue_Consumer_3 <- Consumer_3
主题的优点
- 更易于动态添加新使用者。所有消费者都将在没有任何工作的情况下收到新消息。
- 您可以创建轮循机制主题,以便Consumer_1收到消息,然后Consumer_2,然后Consumer_3
- 可以向使用者推送新消息,而不必查询队列,使它们变得被动。
主题的缺点
- 消息不是持久的,除非您的代理支持此配置。如果使用者脱机并返回,则可能会丢失消息,除非设置了持久性使用者。
- 很难让Consumer_1和Consumer_2收到消息,但不能Consumer_3。使用调度程序和队列时,调度程序无法将消息放入Consumer_3队列中。
队列的优点
- 消息是持久的,直到使用者删除它们
- 调度程序可以通过不将消息放入相应的使用者队列来筛选哪些使用者获取哪些消息。不过,这可以通过过滤器对主题完成。
队列的缺点
- 需要创建其他队列以支持多个使用者。在动态环境中,这是没有效率的。
在开发消息传递系统时,我更喜欢主题,因为它给了我最大的权力,但是看到您已经在使用队列,它将要求您更改系统的工作方式以实现主题。
多消费者队列系统的设计与实现
Producer -> Queue_Original <- Dispatcher -> Queue_Consumer_1 <- Consumer_1
-> Queue_Consumer_2 <- Consumer_2
-> Queue_Consumer_3 <- Consumer_3
源
请记住,您还需要注意其他事项,例如问题异常处理,重新连接到连接以及丢失连接时的队列等。这只是为了让您了解如何完成我所描述的操作。
在一个真正的系统中,我可能不会在第一个例外时退出。我会允许系统继续以最佳状态运行并记录错误。在此代码中,如果将消息放入单个使用者队列失败,则整个调度程序将停止。
调度程序.java
/*
* To change this template, choose Tools | Templates
* and open the template in the editor.
*/
package stackoverflow_4615895;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.Session;
public class Dispatcher {
private static long QUEUE_WAIT_TIME = 1000;
private boolean mStop = false;
private QueueConnectionFactory mFactory;
private String mSourceQueueName;
private String[] mConsumerQueueNames;
/**
* Create a dispatcher
* @param factory
* The QueueConnectionFactory in which new connections, session, and consumers
* will be created. This is needed to ensure the connection is associated
* with the correct thread.
* @param source
*
* @param consumerQueues
*/
public Dispatcher(
QueueConnectionFactory factory,
String sourceQueue,
String[] consumerQueues) {
mFactory = factory;
mSourceQueueName = sourceQueue;
mConsumerQueueNames = consumerQueues;
}
public void start() {
Thread thread = new Thread(new Runnable() {
public void run() {
Dispatcher.this.run();
}
});
thread.setName("Queue Dispatcher");
thread.start();
}
public void stop() {
mStop = true;
}
private void run() {
QueueConnection connection = null;
MessageProducer producer = null;
MessageConsumer consumer = null;
QueueSession session = null;
try {
// Setup connection and queues for receiving the messages
connection = mFactory.createQueueConnection();
session = connection.createQueueSession(false, Session.DUPS_OK_ACKNOWLEDGE);
Queue sourceQueue = session.createQueue(mSourceQueueName);
consumer = session.createConsumer(sourceQueue);
// Create a null producer allowing us to send messages
// to any queue.
producer = session.createProducer(null);
// Create the destination queues based on the consumer names we
// were given.
Queue[] destinationQueues = new Queue[mConsumerQueueNames.length];
for (int index = 0; index < mConsumerQueueNames.length; ++index) {
destinationQueues[index] = session.createQueue(mConsumerQueueNames[index]);
}
connection.start();
while (!mStop) {
// Only wait QUEUE_WAIT_TIME in order to give
// the dispatcher a chance to see if it should
// quit
Message m = consumer.receive(QUEUE_WAIT_TIME);
if (m == null) {
continue;
}
// Take the message we received and put
// it in each of the consumers destination
// queues for them to process
for (Queue q : destinationQueues) {
producer.send(q, m);
}
}
} catch (JMSException ex) {
// Do wonderful things here
} finally {
if (producer != null) {
try {
producer.close();
} catch (JMSException ex) {
}
}
if (consumer != null) {
try {
consumer.close();
} catch (JMSException ex) {
}
}
if (session != null) {
try {
session.close();
} catch (JMSException ex) {
}
}
if (connection != null) {
try {
connection.close();
} catch (JMSException ex) {
}
}
}
}
}
主要.java
QueueConnectionFactory factory = ...;
Dispatcher dispatcher =
new Dispatcher(
factory,
"Queue_Original",
new String[]{
"Consumer_Queue_1",
"Consumer_Queue_2",
"Consumer_Queue_3"});
dispatcher.start();