RabbitMQ:快速生产者和慢速消费者
我有一个应用程序,它使用RabbitMQ作为消息队列,在两个组件之间发送/接收消息:发送方和接收方。发件人以非常快的方式发送消息。接收方接收消息,然后执行一些非常耗时的任务(主要是为非常大的数据大小写入数据库)。由于接收方需要很长时间才能完成任务,然后检索队列中的下一条消息,因此发送方将继续快速填充队列。所以我的问题是:这会导致消息队列溢出吗?
消息使用者如下所示:
public void onMessage() throws IOException, InterruptedException {
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String queueName = channel.queueDeclare("allDataCase", true, false, false, null).getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
JSONObject json = new JSONObject(message);
String caseID = json.getString("caseID");
//following takes very long time
dao.saveToDB(caseID);
}
}
使用者收到的每条消息都包含一个 caseID。对于每个 caseID,它会将大量数据保存到数据库中,这需要很长时间。目前,只有一个使用者为 RabbitMQ 设置,因为生产者/消费者使用相同的队列来发布/订阅 caseID。那么,如何加快使用者吞吐量,以便使用者能够赶上生产者并避免队列中消息溢出呢?我应该在消费部分使用多线程来加快消费速度吗?或者我应该使用多个使用者来同时使用传入的消息?或者,是否有任何异步方法可以让使用者异步使用消息而无需等待它完成?欢迎任何建议。