并行流的行为与流不同

2022-09-03 18:34:08

我很难理解为什么并行流和流对完全相同的语句给出不同的结果。

    List<String> list = Arrays.asList("1", "2", "3");
    String resultParallel = list.parallelStream().collect(StringBuilder::new,
            (response, element) -> response.append(" ").append(element),
            (response1, response2) -> response1.append(",").append(response2.toString()))
            .toString();
    System.out.println("ResultParallel: " + resultParallel);

    String result = list.stream().collect(StringBuilder::new,
            (response, element) -> response.append(" ").append(element),
            (response1, response2) -> response1.append(",").append(response2.toString()))
            .toString();

    System.out.println("Result: " + result);

结果平行: 1, 2, 3

结果: 1 2 3

有人可以解释为什么会发生这种情况,以及我如何让非并行版本给出与并行版本相同的结果吗?


答案 1

Java 8 Stream.collect 方法具有以下签名:

<R> R collect(Supplier<R> supplier,
              BiConsumer<R, ? super T> accumulator,
              BiConsumer<R, R> combiner);

其中 仅在并行流中调用(以便将部分结果合并到单个容器中),因此第一个代码段的输出为:BiConsumer<R, R> combiner

ResultParallel: 1, 2, 3

在版本中,不会被调用(请参阅此答案),因此忽略以下语句:sequentialcombiner

(response1, response2) -> response1.append(",").append(response2.toString())

结果是不同的:

1 2 3

如何解决?检查@Eugene的答案或此问题和答案


答案 2

要理解为什么会出错,请考虑javadoc中的这一点。

accumulator- 一个关联、非干扰、无状态的函数,必须将元素折叠到结果容器中。

combiner- 一个关联、非干扰、无状态的函数,它接受两个部分结果容器并合并它们,这必须与累加器函数兼容。合并器函数必须将元素从第二个结果容器折叠到第一个结果容器中。

这句话的意思是,元素是通过“积累”还是“结合”或两者的某种组合来收集的,这并不重要。但在代码中,累加器和合并器使用不同的分隔符进行连接。它们在javadoc所要求的意义上不是“兼容的”。

这会导致不一致的结果,具体取决于使用的是顺序流还是并行流。

  • 在并行情况被拆分为子流1,由不同的线程处理。这将导致每个子流都有一个单独的集合。然后合并这些集合。

  • 在顺序情况下,不会拆分流。相反,流只是简单地累积到单个集合中,并且不需要进行组合。


观察:

  • 通常,对于这种大小的流,执行简单的转换,可能会使事情变慢。parallelStream()

  • 在这种特定情况下,版本的瓶颈将是合并步骤。这是一个串行步骤,它执行的复制量与整个串行管道相同。所以,事实上,并行化肯定会让事情变慢。parallelStream()

  • 实际上,lambdas 的行为不正确。它们在开始时添加一个额外的空间,如果使用了,则加倍一些空格。更正确的版本是:combiner

    String result = list.stream().collect(
        StringBuilder::new,
        (b, e) -> b.append(b.isEmpty() ? "" : " ").append(e),
        (l, r) -> l.append(l.isEmpty() ? "" : " ").append(r)).toString();
    
  • 该类是连接流的一种更简单、更有效的方法。(图片来源:@Eugene)Joiner


1 - 在本例中,每个子流只有一个元素。对于较长的列表,您通常会获得与工作线程一样多的子流,并且子流将包含多个元素。


推荐