RabbitMQ 示例:多个线程、通道和队列

我刚刚阅读了RabbitMQ的Java API文档,发现它非常翔实和直截了当。有关如何设置用于发布/使用的简单示例非常易于遵循和理解。但这是一个非常简单/基本的例子,它给我留下了一个重要的问题:如何设置1个以上的通道来发布/消费到多个队列和从多个队列使用?Channel

假设我有一个 RabbitMQ 服务器,上面有 3 个队列:、 和 。因此,我们要么需要一个队列才能发布/使用到所有3个队列,要么更有可能的是,有3个单独的,每个队列专用于一个队列。loggingsecurity_eventscustomer_ordersChannelChannels

最重要的是,RabbitMQ的最佳实践要求我们为每个消费者线程设置1个。对于此示例,假设只有 1 个使用者线程即可,但两者都需要 5 个线程来处理卷。所以,如果我理解正确,这是否意味着我们需要:Channelsecurity_eventsloggingcustomer_order

  • 1 和 1 个用于发布/消费到和从中消费的使用者线程;和Channelsecurity_events
  • 5 和 5 个用于发布/消费到和从中消费的使用者线程;和Channelslogging
  • 用于发布/消费到和从 ?Channelscustomer_orders

如果我的理解在这里被误导了,请从纠正我开始。无论哪种方式,一些厌战的RabbitMQ老手能否帮助我“连接点”,并提供一个体面的代码示例,以设置满足我在这里要求的发布者/消费者?提前致谢!


答案 1

我认为你在初步理解方面有几个问题。坦率地说,看到以下内容,我有点惊讶: .您如何确定您需要确切的数字?你有什么保证5线程就足够了吗?both need 5 threads to handle the volume

RabbitMQ经过调整和时间测试,因此它完全是关于正确设计和高效的消息处理。

让我们尝试查看问题并找到适当的解决方案。顺便说一句,消息队列本身不会提供任何保证你有真正好的解决方案。你必须了解你在做什么,还要做一些额外的测试。

如您所知,有许多可能的布局:

enter image description here

我将使用布局作为说明生产者消费者问题的最简单方法。由于您非常担心吞吐量。顺便说一句,正如您可能期望的RabbitMQ表现得很好(来源)。注意,我稍后会解决它:B1NprefetchCount

enter image description here

因此,消息处理逻辑很可能是确保您有足够的吞吐量的正确位置。当然,每次需要处理消息时,您都可以跨越新线程,但最终这种方法会杀死您的系统。基本上,你会得到更多的线程,你会得到更大的延迟(如果你愿意,你可以检查阿姆达尔定律)。

enter image description here

(见阿姆达尔定律图解)

提示#1:小心线程,使用线程池(详细信息

线程池可以描述为 Runnable 对象(工作队列)的集合和正在运行的线程的连接。这些线程不断运行,并检查工作查询中是否有新工作。如果有新的工作要做,他们会执行这个Runnable。Thread 类本身提供了一个方法,例如 execute(Runnable r) 将新的 Runnable 对象添加到工作队列中。

public class Main {
  private static final int NTHREDS = 10;

  public static void main(String[] args) {
    ExecutorService executor = Executors.newFixedThreadPool(NTHREDS);
    for (int i = 0; i < 500; i++) {
      Runnable worker = new MyRunnable(10000000L + i);
      executor.execute(worker);
    }
    // This will make the executor accept no new threads
    // and finish all existing threads in the queue
    executor.shutdown();
    // Wait until all threads are finish
    executor.awaitTermination();
    System.out.println("Finished all threads");
  }
} 

提示#2:注意消息处理开销

我会说这是明显的优化技术。您可能会发送小而易于处理的消息。整个方法是关于要连续设置和处理的较小消息。大消息最终会玩一个坏笑话,所以最好避免这种情况。

enter image description here

因此,最好发送微小的信息,但是处理呢?每次提交作业时都会产生开销。在传入消息速率高的情况下,批处理可能非常有用。

enter image description here

例如,假设我们有简单的消息处理逻辑,并且不希望每次处理消息时都有特定于线程的开销。为了优化那个非常简单:CompositeRunnable can be introduced

class CompositeRunnable implements Runnable {

    protected Queue<Runnable> queue = new LinkedList<>();

    public void add(Runnable a) {
        queue.add(a);
    }

    @Override
    public void run() {
        for(Runnable r: queue) {
            r.run();
        }
    }
}

或者以稍微不同的方式执行相同的操作,方法是收集要处理的消息:

