并行平面地图始终按顺序排列

2022-09-03 13:49:59

假设我有这个代码:

 Collections.singletonList(10)
            .parallelStream() // .stream() - nothing changes
            .flatMap(x -> Stream.iterate(0, i -> i + 1)
                    .limit(x)
                    .parallel()
                    .peek(m -> {
                        System.out.println(Thread.currentThread().getName());
                    }))
            .collect(Collectors.toSet());

输出是相同的线程名称,因此这里没有任何好处 - 我的意思是有一个线程可以完成所有工作。parallel

里面有这个代码:flatMap

result.sequential().forEach(downstream);

我理解强制属性,如果“外部”流是平行的(它们可能会阻塞),“外部”将不得不等待“flatMap”完成,反之亦然(因为使用相同的公共池),但是为什么总是强制这样做?sequential

这是在更高版本中可能改变的事情之一吗?


答案 1

有两个不同的方面。

首先,只有一个管道是顺序的或并行的。在内流处选择顺序或并行是无关紧要的。请注意,您在引用的代码片段中看到的使用者表示整个后续流管道,因此在您的代码中,以 结尾,此使用者最终会将生成的元素添加到非线程安全的单个实例中。因此,与单个使用者并行处理内部流将破坏整个操作。downstream.collect(Collectors.toSet());Set

如果外部流被拆分,则引用的代码可能会同时被调用,不同的使用者添加到不同的集合。这些调用中的每一个都将处理映射到不同内部流实例的外部流的不同元素。由于外部流仅由单个元素组成,因此无法拆分。

这种已经实现的方式也是为什么flatMap()之后的filter()在Java流中“不完全”懒惰的原因?问题,就像在内部流上调用的那样,它将所有元素传递给下游消费者。正如这个答案所证明的那样,另一种支持懒惰和子流分裂的实现是可能的。但这是一种完全不同的实现方式。Stream 实现的当前设计主要由使用者组合工作,因此最终,源拆分器(以及从中分离出来的那些)接收表示 or 中的整个流管道的。相反,链接答案的解做分离器组合,产生一个新的委托给源分离器。我想,这两种方法都有优点,我不确定在以另一种方式工作时,OpenJDK实现会损失多少。forEachConsumertryAdvanceforEachRemainingSpliterator


答案 2

对于像我这样迫切需要并行化flatMap的人来说,他们需要一些实际的解决方案,而不仅仅是历史和理论。

我想出的最简单的解决方案是手工做扁平化,基本上是用.map + reduce(Stream::concat)

下面是一个示例来演示如何执行此操作:

@Test
void testParallelStream_NOT_WORKING() throws InterruptedException, ExecutionException {
    new ForkJoinPool(10).submit(() -> {
        Stream.iterate(0, i -> i + 1).limit(2)
                .parallel()

                // does not parallelize nested streams
                .flatMap(i -> generateRangeParallel(i, 100))

                .peek(i -> System.out.println(currentThread().getName() + " : generated value: i=" + i))
                .forEachOrdered(i -> System.out.println(currentThread().getName() + " : received value: i=" + i));
    }).get();
    System.out.println("done");
}

@Test
void testParallelStream_WORKING() throws InterruptedException, ExecutionException {
    new ForkJoinPool(10).submit(() -> {
        Stream.iterate(0, i -> i + 1).limit(2)
                .parallel()

                // concatenation of nested streams instead of flatMap, parallelizes ALL the items
                .map(i -> generateRangeParallel(i, 100))
                .reduce(Stream::concat).orElse(Stream.empty())

                .peek(i -> System.out.println(currentThread().getName() + " : generated value: i=" + i))
                .forEachOrdered(i -> System.out.println(currentThread().getName() + " : received value: i=" + i));
    }).get();
    System.out.println("done");
}

Stream<Integer> generateRangeParallel(int start, int num) {
    return Stream.iterate(start, i -> i + 1).limit(num).parallel();
}

// run this method with produced output to see how work was distributed
void countThreads(String strOut) {
    var res = Arrays.stream(strOut.split("\n"))
            .map(line -> line.split("\\s+"))
            .collect(Collectors.groupingBy(s -> s[0], Collectors.counting()));
    System.out.println(res);
    System.out.println("threads  : " + res.keySet().size());
    System.out.println("work     : " + res.values());
}

在我的计算机上运行的统计信息:

NOT_WORKING case stats:
{ForkJoinPool-1-worker-23=100, ForkJoinPool-1-worker-5=300}
threads  : 2
work     : [100, 300]

WORKING case stats:
{ForkJoinPool-1-worker-9=16, ForkJoinPool-1-worker-23=20, ForkJoinPool-1-worker-21=36, ForkJoinPool-1-worker-31=17, ForkJoinPool-1-worker-27=177, ForkJoinPool-1-worker-13=17, ForkJoinPool-1-worker-5=21, ForkJoinPool-1-worker-19=8, ForkJoinPool-1-worker-17=21, ForkJoinPool-1-worker-3=67}
threads  : 10
work     : [16, 20, 36, 17, 177, 17, 21, 8, 21, 67]

推荐