Java BlockingQueue 在 Linux 上的延迟很高

2022-09-01 17:48:52

我正在使用BlockingQueue:s(尝试ArrayBlockingQueue和LinkedBlockingQueue)在我当前正在处理的应用程序中的不同线程之间传递对象。性能和延迟在这个应用程序中相对重要,所以我很好奇使用BlocklingQueue在两个线程之间传递对象需要多少时间。为了衡量这一点,我写了一个具有两个线程(一个消费者和一个生产者)的简单程序,我让生产者将时间戳(使用System.nanoTime()拍摄)传递给消费者,请参阅下面的代码。

我记得在某个论坛上读到,其他人尝试了大约10微秒(不知道使用的是什么操作系统和硬件),所以当我的Windows 7盒子(英特尔E7500酷睿2双核CPU,2.93GHz)上花费了大约30微秒时,我并没有太惊讶,同时在后台运行许多其他应用程序。然而,当我在我们更快的Linux服务器上(两个Intel X5677 3.46GHz四核CPU,运行带有内核2.6.26-2-amd64的Debian 5)上进行相同的测试时,我感到非常惊讶。我预计延迟会比我的Windows Box低,但相反,它要高得多 - ~75 – 100微秒!这两个测试都是使用Sun的Hotspot JVM版本1.6.0-23完成的。

有没有人在Linux上做过类似的测试,结果相似?或者有谁知道为什么它在Linux上要慢得多(使用更好的硬件),难道Linux上的线程切换比Windows慢得多吗?如果是这样的话,似乎Windows实际上更适合某种应用程序。任何帮助我理解相对较高的数字的帮助都非常感谢。

编辑:
在DaveC的评论之后,我还做了一个测试,我将JVM(在Linux机器上)限制为单个内核(即在同一内核上运行的所有线程)。这极大地改变了结果 - 延迟下降到20微秒以下,即比Windows机器上的结果更好。我还做了一些测试,我将生产者线程限制在一个内核上,将使用者线程限制在另一个内核上(尝试将它们放在同一套接字和不同的套接字上),但这似乎没有帮助 - 延迟仍然是~75微秒。顺便说一句,这个测试应用程序几乎是我在执行测试时在机器上运行的所有内容。

有谁知道这些结果是否有意义?如果生产者和消费者在不同的内核上运行,它真的应该慢得多吗?任何输入都非常感谢。