class CompositeMessageWorker<T> implements Runnable {

    protected Queue<T> queue = new LinkedList<>();

    public void add(T message) {
        queue.add(message);
    }

    @Override
    public void run() {
        for(T message: queue) {
            // process a message
        }
    }
}

通过这种方式,您可以更有效地处理消息。

提示#3:优化消息处理

尽管您知道可以并行处理消息()并减少处理开销(),但您必须快速完成所有操作。冗余的处理步骤、繁重的循环等可能会对性能产生很大影响。请参阅有趣的案例研究:Tip #1Tip #2

enter image description here

通过选择正确的 XML 解析器将消息队列吞吐量提高十倍

提示#4:连接和渠道管理

  • 在现有连接上启动新通道涉及一次网络往返 - 启动新连接需要多次。
  • 每个连接在服务器上使用一个文件描述符。频道则不然。
  • 在一个通道上发布大型消息将在连接断开时阻止连接。除此之外,多路复用是相当透明的。
  • 如果服务器过载,正在发布的连接可能会被阻止 - 最好将发布和使用连接分开
  • 准备好处理消息突发

()

请注意,所有提示都完美地协同工作。如果您需要其他详细信息,请随时告诉我。

完整的消费者示例(来源

请注意以下事项:

  • channel.basicQos(prefetch) - 正如你之前看到的,可能非常有用:prefetchCount

    此命令允许使用者选择一个预取窗口,该窗口指定它准备接收的未确认消息的数量。通过将预取计数设置为非零值,代理将不会向使用者传递任何超出该限制的消息。若要向前移动窗口,使用者必须确认收到一条消息(或一组消息)。

  • ExecutorService threadExecutor - 您可以指定正确配置的执行器服务。

例:

static class Worker extends DefaultConsumer {

    String name;
    Channel channel;
    String queue;
    int processed;
    ExecutorService executorService;

    public Worker(int prefetch, ExecutorService threadExecutor,
                  , Channel c, String q) throws Exception {
        super(c);
        channel = c;
        queue = q;
        channel.basicQos(prefetch);
        channel.basicConsume(queue, false, this);
        executorService = threadExecutor;
    }

    @Override
    public void handleDelivery(String consumerTag,
                               Envelope envelope,
                               AMQP.BasicProperties properties,
                               byte[] body) throws IOException {
        Runnable task = new VariableLengthTask(this,
                                               envelope.getDeliveryTag(),
                                               channel);
        executorService.submit(task);
    }
}

您还可以检查以下内容:


答案 2

如何设置 1 个以上的通道以在多个队列中发布/使用?

您可以使用线程和通道来实现。您所需要的只是一种对事物进行分类的方法,即登录名中的所有队列项目,security_events中的所有队列元素等。可以使用路由键实现分类。

即:每次将项目添加到队列时,请指定路由键。它将作为属性元素追加。通过这种方式,您可以从特定事件中获取值,比如日志记录

下面的代码示例说明如何在客户端完成此操作。

例如:

路由密钥用于标识通道的类型并检索类型。

例如,如果您需要获取有关 Login 类型的所有通道,则必须将路由密钥指定为 login 或其他关键字来标识它。

            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();

            channel.exchangeDeclare(EXCHANGE_NAME, "direct");

            string routingKey="login";

            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());

您可以在此处查看有关分类的更多详细信息。


螺纹部分

发布部分结束后,您可以运行线程部分。

在此部分中,您可以根据类别获取已发布的数据。小路由密钥,在您的情况下是日志记录,security_events和customer_orders等。

查看示例以了解如何在线程中检索数据。

例如 :

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
//**The threads part is as follows** 
 channel.exchangeDeclare(EXCHANGE_NAME, "direct");      
 String queueName = channel.queueDeclare().getQueue();
    // This part will biend the queue with the severity (login for eg:)
    for(String severity : argv){
              channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
    }
    boolean autoAck = false;
    channel.basicConsume(queueName, autoAck, "myConsumerTag",
    new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag,
                                Envelope envelope,
                                AMQP.BasicProperties properties,
                                byte[] body)
         throws IOException
     {
             String routingKey = envelope.getRoutingKey();
             String contentType = properties.contentType;
             long deliveryTag = envelope.getDeliveryTag();

             // (process the message components here ...)
             channel.basicAck(deliveryTag, false);
     }
 });

现在,将创建一个线程来处理类型为 login(路由键) 的队列中的数据。通过这种方式,您可以创建多个线程。每个服务于不同的目的。

在此处查看有关线程部分的更多详细信息。