为什么并行流不使用 ForkJoinPool 的所有线程?

所以我知道,如果你使用没有自定义ForkJoinPool的,它将使用默认的ForkJoinPool,默认情况下,它比你有处理器少一个线程。parallelStream

因此,正如这里所述(以及该问题的其他答案),为了获得更多的并行性,您必须:

将并行流执行提交到您自己的 ForkJoinPool:yourFJP.submit(() -> stream.parallel().forEach(doSomething));

所以,我这样做了:

import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;
import com.google.common.collect.Sets;

public class Main {

    public static void main(String[] args) throws InterruptedException, ExecutionException {

        ForkJoinPool forkJoinPool = new ForkJoinPool(1000);

        IntStream stream = IntStream.range(0, 999999);

        final Set<String> thNames = Collections.synchronizedSet(new HashSet<String>());

        forkJoinPool.submit(() -> {
            stream.parallel().forEach(n -> {

                System.out.println("Processing n: " + n);
                try {
                    Thread.sleep(500);
                    thNames.add(Thread.currentThread().getName());
                    System.out.println("Size: " + thNames.size() + " activeCount: " + forkJoinPool.getActiveThreadCount());
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
        }).get();
    }
}

我制作了一组线程名称,以查看正在创建的线程数,并且还记录了池具有的活动线程数,并且两个数字都不会增长到16,因此这意味着这里的并行度不超过16(为什么甚至16?)。如果我不使用forkJoinPool,我会得到4作为并行度,这是根据我拥有的处理器数量。

为什么它给我16而不是1000?


答案 1

更新

最初,这个答案是一个精心设计的解释,声称应用背压,甚至没有达到规定的并行度水平,因为总是有空闲的工作线程可以处理流。ForkJoinPool

这是不正确的。

实际答案在标记为重复的原始问题中提供 - 官方不支持使用自定义进行流处理,并且在使用 时,默认池并行度用于确定流拆分器行为。ForkJoinPoolforEach

下面是一个示例,当任务手动提交到自定义时,池的活动线程数很容易达到其并行度级别:ForkJoinPool

for (int i = 0; i < 1_000_000; ++i) {
   forkJoinPool.submit(() -> {
      try {
         Thread.sleep(1);
         thNames.add(Thread.currentThread().getName());
         System.out.println("Size: " + thNames.size() + " activeCount: " + forkJoinPool.getActiveThreadCount() + " parallelism: " + forkJoinPool.getParallelism());
      } catch (Exception e) {
         throw new RuntimeException(e);
      }
   });
}

感谢Stuart Marks指出这一点,感谢Sotirios Delimanolis认为我最初的答案是错误的:)


答案 2

在我看来,当您向FJP提交lambda时,该lambda将使用公共池而不是FJP。Sotirios Delimanolis在上面的评论中证明了这一点。您提交的是在 FJP 中运行的任务。

尝试分析此代码以查看实际使用的线程。

您无法在 FJP 中命名线程。


推荐