如果未确认消息并且应用程序失败,则将自动重新传递消息,并且信封上的属性将设置为(除非您使用它们与标志一起使用)。redelivered
true
no-ack = true
UPD:
您必须在捕获块中使用重新交付标志nack
try {
//Do some business logic
} catch (Exception $ex) {
//Log exception
return $queue->nack($msg->getDeliveryTag(), AMQP_REQUEUE);
}
当心无限添加的消息,而重新交付计数在RabbitMQ和AMQP协议中根本没有实现。
如果您不想弄乱此类消息,而只想添加一些延迟,则可能需要添加一些或在方法调用之前添加一些延迟,但这根本不是一个好主意。sleep()
usleep()
nack
有多种技术可以处理周期重新交付问题:
1. 依靠死信交换
2. 按消息或按队列 TTL 使用
- 优点:易于实施,也是标准的,清晰的
- 缺点:排长队,您可能会丢失一些消息
示例(请注意,对于队列 ttl,我们只传递数字,对于消息 ttl - 任何将是数字字符串的内容):
2.1 每条消息 ttl:
$queue = new AMQPQueue($channel);
$queue->setName('my-queue');
$queue->declareQueue();
$queue->bind('my-exchange');
$exchange->publish(
'message at ' . microtime(true),
null,
AMQP_NOPARAM,
array(
'expiration' => '1000'
)
);
2.2. 每个队列 ttl:
$queue = new AMQPQueue($channel);
$queue->setName('my-queue');
$queue->setArgument('x-message-ttl', 1000);
$queue->declareQueue();
$queue->bind('my-exchange');
$exchange->publish('message at ' . microtime(true));
3. 在消息正文或标头中保留重新传递计数或左重新传递数(在 IP 堆栈中也称为跃点限制或 ttl)
- 优点:让您在应用程序级别对消息生存期进行额外控制
- 缺点:当您必须修改消息并再次发布消息时,开销很大,特定于应用程序,不清楚
法典:
$queue = new AMQPQueue($channel);
$queue->setName('my-queue');
$queue->declareQueue();
$queue->bind('my-exchange');
$exchange->publish(
'message at ' . microtime(true),
null,
AMQP_NOPARAM,
array(
'headers' => array(
'ttl' => 100
)
)
);
$queue->consume(
function (AMQPEnvelope $msg, AMQPQueue $queue) use ($exchange) {
$headers = $msg->getHeaders();
echo $msg->isRedelivery() ? 'redelivered' : 'origin', ' ';
echo $msg->getDeliveryTag(), ' ';
echo isset($headers['ttl']) ? $headers['ttl'] : 'no ttl' , ' ';
echo $msg->getBody(), PHP_EOL;
try {
//Do some business logic
throw new Exception('business logic failed');
} catch (Exception $ex) {
//Log exception
if (isset($headers['ttl'])) {
// with ttl logic
if ($headers['ttl'] > 0) {
$headers['ttl']--;
$exchange->publish($msg->getBody(), $msg->getRoutingKey(), AMQP_NOPARAM, array('headers' => $headers));
}
return $queue->ack($msg->getDeliveryTag());
} else {
// without ttl logic
return $queue->nack($msg->getDeliveryTag(), AMQP_REQUEUE); // or drop it without requeue
}
}
return $queue->ack($msg->getDeliveryTag());
}
);
可能还有其他一些方法可以更好地控制消息重新传递流。
结论:没有银弹解决方案。您必须决定哪种解决方案最适合您的需求或找出其他解决方案,但不要忘记在这里分享;)