RabbitMQ 等待多个队列完成

2022-08-30 23:36:08

好的,这是正在发生的事情的概述:

    M <-- Message with unique id of 1234
    |
    +-Start Queue
    |
    |
    | <-- Exchange
   /|\
  / | \
 /  |  \ <-- bind to multiple queues
Q1  Q2  Q3
\   |   / <-- start of the problem is here
 \  |  / 
  \ | /
   \|/
    |
    Q4 <-- Queues 1,2 and 3 must finish first before Queue 4 can start
    |
    C <-- Consumer 

所以我有一个推送到多个队列的交换,每个队列都有一个任务,一旦所有任务都完成,只有这样队列4才能启动。

因此,具有唯一 ID 为 1234 的消息被发送到交换,交换将其路由到所有任务队列(Q1、Q2、Q3 等),当消息 ID 1234 的所有任务都已完成时,运行 Q4 以获取消息 ID 1234。

如何实现此目的?

使用Symfony2,RabbitMQBundle和RabbitMQ 3.x

资源:

更新 #1

好的,我想这就是我正在寻找的:

具有并行处理的 RPC,但是如何将相关 ID 设置为我的唯一 ID 以对消息进行分组并标识哪个队列?


答案 1

你需要实现这个:http://www.eaipatterns.com/Aggregator.html 但是Symfony的RabbitMQBundle不支持它,所以你必须使用底层的php-amqplib。

来自捆绑包的正常使用者回调将获得 AMQPMessage。从那里,您可以访问通道并手动发布到“管道和过滤器”实现中接下来的任何交换


答案 2

在 RabbitMQ 站点的 RPC 教程中,有一种方法可以传递“相关 ID”,该 ID 可以向队列中的用户标识您的消息。

我建议使用某种ID将消息放入前3个队列中,然后使用另一个进程将消息从3队列排入某种存储桶。当这些存储桶收到我假设的3个任务的完成时,将最终消息发送到第4个队列进行处理。

如果要为一个用户向每个队列发送多个工作项,则可能需要执行一些预处理,以找出特定用户放入队列中的项数,以便 4 之前排队的进程知道在排队之前需要多少个。


我在C#中做了我的rabbymq,所以很抱歉我的伪代码不是php样式

// Client
byte[] body = new byte[size];
body[0] = uniqueUserId;
body[1] = howManyWorkItems;
body[2] = command;

// Setup your body here

Queue(body)

// Server
// Process queue 1, 2, 3
Dequeue(message)

switch(message.body[2])
{
    // process however you see fit
}

processedMessages[message.body[0]]++;

if(processedMessages[message.body[0]] == message.body[1])
{
    // Send to queue 4
    Queue(newMessage)
}

对更新 #1 的响应

不要将客户端视为终端,而是将客户端视为服务器上的进程可能会很有用。因此,如果您在这样的服务器上设置 RPC 客户端,那么您需要做的就是让服务器处理用户的唯一 ID 的生成,并将消息发送到相应的队列:

    public function call($uniqueUserId, $workItem) {
        $this->response = null;
        $this->corr_id = uniqid();

        $msg = new AMQPMessage(
            serialize(array($uniqueUserId, $workItem)),
            array('correlation_id' => $this->corr_id,
            'reply_to' => $this->callback_queue)
        );

        $this->channel->basic_publish($msg, '', 'rpc_queue');
        while(!$this->response) {
            $this->channel->wait();
        }

        // We assume that in the response we will get our id back
        return deserialize($this->response);
    }


$rpc = new Rpc();

// Get unique user information and work items here

// Pass even more information in here, like what queue to use or you could even loop over this to send all the work items to the queues they need.
$response = rpc->call($uniqueUserId, $workItem);

$responseBuckets[array[0]]++;

// Just like above code that sees if a bucket is full or not

推荐