BlockingQueue的实现:SynchronousQueue和LinkedBlockingQueue有什么区别

我看到了BlocklingQueue的这些实现,无法理解它们之间的区别。到目前为止,我的结论是:

  1. 我永远不需要同步队列
  2. LinkedBlockingQueue确保FIFO,BlockingQueue必须使用参数true创建才能使其成为FIFO
  3. 同步队列中断大多数集合方法(包含、大小等)

那么我什么时候需要同步队列呢?此实现的性能是否优于 LinkedBlockingQueue

为了使它变得更加复杂...为什么 Executors.newCachedThreadPool 使用 SynchronousQueue 而其他(Executors.newSingleThreadExecutorExecutors.newFixedThreadPool)使用 LinkedBlockingQueue?

编辑

第一个问题解决了。但我仍然不明白为什么Executors.newCachedThreadPool使用SyncQueue,而其他人(Executors.newSingleThreadExecutorExecutors.newFixedThreadPool)使用LinkedBlockingQueue?

我得到的是,使用Syncqueue,如果没有空闲线程,生产者将被阻止。但是,由于线程的数量几乎是无限的(如果需要,将创建新线程),因此永远不会发生这种情况。那么为什么它应该使用SyncQueue呢?


答案 1

SynchronousQueue是一种非常特殊的队列 - 它在 接口后面实现了一种交会方法(生产者等到消费者准备好,消费者等到生产者准备好)。Queue

因此,您可能仅在需要该特定语义的特殊情况下才需要它,例如,单线程任务而不排队进一步的请求

使用的另一个原因是性能。的实现似乎经过了大量优化,因此,如果您不需要比集合点更多的内容(例如,在“按需”创建使用者的情况下,以便队列项不会累积),则可以通过使用 获得性能提升。SynchronousQueueSynchronousQueueExecutors.newCachedThreadPool()SynchronousQueue

简单的综合测试表明,在一个简单的单生产者-单消费者场景中,双核机器的吞吐量比吞吐量高出约20倍,并且队列长度=1。当队列长度增加时,它们的吞吐量会上升,几乎达到 吞吐量。这意味着与其他队列相比,多核计算机上的同步开销较低。但同样,只有在特定情况下,当您需要伪装成 的会合点时,它才重要。SynchronousQueueLinkedBlockingQueueArrayBlockingQueueSynchronousQueueSynchronousQueueQueue

编辑:

这是一个测试:

public class Test {
    static ExecutorService e = Executors.newFixedThreadPool(2);
    static int N = 1000000;

    public static void main(String[] args) throws Exception {    
        for (int i = 0; i < 10; i++) {
            int length = (i == 0) ? 1 : i * 5;
            System.out.print(length + "\t");
            System.out.print(doTest(new LinkedBlockingQueue<Integer>(length), N) + "\t");
            System.out.print(doTest(new ArrayBlockingQueue<Integer>(length), N) + "\t");
            System.out.print(doTest(new SynchronousQueue<Integer>(), N));
            System.out.println();
        }

        e.shutdown();
    }

    private static long doTest(final BlockingQueue<Integer> q, final int n) throws Exception {
        long t = System.nanoTime();

        e.submit(new Runnable() {
            public void run() {
                for (int i = 0; i < n; i++)
                    try { q.put(i); } catch (InterruptedException ex) {}
            }
        });    

        Long r = e.submit(new Callable<Long>() {
            public Long call() {
                long sum = 0;
                for (int i = 0; i < n; i++)
                    try { sum += q.take(); } catch (InterruptedException ex) {}
                return sum;
            }
        }).get();
        t = System.nanoTime() - t;

        return (long)(1000000000.0 * N / t); // Throughput, items/sec
    }
}    

这是我的机器上的结果:

enter image description here


答案 2

目前,默认(基于)可以使用一组固定大小的预创建线程和一定大小的线程进行任何溢出,或者创建最大大小(如果(并且仅当)该队列已满时的最大大小。ExecutorsThreadPoolExecutorBlockingQueue

这导致了一些令人惊讶的属性。例如,由于只有在达到队列容量后才会创建其他线程,因此使用 a(无限)意味着永远不会创建新线程,即使当前池大小为零也是如此。如果使用 ,则仅当新线程已满时才会创建新线程,并且如果池到那时尚未清除空间,则后续作业很可能被拒绝。LinkedBlockingQueueArrayBlockingQueue

A 的容量为零,因此生产者会阻塞,直到使用者可用或创建线程。这意味着,尽管@axtavt缓存线程池生成的数据看起来令人印象深刻,但从生产者的角度来看,缓存线程池的性能通常最差。SynchronousQueue

不幸的是,目前还没有一个很好的折衷实现的库版本,可以在突发或活动期间创建线程,从较低的最小值达到某个最大值。您要么有一个可增长的池,要么有一个固定的池。我们内部有一个,但它还没有准备好供公众消费。