RabbitMQ:快速生产者和慢速消费者

我有一个应用程序,它使用RabbitMQ作为消息队列,在两个组件之间发送/接收消息:发送方和接收方。发件人以非常快的方式发送消息。接收方接收消息,然后执行一些非常耗时的任务(主要是为非常大的数据大小写入数据库)。由于接收方需要很长时间才能完成任务,然后检索队列中的下一条消息,因此发送方将继续快速填充队列。所以我的问题是:这会导致消息队列溢出吗?

消息使用者如下所示:

public void onMessage() throws IOException, InterruptedException {
    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    String queueName = channel.queueDeclare("allDataCase", true, false, false, null).getQueue();
    channel.queueBind(queueName, EXCHANGE_NAME, "");

    QueueingConsumer consumer = new QueueingConsumer(channel);
    channel.basicConsume(queueName, true, consumer);

    while (true) {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        String message = new String(delivery.getBody());
        System.out.println(" [x] Received '" + message + "'");

        JSONObject json = new JSONObject(message);
        String caseID = json.getString("caseID");
        //following takes very long time            
        dao.saveToDB(caseID);
    }
}

使用者收到的每条消息都包含一个 caseID。对于每个 caseID,它会将大量数据保存到数据库中,这需要很长时间。目前,只有一个使用者为 RabbitMQ 设置,因为生产者/消费者使用相同的队列来发布/订阅 caseID。那么,如何加快使用者吞吐量,以便使用者能够赶上生产者并避免队列中消息溢出呢?我应该在消费部分使用多线程来加快消费速度吗?或者我应该使用多个使用者来同时使用传入的消息?或者,是否有任何异步方法可以让使用者异步使用消息而无需等待它完成?欢迎任何建议。


答案 1

“这是否会导致消息队列溢出?”

是的。RabbitMQ 将进入“流控制”状态,以防止随着队列长度的增加而过度消耗内存。它还将开始将消息保存到磁盘,而不是将它们保存在内存中。

“那么我该如何加快消费者吞吐量,以便消费者能够赶上生产者并避免队列中的消息溢出”

您有 2 种选择:

  1. 添加更多使用者。请记住,如果选择此选项,您的数据库现在将由多个并发进程操作。确保数据库能够承受额外的压力。
  2. 增加使用通道的 QOS 值。这将从队列中提取更多消息,并将它们缓冲到使用者上。这将增加整体处理时间;如果缓冲了 5 条消息,则第 5 条消息将需要消息 1...5 的处理时间才能完成。

“我应该在消费者部分使用多线程来加快消费速度吗?”

除非您有精心设计的解决方案,否则不会。向应用程序添加并行性将在使用者端增加大量开销。您可能最终会耗尽线程池或限制内存使用。

在处理AMQP时,您确实需要考虑每个流程的业务需求,以便设计最佳解决方案。传入消息的时间敏感性如何?它们是否需要尽快保存到 DB 中,或者这些数据是否立即可用对用户很重要?

如果不需要立即持久化数据,则可以修改应用程序,以便使用者只需从队列中删除消息并将其保存到缓存的集合(例如,在 Redis 中)。引入第二个进程,然后按顺序读取和处理缓存的消息。这将确保您的队列长度不会增长到足以导致流控制,同时防止您的数据库受到写入请求的轰炸,这些请求通常比读取请求更昂贵。您的使用者现在只需从队列中删除消息,稍后由另一个进程处理。


答案 2

你有很多方法可以提高你的表现。

  1. 您可以创建具有更多生产者的工作线程队列,通过这种方式创建一个简单的负载平衡系统。不要使用交换--->队列,而只使用队列。阅读这篇文章 RabbitMQ 非轮循机制调度

  2. 当您收到一条消息时,您可以创建一个池线程以在数据库中插入数据,但在这种情况下,您必须管理故障。

但我认为主要问题是数据库而不是RabbitMQ。通过良好的调优、多线程和工作队列,您可以拥有可扩展且快速的解决方案。

让我知道