嵌套 Java 8 并行 forEach 循环性能较差。此行为是预期的吗?

注意:我已经在另一篇 SO 文章中解决了这个问题 - 在嵌套的 Java 8 并行流操作中使用信号量可能会死锁。这是一个错误吗?-,但是这篇文章的标题表明问题与信号量的使用有关-这在某种程度上分散了讨论的注意力。我创建这个是为了强调嵌套循环可能存在性能问题 - 尽管这两个问题都有一个共同的原因(也许是因为我花了很多时间来解决这个问题)。(我不认为它是重复的,因为它正在强调另一个症状 - 但如果你只是删除它)。

问题:如果嵌套两个 Java 8 stream.parallel().forEach 循环,并且所有任务都是独立的、无状态的等任务(除了提交到公共 FJ 池之外),那么在并行循环中嵌套并行循环的性能比在并行循环中嵌套顺序循环要差得多。更糟糕的是:如果包含内部循环的操作是同步的,你会得到一个死锁。

性能问题的演示

如果没有“已同步”,您仍然可以观察到性能问题。您可以在以下位置找到这方面的演示代码:http://svn.finmath.net/finmath%20experiments/trunk/src/net/finmath/experiments/concurrency/NestedParallelForEachTest.java(有关更详细的说明,请参阅那里的JavaDoc)。

我们在这里的设置如下:我们有一个嵌套的 stream.parallel().forEach()。

  • 内部环路是独立的(无状态,无干扰等 - 除了使用公共池),并且在最坏的情况下总共消耗1秒,即如果按顺序处理。
  • 外部循环的一半任务在该循环之前消耗 10 秒。
  • 一半在循环后消耗10秒。
  • 因此,每个线程总共消耗 11 秒(最坏情况)。* 我们有一个布尔值,它允许将内部循环从 parallel() 切换到 sequential()。

现在:将 24 个外循环任务提交到并行度为 8 的池中,我们最多只能 24/8 * 11 = 33 秒(在 8 核或更好的计算机上)。

结果是:

  • 使用内部顺序循环:33 秒。
  • 内部并行循环:>80秒(我有92秒)。

