在嵌套的 Java 8 并行流操作中使用信号量可能会发生死锁。这是一个错误吗?

请考虑以下情况:我们使用 Java 8 并行流来执行并行 forEach 循环,例如:

IntStream.range(0,20).parallel().forEach(i -> { /* work done here */})

并行线程的数量由系统属性“java.util.concurrent.ForkJoinPool.common.parallelism”控制,通常等于处理器的数量。

现在假设我们想要限制特定工作的并行执行次数 - 例如,因为该部分是内存密集型的,内存约束意味着并行执行的限制。

限制并行执行的一种明显而优雅的方法是使用信号量(此处建议),例如,以下代码图片将并行执行次数限制为5:

        final Semaphore concurrentExecutions = new Semaphore(5);
        IntStream.range(0,20).parallel().forEach(i -> {

            concurrentExecutions.acquireUninterruptibly();

            try {
                /* WORK DONE HERE */
            }
            finally {
                concurrentExecutions.release();
            }
        });

这工作得很好!

但是:在工作程序(at )内使用任何其他并行流可能会导致死锁/* WORK DONE HERE */

对我来说,这是一个意想不到的行为。

说明: 由于 Java 流使用 ForkJoin 池,因此内部 forEach 正在分叉,并且联接似乎在等待永远。但是,此行为仍然是意外的。请注意,如果设置为 1,则并行流甚至可以工作。"java.util.concurrent.ForkJoinPool.common.parallelism"

另请注意,如果存在内部平行 forEach,则它可能不透明。

问:此行为是否符合 Java 8 规范(在这种情况下,这意味着禁止在并行流工作程序中使用信号量),还是这是一个错误?

为方便起见:下面是一个完整的测试用例。两个布尔值的任意组合都有效,但“true,true”除外,这会导致死锁。

澄清:为了明确这一点,让我强调一个方面:死锁不会发生在信号量处。请注意,代码由acquire

  1. 获取信号量
  2. 运行一些代码
  3. 释放信号量

并且死锁发生在 2 处。如果该段代码正在使用另一个并行流。然后死锁发生在该 OTHER 流中。因此,似乎不允许同时使用嵌套的并行流和阻塞操作(如信号量)!

请注意,有文档记录表明并行流使用ForkJoinPool,并且ForkJoinPool和Semaphore属于同一个包 - (所以人们会期望它们很好地互操作)。java.util.concurrent

/*
 * (c) Copyright Christian P. Fries, Germany. All rights reserved. Contact: email@christian-fries.de.
 *
 * Created on 03.05.2014
 */
package net.finmath.experiments.concurrency;

import java.util.concurrent.Semaphore;
import java.util.stream.IntStream;

/**
 * This is a test of Java 8 parallel streams.
 * 
 * The idea behind this code is that the Semaphore concurrentExecutions
 * should limit the parallel executions of the outer forEach (which is an
 * <code>IntStream.range(0,numberOfTasks).parallel().forEach</code> (for example:
 * the parallel executions of the outer forEach should be limited due to a
 * memory constrain).
 * 
 * Inside the execution block of the outer forEach we use another parallel stream
 * to create an inner forEach. The number of concurrent
 * executions of the inner forEach is not limited by us (it is however limited by a
 * system property "java.util.concurrent.ForkJoinPool.common.parallelism").
 * 
 * Problem: If the semaphore is used AND the inner forEach is active, then
 * the execution will be DEADLOCKED.
 * 
 * Note: A practical application is the implementation of the parallel
 * LevenbergMarquardt optimizer in
 * {@link http://finmath.net/java/finmath-lib/apidocs/net/finmath/optimizer/LevenbergMarquardt.html}
 * In one application the number of tasks in the outer and inner loop is very large (>1000)
 * and due to memory limitation the outer loop should be limited to a small (5) number
 * of concurrent executions.
 * 
 * @author Christian Fries
 */
public class ForkJoinPoolTest {

    public static void main(String[] args) {

        // Any combination of the booleans works, except (true,true)
        final boolean isUseSemaphore    = true;
        final boolean isUseInnerStream  = true;

        final int       numberOfTasksInOuterLoop = 20;              // In real applications this can be a large number (e.g. > 1000).
        final int       numberOfTasksInInnerLoop = 100;             // In real applications this can be a large number (e.g. > 1000).
        final int       concurrentExecusionsLimitInOuterLoop = 5;
        final int       concurrentExecutionsLimitForStreams = 10;

        final Semaphore concurrentExecutions = new Semaphore(concurrentExecusionsLimitInOuterLoop);

        System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism",Integer.toString(concurrentExecutionsLimitForStreams));
        System.out.println("java.util.concurrent.ForkJoinPool.common.parallelism = " + System.getProperty("java.util.concurrent.ForkJoinPool.common.parallelism"));

        IntStream.range(0,numberOfTasksInOuterLoop).parallel().forEach(i -> {

            if(isUseSemaphore) {
                concurrentExecutions.acquireUninterruptibly();
            }

            try {
                System.out.println(i + "\t" + concurrentExecutions.availablePermits() + "\t" + Thread.currentThread());

                if(isUseInnerStream) {
                    runCodeWhichUsesParallelStream(numberOfTasksInInnerLoop);
                }
                else {
                    try {
                        Thread.sleep(10*numberOfTasksInInnerLoop);
                    } catch (Exception e) {
                    }
                }
            }
            finally {
                if(isUseSemaphore) {
                    concurrentExecutions.release();
                }
            }
        });

        System.out.println("D O N E");
    }

    /**
     * Runs code in a parallel forEach using streams.
     * 
     * @param numberOfTasksInInnerLoop Number of tasks to execute.
     */
    private static void runCodeWhichUsesParallelStream(int numberOfTasksInInnerLoop) {
        IntStream.range(0,numberOfTasksInInnerLoop).parallel().forEach(j -> {
            try {
                Thread.sleep(10);
            } catch (Exception e) {
            }
        });
    }
}

答案 1

任何时候,当您将问题分解为任务时,这些任务可能会在其他任务上被阻止,并尝试在有限的线程池中执行它们,您都面临着池引起的死锁的风险。请参阅 Java 并发实践 8.1。

毫无疑问,这是一个错误 - 在你的代码中。您正在用任务填充 FJ 池,这些任务将阻止等待同一池中其他任务的结果。有时你会很幸运,事情不会陷入僵局(就像并非所有的锁定排序错误都会导致死锁一样),但从根本上说,你在这里滑冰得很薄。


答案 2

我在探查器(VisualVM)中运行了您的测试,我同意:线程正在等待信号量,并在F / J池中的aWaitJoin()上。

这个框架在 join() 方面存在严重问题。四年来,我一直在写关于这个框架的批评。基本的连接问题从这里开始。

aWaitJoin() 也有类似的问题。您可以自己仔细阅读代码。当框架到达工作任务的底部时,它会发出一个 wait()。归根结底,这个框架没有办法进行上下文切换。

有一种方法可以获取此框架,以便为停滞的线程创建补偿线程。您需要实现 ForkJoinPool.ManagedBlocker 接口。你怎么能做到这一点,我不知道。您正在运行一个包含流的基本 API。您没有实现 Streams API 并编写自己的代码。

我坚持我上面的评论:一旦你将并行给API,你就放弃了控制并行机制内部工作的能力。API没有错误(除了它使用错误的框架进行并行操作。问题在于,信号量或任何其他用于控制 API 内并行性的方法都是危险的想法。


推荐