再次编辑(1月6日):
我尝试了对代码和运行环境的不同更改:

  1. 我将Linux内核升级到2.6.36.2(从2.6.26.2)。内核升级后,测量时间从升级前的 75 微秒更改为 60 微秒,变化非常小。为生产者线程和使用者线程设置 CPU 关联性没有任何效果,除非将它们限制在同一内核上。在同一内核上运行时,测量的延迟为 13 微秒。

  2. 在原始代码中,我让生产者在每次迭代之间休眠1秒钟,以便给消费者足够的时间来计算经过的时间并将其打印到控制台。如果我删除对 Thread.sleep () 的调用,而是让生产者和使用者在每次迭代中都调用 barrier.await(消费者在将经过的时间打印到控制台后调用它),则测量的延迟将从 60 微秒减少到 10 微秒以下。如果在同一内核上运行线程,则延迟将低于 1 微秒。谁能解释为什么这大大减少了延迟?我的第一个猜测是,这个变化的效果是生产者在消费者调用 queue.take()之前调用 queue.put(),所以消费者从来不需要阻止,但是在玩了 ArrayBlockingQueue 的修改版本之后,我发现这个猜测是错误的 —— 消费者实际上确实阻止了。如果您有其他猜测,请告诉我。(顺便说一句,如果我让生产者同时调用Thread.sleep()和barriage.await(),延迟保持在60微秒)。

  3. 我还尝试了另一种方法 - 而不是调用queet.take(),我调用queet.poll()的超时值为100微秒。这将平均延迟降低到10微秒以下,但当然是CPU密集型的(但可能比繁忙的等待更少的CPU密集型?

再次编辑(1月10日) - 问题已解决:
ninjalj建议约60微秒的延迟是由于CPU必须从更深的睡眠状态中唤醒 - 他完全正确!在 BIOS 中禁用 C 状态后,延迟降低到 <10 微秒。这就解释了为什么我在上面的第2点下得到了更好的延迟 - 当我更频繁地发送对象时,CPU保持足够繁忙,以至于不会进入更深的睡眠状态。非常感谢大家花时间阅读我的问题并在这里分享您的想法!

...

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CyclicBarrier;

public class QueueTest {

    ArrayBlockingQueue<Long> queue = new ArrayBlockingQueue<Long>(10);
    Thread consumerThread;
    CyclicBarrier barrier = new CyclicBarrier(2);
    static final int RUNS = 500000;
    volatile int sleep = 1000;

    public void start() {
        consumerThread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    barrier.await();
                    for(int i = 0; i < RUNS; i++) {
                        consume();

                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } 
            }
        });
        consumerThread.start();

        try {
            barrier.await();
        } catch (Exception e) { e.printStackTrace(); }

        for(int i = 0; i < RUNS; i++) {
            try {
                if(sleep > 0)
                    Thread.sleep(sleep);
                produce();

            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public void produce() {
        try {
            queue.put(System.nanoTime());
        } catch (InterruptedException e) {
        }
    }

    public void consume() {
        try {
            long t = queue.take();
            long now = System.nanoTime();
            long time = (now - t) / 1000; // Divide by 1000 to get result in microseconds
            if(sleep > 0) {
                System.out.println("Time: " + time);
            }

        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    public static void main(String[] args) {
        QueueTest test = new QueueTest();
        System.out.println("Starting...");
        // Run first once, ignoring results
        test.sleep = 0;
        test.start();
        // Run again, printing the results
        System.out.println("Starting again...");
        test.sleep = 1000;
        test.start();
    }
}

答案 1

您的测试不是队列切换延迟的良好度量,因为您有一个线程读取队列,该线程在再次发生之前同步写入(在队列中执行字符串和长串联)。要正确测量这一点,您需要将此活动移出此线程,并在 take 线程中尽可能少地执行工作。System.out

您最好只是在接受器中执行计算(当时 - 现在),并将结果添加到其他集合中,该集合由输出结果的另一个线程定期排出。我倾向于通过添加通过 AtomicReference 访问的适当预配置的数组支持结构来做到这一点(因此,报告线程只需在该引用上使用该存储结构的另一个实例 getAndSet,以便获取最新一批结果;例如,创建 2 个列表,将一个设置为活动列表,每个 x s 一个线程唤醒并交换主动和被动)。然后,您可以报告一些分布而不是每个结果(例如十分位数范围),这意味着您不必在每次运行时生成大量日志文件,也不会为您打印有用的信息。

FWIW 我同意Peter Lawrey所说的时间,如果延迟真的很关键,那么你需要考虑忙于等待适当的CPU亲和力(即为该线程专用一个核心)

1月6日后编辑

如果我删除对 Thread.sleep () 的调用,而是让生产者和使用者在每次迭代中都调用 barrier.await(消费者在将经过的时间打印到控制台后调用它),则测量的延迟将从 60 微秒减少到 10 微秒以下。如果在同一内核上运行线程,则延迟将低于 1 微秒。谁能解释为什么这大大减少了延迟?

您正在查看(和相应的)和 之间的区别。大多数j.u.c.的东西都是建立在(通常通过提供或直接)的基础上的,这(在Hotspot中)解析为(和),这往往最终落入pthread(posix线程)库的手中。通常用于唤醒和/或用于类似 .java.util.concurrent.locks.LockSupport#parkunparkThread#sleepLockSupportAbstractQueuedSynchronizerReentrantLocksun.misc.Unsafe#parkunparkpthread_cond_broadcastpthread_cond_waitpthread_cond_timedwaitBlockingQueue#take

我不能说我曾经看过实际实现的方式(因为我从来没有遇到过不是基于条件的等待的低延迟),但我想它会导致它被schedular以比pthreading机制更激进的方式降级,这就是延迟差异的原因。Thread#sleep


答案 2

如果可以的话,我会只使用ArrayBlockingQueue。当我使用它时,Linux上的延迟在8-18微秒之间。一些注意事项。

  • 成本很大程度上是唤醒线程所需的时间。当你唤醒一个线程时,它的数据/代码不会在缓存中,所以你会发现,如果你计算线程唤醒后发生的事情,可能比你重复运行同样的事情多花2-5倍的时间。
  • 某些操作使用操作系统调用(例如锁定/循环屏障),在低延迟方案中,这些调用通常比忙于等待更昂贵。我建议尝试忙于等待您的生产者,而不是使用CyclicBarrier。你也可以忙着等待你的消费者,但这在真正的系统上可能是不合理的昂贵。