问题:您能确认此行为吗?这是人们对框架的期望吗?(我现在更小心了,声称这是一个错误,但我个人认为这是由于ForkJoinTask实现中的错误。备注:我已将此发布到并发利益(请参阅 http://cs.oswego.edu/pipermail/concurrency-interest/2014-May/012652.html),但到目前为止,我还没有从那里得到确认)。

僵局的证明

下面的代码将死锁

    // Outer loop
    IntStream.range(0,numberOfTasksInOuterLoop).parallel().forEach(i -> {
        doWork();
        synchronized(this) {
            // Inner loop
            IntStream.range(0,numberOfTasksInInnerLoop).parallel().forEach(j -> {
                doWork();
            });
        }
    });

其中 , , 和 是一些无状态的 CPU 刻录机。numberOfTasksInOuterLoop = 24numberOfTasksInInnerLoop = 240outerLoopOverheadFactor = 10000doWork

您可以在 http://svn.finmath.net/finmath%20experiments/trunk/src/net/finmath/experiments/concurrency/NestedParallelForEachAndSynchronization.java 找到完整的演示代码(有关更详细的说明,请参阅那里的JavaDoc)。

此行为是预期的吗?请注意,有关 Java 并行流的文档没有提到嵌套或同步的任何问题。此外,没有提到两者都使用公共分叉连接池的事实。

更新

有关性能问题的另一个测试可以在 http://svn.finmath.net/finmath%20experiments/trunk/src/net/finmath/experiments/concurrency/NestedParallelForEachBenchmark.java 中找到 - 此测试没有任何阻塞操作(没有Thread.sleep并且未同步)。我在这里整理了一些评论:http://christian-fries.de/blog/files/2014-nested-java-8-parallel-foreach.html

更新 2

看起来这个问题和带有信号量的更严重的死锁已经在Java8 u40中得到了修复。


答案 1

问题在于,您配置的相当有限的并行性被外部流处理所吞噬:如果您说您想要八个线程并处理超过八个项目的流,它将创建八个工作线程并让他们处理项目。parallel()

然后,在使用者中,您正在处理另一个流,但没有剩余的工作线程。由于工作线程在等待内部流处理结束时被阻塞,因此必须创建新的工作线程,这违反了您配置的并行性。在我看来,它不会回收这些扩展线程,而是让它们在处理后立即死亡。因此,在内部处理中,将创建并释放新线程,这是一项代价高昂的操作。parallel()ForkJoinPool

您可能会认为这是一个缺陷,即启动线程不参与并行流处理的计算,而只是等待结果,但即使已修复,您仍然会遇到难以修复的一般问题(如果有的话):

每当工作线程数与外部流项之间的比率较低时,实现就会将它们全部用于外部流,因为它不知道流是外部流。因此,并行执行内部流会请求比可用线程更多的工作线程。使用调用方线程参与计算可以以性能等于串行计算的方式修复它,但在这里获得并行执行的优势并不能很好地与固定数量的工作线程的概念一起使用。

请注意,您在这里触及了此问题的表面,因为您对项目的处理时间相当平衡。如果内部项目和外部项目两者的处理都存在分歧(与同一级别的项目相比),问题将更加严重。


更新:通过分析和查看代码,似乎确实尝试使用等待线程进行“工作窃取”,但根据是工作线程还是其他线程的事实使用不同的代码。因此,工作线程实际上等待了大约80%的时间,并且很少甚至没有做任何工作,而其他线程确实对计算做出了贡献......ForkJoinPoolThread


更新2:为了完整性,这里是注释中描述的简单并行执行方法。由于它对每个项目都进行排队,因此当单个项目的执行时间相当短时,预计会产生很大的开销。因此,这不是一个复杂的解决方案,而是一个证明,它可以处理长时间运行的任务,而没有太多的魔力......

import java.lang.reflect.UndeclaredThrowableException;
import java.util.concurrent.*;
import java.util.function.IntConsumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class NestedParallelForEachTest1 {
    static final boolean isInnerStreamParallel = true;

    // Setup: Inner loop task 0.01 sec in worse case. Outer loop task: 10 sec + inner loop. This setup: (100 * 0.01 sec + 10 sec) * 24/8 = 33 sec.
    static final int numberOfTasksInOuterLoop = 24;  // In real applications this can be a large number (e.g. > 1000).
    static final int numberOfTasksInInnerLoop = 100; // In real applications this can be a large number (e.g. > 1000).
    static final int concurrentExecutionsLimitForStreams = 8;

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        System.out.println(System.getProperty("java.version")+" "+System.getProperty("java.home"));
        new NestedParallelForEachTest1().testNestedLoops();
        E.shutdown();
    }

    final static ThreadPoolExecutor E = new ThreadPoolExecutor(
        concurrentExecutionsLimitForStreams, concurrentExecutionsLimitForStreams,
        2, TimeUnit.MINUTES, new SynchronousQueue<>(), (r,e)->r.run() );

    public static void parallelForEach(IntStream s, IntConsumer c) {
        s.mapToObj(i->E.submit(()->c.accept(i))).collect(Collectors.toList())
         .forEach(NestedParallelForEachTest1::waitOrHelp);
    }
    static void waitOrHelp(Future f) {
        while(!f.isDone()) {
            Runnable r=E.getQueue().poll();
            if(r!=null) r.run();
        }
        try { f.get(); }
        catch(InterruptedException ex) { throw new RuntimeException(ex); }
        catch(ExecutionException eex) {
            Throwable t=eex.getCause();
            if(t instanceof RuntimeException) throw (RuntimeException)t;
            if(t instanceof Error) throw (Error)t;
            throw new UndeclaredThrowableException(t);
        }
    }
    public void testNestedLoops(NestedParallelForEachTest1 this) {
        long start = System.nanoTime();
        // Outer loop
        parallelForEach(IntStream.range(0,numberOfTasksInOuterLoop), i -> {
            if(i < 10) sleep(10 * 1000);
            if(isInnerStreamParallel) {
                // Inner loop as parallel: worst case (sequential) it takes 10 * numberOfTasksInInnerLoop millis
                parallelForEach(IntStream.range(0,numberOfTasksInInnerLoop), j -> sleep(10));
            }
            else {
                // Inner loop as sequential
                IntStream.range(0,numberOfTasksInInnerLoop).sequential().forEach(j -> sleep(10));
            }
            if(i >= 10) sleep(10 * 1000);
        });
        long end = System.nanoTime();
        System.out.println("Done in "+TimeUnit.NANOSECONDS.toSeconds(end-start)+" sec.");
    }
    static void sleep(int milli) {
        try {
            Thread.sleep(milli);
        } catch (InterruptedException ex) {
            throw new AssertionError(ex);
        }
    }
}

答案 2

我可以确认这仍然是8u72中的性能问题,尽管它将不再死锁。并行终端操作仍然使用ForkJoinTask上下文之外的ForkJoinTask实例完成,这意味着每个并行流仍然共享公共池

为了证明一个简单的病理病例:

import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

public class ParallelPerf {

    private static final Object LOCK = new Object();

    private static void runInNewPool(Runnable task) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            pool.submit(task).join();
        } finally {
            pool.shutdown();
        }
    }

    private static <T> T runInNewPool(Callable<T> task) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.submit(task).join();
        } finally {
            pool.shutdown();
        }
    }

    private static void innerLoop() {
        IntStream.range(0, 32).parallel().forEach(i -> {
//          System.out.println(Thread.currentThread().getName());
            try {
                Thread.sleep(5);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }

    public static void main(String[] args) {
        System.out.println("==DEFAULT==");
        long startTime = System.nanoTime();
        IntStream.range(0, 32).parallel().forEach(i -> {
            synchronized (LOCK) {
                innerLoop();
            }
//          System.out.println(" outer: " + Thread.currentThread().getName());
        });
        System.out.println(System.nanoTime() - startTime);

        System.out.println("==NEW POOLS==");
        startTime = System.nanoTime();
        IntStream.range(0, 32).parallel().forEach(i -> {
            synchronized (LOCK) {
                runInNewPool(() -> innerLoop());
            }
//          System.out.println(" outer: " + Thread.currentThread().getName());
        });
        System.out.println(System.nanoTime() - startTime);
    }
}

第二次运行传递给 而不是直接调用它。在我的机器(i7-4790,8个CPU线程)上,我得到了大约4倍的加速:innerLooprunInNewPool

==DEFAULT==
4321223964
==NEW POOLS==
1015314802

取消注释其他打印语句会使问题变得明显:

[...]
ForkJoinPool.commonPool-worker-6
ForkJoinPool.commonPool-worker-6
ForkJoinPool.commonPool-worker-6
 outer: ForkJoinPool.commonPool-worker-6
ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-3
[...]
ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-3
 outer: ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-4
ForkJoinPool.commonPool-worker-4
[...]

公共池工作线程堆积在同步块上,一次只能进入一个线程。由于内部并行操作使用相同的池,并且池中的所有其他线程都在等待锁定,因此我们得到单线程执行。

使用单独的 ForkJoinPool 实例的结果:

[...]
ForkJoinPool-1-worker-0
ForkJoinPool-1-worker-6
ForkJoinPool-1-worker-5
 outer: ForkJoinPool.commonPool-worker-4
ForkJoinPool-2-worker-1
ForkJoinPool-2-worker-5
[...]
ForkJoinPool-2-worker-7
ForkJoinPool-2-worker-3
 outer: ForkJoinPool.commonPool-worker-1
ForkJoinPool-3-worker-2
ForkJoinPool-3-worker-5
[...]

我们仍然一次在一个工作线程上运行内部循环,但内部并行操作每次都会获得一个新的池,并且可以利用其所有工作线程。

这是一个人为的示例,但删除同步块仍然显示出类似的速度差异,因为内部循环和外部循环仍在相同的工作线程上竞争。多线程应用程序在多个线程中使用并行流时需要小心,因为这可能导致它们在重叠时随机减速。

这是所有终端操作的问题,而不仅仅是 ,因为它们都在公共池中运行任务。我使用上述方法作为解决方法,但希望这会在某个时候内置到标准库中。forEachrunInNewPool


推荐