RabbitMQ以及通道与连接之间的关系

2022-08-31 06:02:48

RabbitMQ Java 客户端具有以下概念:

  • Connection- 与 RabbitMQ 服务器实例的连接
  • Channel- ???
  • 使用者线程池 - 使用 RabbitMQ 服务器队列中的消息的线程池
  • 队列 - 按 FIFO 顺序保存消息的结构

我试图理解它们之间的关系,更重要的是它们之间的联系。

  1. 我仍然不太确定a是什么,除了这是您发布和使用的结构,并且它是从开放连接创建的。如果有人能向我解释“渠道”代表什么,它可能有助于澄清一些事情。Channel
  2. 通道和队列之间有什么关系?是否可以使用相同的通道与多个队列进行通信,或者它必须是 1:1?
  3. 队列和使用者池之间有什么关系?是否可以将多个使用者订阅到同一队列?同一个使用者是否可以使用多个队列?还是关系是1:1?

答案 1
  1. A 表示与消息代理的真实 TCP 连接,而 a 是其中的虚拟连接(AMQP 连接)。这样,您就可以在应用程序中使用任意数量的(虚拟)连接,而不会使 TCP 连接使代理过载。ConnectionChannel

  2. 您可以将一个用于所有内容。但是,如果您有多个线程,则建议对每个线程使用不同的线程。ChannelChannel

    Java 客户端 API 中的通道线程安全指南

    通道实例可由多个线程安全使用。对通道的请求被序列化,一次只有一个线程能够在通道上运行命令。即便如此,应用程序应该更喜欢使用每个线程的通道,而不是在多个线程之间共享相同的通道。

    和 之间没有直接关系。A 用于将 AMQP 命令发送到代理。这可以是创建队列或类似的,但这些概念并不绑定在一起。ChannelQueueChannel

  3. 每个线程都在从使用者线程池分配的自己的线程中运行。如果多个使用者订阅了同一队列,那么代理将使用轮循机制在它们之间平均分配消息。请参阅教程 2:“工作队列”。Consumer

    也可以将相同的队列附加到多个队列。您可以将消费者理解为回调。每次消息到达使用者绑定到的队列时,都会调用这些函数。对于 Java 客户端的情况,每个使用者都有一个方法,它表示回调方法。您通常要做的是,子类并覆盖 。注意:如果将同一使用者实例附加到多个队列,则此方法将由不同的线程调用。因此,如有必要,请注意同步。ConsumerhandleDelivery(...)DefaultConsumerhandleDelivery(...)


答案 2

在这里,对AMQP协议在“引擎盖下”的作用有很好的概念性理解是有用的。我想说的是,AMQP 0.9.1选择部署的文档和API使这特别令人困惑,所以这个问题本身就是许多人必须努力解决的问题。

TL;DR

连接是与 AMQP 服务器的物理协商的 TCP 套接字。正确实现的客户端将为每个应用程序提供其中一个,线程安全,可在线程之间共享。

通道是连接上的单个应用程序会话。一个线程将具有一个或多个这些会话。AMQP 体系结构 0.9.1 是这些线程之间不共享,并且应该在创建它的线程用完它时关闭/销毁。当发生各种协议冲突时,服务器也会关闭它们。

使用者是一种虚拟构造,表示特定通道上存在“邮箱”。使用使用者会告诉代理将消息从特定队列推送到该通道终结点。

连接事实

首先,正如其他人正确指出的那样,连接是表示与服务器的实际 TCP 连接的对象。连接在 AMQP 中的协议级别指定,与代理的所有通信都通过一个或多个连接进行。

  • 由于它是实际的 TCP 连接,因此它具有 IP 地址和端口号。
  • 协议参数是在每个客户端的基础上协商的,作为设置连接的一部分(此过程称为握手
  • 它被设计为长寿命;在极少数情况下,连接闭包是协议设计的一部分。
  • 从OSI的角度来看,它可能位于第6层左右。
  • 可以设置检测信号来监视连接状态,因为 TCP 本身不包含任何用于执行此操作的内容。
  • 最好让专用线程管理对基础 TCP 套接字的读取和写入。大多数(如果不是全部)RabbitMQ 客户端都这样做。在这方面,它们通常是线程安全的。
  • 相对而言,创建连接是“昂贵”的(由于握手),但实际上,这真的无关紧要。大多数进程实际上只需要一个连接对象。但是,如果您发现需要的吞吐量超过单个线程/套接字所能提供的吞吐量(使用当前的计算技术不太可能),则可以在池中维护连接。

渠道事实

通道是为应用的每个部分打开的应用程序会话,用于与 RabbitMQ 代理进行通信。它通过单个连接运行,并表示与代理的会话

  • 由于它表示应用程序逻辑的逻辑部分,因此每个通道通常都存在于自己的线程上。
  • 通常,应用打开的所有通道将共享一个连接(它们是在连接顶部运行的轻量级会话)。连接是线程安全的,所以这是可以的。
  • 大多数 AMQP 操作都是通过通道进行的。
  • 从 OSI 层的角度来看,通道可能位于第 7 层附近。
  • 通道设计为瞬态的;AMQP设计的一部分是通道通常关闭以响应错误(例如,在删除现有队列之前使用不同的参数重新声明队列)。
  • 由于通道是暂时性的,因此应用不应池化通道。
  • 服务器使用整数来标识通道。当管理连接的线程收到特定通道的数据包时,它使用此编号告诉代理数据包属于哪个通道/会话。
  • 通道通常不是线程安全的,因为在线程之间共享它们没有意义。如果您有另一个线程需要使用代理,则需要一个新通道。

消费者概况

使用者是由 AMQP 协议定义的对象。它既不是通道也不是连接,而是您的特定应用程序用作某种“邮箱”来丢弃邮件的东西。

  • “创建使用者”意味着您告诉代理(通过连接使用通道)您希望通过该通道将消息推送给您。作为响应,代理将注册您在通道上有一个使用者,并开始向您推送消息。
  • 通过连接推送的每条消息将同时引用通道号使用者编号。这样,连接管理线程(在本例中为Java API中)就知道如何处理消息;然后,通道处理线程也知道如何处理该消息。
  • 消费者实现具有最广泛的变化,因为它实际上是特定于应用程序的。在我的实现中,我选择每次消息通过消费者到达时都剥离任务;因此,我有一个线程管理连接,一个线程管理通道(以及通过扩展,消费者),以及通过消费者传递的每条消息的一个或多个任务线程。
  • 关闭连接将关闭连接上的所有通道。关闭通道将关闭通道上的所有使用者。也可以取消使用者(不关闭通道)。在各种情况下,做这三件事中的任何一件都是有意义的。
  • 通常,在 AMQP 客户端中实现使用者会向使用者分配一个专用通道,以避免与其他线程或代码的活动(包括发布)发生冲突。

就你所说的消费者线程池而言,我怀疑Java客户端正在做一些类似于我编程我的客户端做的事情(我的是基于.Net客户端的,但经过了大量修改)。


推荐