Java BlockingQueue 在 Linux 上的延迟很高
我正在使用BlockingQueue:s(尝试ArrayBlockingQueue和LinkedBlockingQueue)在我当前正在处理的应用程序中的不同线程之间传递对象。性能和延迟在这个应用程序中相对重要,所以我很好奇使用BlocklingQueue在两个线程之间传递对象需要多少时间。为了衡量这一点,我写了一个具有两个线程(一个消费者和一个生产者)的简单程序,我让生产者将时间戳(使用System.nanoTime()拍摄)传递给消费者,请参阅下面的代码。
我记得在某个论坛上读到,其他人尝试了大约10微秒(不知道使用的是什么操作系统和硬件),所以当我的Windows 7盒子(英特尔E7500酷睿2双核CPU,2.93GHz)上花费了大约30微秒时,我并没有太惊讶,同时在后台运行许多其他应用程序。然而,当我在我们更快的Linux服务器上(两个Intel X5677 3.46GHz四核CPU,运行带有内核2.6.26-2-amd64的Debian 5)上进行相同的测试时,我感到非常惊讶。我预计延迟会比我的Windows Box低,但相反,它要高得多 - ~75 – 100微秒!这两个测试都是使用Sun的Hotspot JVM版本1.6.0-23完成的。
有没有人在Linux上做过类似的测试,结果相似?或者有谁知道为什么它在Linux上要慢得多(使用更好的硬件),难道Linux上的线程切换比Windows慢得多吗?如果是这样的话,似乎Windows实际上更适合某种应用程序。任何帮助我理解相对较高的数字的帮助都非常感谢。
编辑:
在DaveC的评论之后,我还做了一个测试,我将JVM(在Linux机器上)限制为单个内核(即在同一内核上运行的所有线程)。这极大地改变了结果 - 延迟下降到20微秒以下,即比Windows机器上的结果更好。我还做了一些测试,我将生产者线程限制在一个内核上,将使用者线程限制在另一个内核上(尝试将它们放在同一套接字和不同的套接字上),但这似乎没有帮助 - 延迟仍然是~75微秒。顺便说一句,这个测试应用程序几乎是我在执行测试时在机器上运行的所有内容。
有谁知道这些结果是否有意义?如果生产者和消费者在不同的内核上运行,它真的应该慢得多吗?任何输入都非常感谢。
再次编辑(1月6日):
我尝试了对代码和运行环境的不同更改:
我将Linux内核升级到2.6.36.2(从2.6.26.2)。内核升级后,测量时间从升级前的 75 微秒更改为 60 微秒,变化非常小。为生产者线程和使用者线程设置 CPU 关联性没有任何效果,除非将它们限制在同一内核上。在同一内核上运行时,测量的延迟为 13 微秒。
在原始代码中,我让生产者在每次迭代之间休眠1秒钟,以便给消费者足够的时间来计算经过的时间并将其打印到控制台。如果我删除对 Thread.sleep () 的调用,而是让生产者和使用者在每次迭代中都调用 barrier.await(消费者在将经过的时间打印到控制台后调用它),则测量的延迟将从 60 微秒减少到 10 微秒以下。如果在同一内核上运行线程,则延迟将低于 1 微秒。谁能解释为什么这大大减少了延迟?我的第一个猜测是,这个变化的效果是生产者在消费者调用 queue.take()之前调用 queue.put(),所以消费者从来不需要阻止,但是在玩了 ArrayBlockingQueue 的修改版本之后,我发现这个猜测是错误的 —— 消费者实际上确实阻止了。如果您有其他猜测,请告诉我。(顺便说一句,如果我让生产者同时调用Thread.sleep()和barriage.await(),延迟保持在60微秒)。
我还尝试了另一种方法 - 而不是调用queet.take(),我调用queet.poll()的超时值为100微秒。这将平均延迟降低到10微秒以下,但当然是CPU密集型的(但可能比繁忙的等待更少的CPU密集型?
再次编辑(1月10日) - 问题已解决:
ninjalj建议约60微秒的延迟是由于CPU必须从更深的睡眠状态中唤醒 - 他完全正确!在 BIOS 中禁用 C 状态后,延迟降低到 <10 微秒。这就解释了为什么我在上面的第2点下得到了更好的延迟 - 当我更频繁地发送对象时,CPU保持足够繁忙,以至于不会进入更深的睡眠状态。非常感谢大家花时间阅读我的问题并在这里分享您的想法!
...
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CyclicBarrier;
public class QueueTest {
ArrayBlockingQueue<Long> queue = new ArrayBlockingQueue<Long>(10);
Thread consumerThread;
CyclicBarrier barrier = new CyclicBarrier(2);
static final int RUNS = 500000;
volatile int sleep = 1000;
public void start() {
consumerThread = new Thread(new Runnable() {
@Override
public void run() {
try {
barrier.await();
for(int i = 0; i < RUNS; i++) {
consume();
}
} catch (Exception e) {
e.printStackTrace();
}
}
});
consumerThread.start();
try {
barrier.await();
} catch (Exception e) { e.printStackTrace(); }
for(int i = 0; i < RUNS; i++) {
try {
if(sleep > 0)
Thread.sleep(sleep);
produce();
} catch (Exception e) {
e.printStackTrace();
}
}
}
public void produce() {
try {
queue.put(System.nanoTime());
} catch (InterruptedException e) {
}
}
public void consume() {
try {
long t = queue.take();
long now = System.nanoTime();
long time = (now - t) / 1000; // Divide by 1000 to get result in microseconds
if(sleep > 0) {
System.out.println("Time: " + time);
}
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
QueueTest test = new QueueTest();
System.out.println("Starting...");
// Run first once, ignoring results
test.sleep = 0;
test.start();
// Run again, printing the results
System.out.println("Starting again...");
test.sleep = 1000;
test.start();
}
}