我认为你在初步理解方面有几个问题。坦率地说,看到以下内容,我有点惊讶: .您如何确定您需要确切的数字?你有什么保证5线程就足够了吗?both need 5 threads to handle the volume
RabbitMQ经过调整和时间测试,因此它完全是关于正确设计和高效的消息处理。
让我们尝试查看问题并找到适当的解决方案。顺便说一句,消息队列本身不会提供任何保证你有真正好的解决方案。你必须了解你在做什么,还要做一些额外的测试。
如您所知,有许多可能的布局:
我将使用布局作为说明生产者消费者问题的最简单方法。由于您非常担心吞吐量。顺便说一句,正如您可能期望的RabbitMQ表现得很好(来源)。注意,我稍后会解决它:B
1
N
prefetchCount
因此,消息处理逻辑很可能是确保您有足够的吞吐量的正确位置。当然,每次需要处理消息时,您都可以跨越新线程,但最终这种方法会杀死您的系统。基本上,你会得到更多的线程,你会得到更大的延迟(如果你愿意,你可以检查阿姆达尔定律)。
(见阿姆达尔定律图解)
提示#1:小心线程,使用线程池(详细信息)
线程池可以描述为 Runnable 对象(工作队列)的集合和正在运行的线程的连接。这些线程不断运行,并检查工作查询中是否有新工作。如果有新的工作要做,他们会执行这个Runnable。Thread 类本身提供了一个方法,例如 execute(Runnable r) 将新的 Runnable 对象添加到工作队列中。
public class Main {
private static final int NTHREDS = 10;
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(NTHREDS);
for (int i = 0; i < 500; i++) {
Runnable worker = new MyRunnable(10000000L + i);
executor.execute(worker);
}
// This will make the executor accept no new threads
// and finish all existing threads in the queue
executor.shutdown();
// Wait until all threads are finish
executor.awaitTermination();
System.out.println("Finished all threads");
}
}
提示#2:注意消息处理开销
我会说这是明显的优化技术。您可能会发送小而易于处理的消息。整个方法是关于要连续设置和处理的较小消息。大消息最终会玩一个坏笑话,所以最好避免这种情况。
因此,最好发送微小的信息,但是处理呢?每次提交作业时都会产生开销。在传入消息速率高的情况下,批处理可能非常有用。
例如,假设我们有简单的消息处理逻辑,并且不希望每次处理消息时都有特定于线程的开销。为了优化那个非常简单:CompositeRunnable can be introduced
class CompositeRunnable implements Runnable {
protected Queue<Runnable> queue = new LinkedList<>();
public void add(Runnable a) {
queue.add(a);
}
@Override
public void run() {
for(Runnable r: queue) {
r.run();
}
}
}
或者以稍微不同的方式执行相同的操作,方法是收集要处理的消息:
class CompositeMessageWorker<T> implements Runnable {
protected Queue<T> queue = new LinkedList<>();
public void add(T message) {
queue.add(message);
}
@Override
public void run() {
for(T message: queue) {
// process a message
}
}
}
通过这种方式,您可以更有效地处理消息。
提示#3:优化消息处理
尽管您知道可以并行处理消息()并减少处理开销(),但您必须快速完成所有操作。冗余的处理步骤、繁重的循环等可能会对性能产生很大影响。请参阅有趣的案例研究:Tip #1
Tip #2
通过选择正确的 XML 解析器将消息队列吞吐量提高十倍
提示#4:连接和渠道管理
- 在现有连接上启动新通道涉及一次网络往返 - 启动新连接需要多次。
- 每个连接在服务器上使用一个文件描述符。频道则不然。
- 在一个通道上发布大型消息将在连接断开时阻止连接。除此之外,多路复用是相当透明的。
- 如果服务器过载,正在发布的连接可能会被阻止 - 最好将发布和使用连接分开
- 准备好处理消息突发
(源)
请注意,所有提示都完美地协同工作。如果您需要其他详细信息,请随时告诉我。
完整的消费者示例(来源)
请注意以下事项:
-
channel.basicQos(prefetch) - 正如你之前看到的,可能非常有用:
prefetchCount
此命令允许使用者选择一个预取窗口,该窗口指定它准备接收的未确认消息的数量。通过将预取计数设置为非零值,代理将不会向使用者传递任何超出该限制的消息。若要向前移动窗口,使用者必须确认收到一条消息(或一组消息)。
-
ExecutorService threadExecutor - 您可以指定正确配置的执行器服务。
例:
static class Worker extends DefaultConsumer {
String name;
Channel channel;
String queue;
int processed;
ExecutorService executorService;
public Worker(int prefetch, ExecutorService threadExecutor,
, Channel c, String q) throws Exception {
super(c);
channel = c;
queue = q;
channel.basicQos(prefetch);
channel.basicConsume(queue, false, this);
executorService = threadExecutor;
}
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
Runnable task = new VariableLengthTask(this,
envelope.getDeliveryTag(),
channel);
executorService.submit(task);
}
}
您还可以检查以下内容: