来自哈希集的并行流不并行运行

我有一些要并行处理的元素的集合。当我使用时,并行性有效。但是,当我使用时,它不会并行运行。ListSet

我写了一个代码示例来说明问题:

public static void main(String[] args) {
    ParallelTest test = new ParallelTest();

    List<Integer> list = Arrays.asList(1,2);
    Set<Integer> set = new HashSet<>(list);

    ForkJoinPool forkJoinPool = new ForkJoinPool(4);

    System.out.println("set print");
    try {
        forkJoinPool.submit(() ->
            set.parallelStream().forEach(test::print)
        ).get();
    } catch (Exception e) {
        return;
    }

    System.out.println("\n\nlist print");
    try {
        forkJoinPool.submit(() ->
            list.parallelStream().forEach(test::print)
        ).get();
    } catch (Exception e) {
        return;
    }   
}

private void print(int i){
    System.out.println("start: " + i);
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
    }
    System.out.println("end: " + i);
}

这是我在Windows 7上获得的输出

set print
start: 1
end: 1
start: 2
end: 2

list print
start: 2
start: 1
end: 1
end: 2

我们可以看到,在第二个元素被处理之前,必须完成中的第一个元素。对于 ,第二个元素在第一个元素完成之前开始。SetList

你能告诉我是什么导致了这个问题,以及如何使用集合来避免它吗?Set


答案 1

我可以重现您看到的行为,其中并行度与您指定的分叉连接池并行度的并行度不匹配。将分叉连接池并行度设置为 10 并将集合中的元素数增加到 50 后,我看到基于列表的流的并行度仅上升到 6,而基于集合的流的并行度永远不会超过 2。

但是,请注意,将任务提交到分叉连接池以在该池中运行并行流的这种技术是一种实现“技巧”,不能保证有效。实际上,用于执行并行流的线程或线程池是未指定的。默认情况下,使用公共分叉连接池,但在不同的环境中,可能最终使用不同的线程池。(考虑应用程序服务器中的容器。

java.util.stream.AbstractTask 类中,该字段确定完成的拆分量,这反过来又确定可以实现的并行度。此字段的值基于哪个字段当然使用公共池的并行性,而不是碰巧正在运行任务的任何池。LEAF_TARGETForkJoinPool.getCommonPoolParallelism()

可以说这是一个错误(参见OpenJDK问题JDK-8190974),但是,无论如何,这整个区域都是未指定的。但是,系统的这个领域肯定需要发展,例如在拆分策略,可用的并行度,处理阻塞任务等方面。JDK 的未来版本可能会解决其中的一些问题。

同时,可以通过使用系统属性来控制公共分叉连接池的并行度。如果将此行添加到程序中,

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "10");

并且您在公共池中运行流(或者如果将它们提交到具有足够高的并行度集的自己的池中),您将观察到还有更多任务并行运行。

您还可以使用该选项在命令行上设置此属性。-D

同样,这不是保证的行为,将来可能会改变。但是在可预见的未来,这种技术可能适用于JDK 8的实现。

2019-06-12更新:错误 JDK-8190974 已在 JDK 10 中修复,并且该修复已向后移植到即将发布的 JDK 8u 版本 (8u222)。


答案 2

推荐