并行流在不同的操作下是否正常工作?

2022-09-02 13:29:18

我正在阅读有关无国籍状态的文章,并在文档中遇到了这个问题:

如果流操作的行为参数是有状态的,则流管道结果可能是不确定的或不正确的。有状态 lambda(或实现相应功能接口的其他对象)是其结果取决于在执行流管道期间可能更改的任何状态的 lambda。

现在,如果我有一个字符串列表(比如说),然后尝试按以下方式使用并行流从中删除重复的字符串:strList

List<String> resultOne = strList.parallelStream().distinct().collect(Collectors.toList());

或者如果我们想要不区分大小写:

List<String> result2 = strList.parallelStream().map(String::toLowerCase)
                       .distinct().collect(Collectors.toList());

此代码是否存在任何问题,因为并行流会拆分输入,并且在一个块中不同并不一定意味着在整个输入中不同?

编辑(以下答案的快速摘要)

这是一个有状态操作,在有状态中间操作的情况下,并行流可能需要多次传递或大量缓冲开销。如果元素排序不相关,也可以更有效地实现。另外根据文档distinctdistinct

对于有序流,不同元素的选择是稳定的(对于重复的元素,将保留遇到顺序中首先出现的元素。对于无序流,不做稳定性保证。

但是,在并行运行的有序流的情况下,distinct可能是不稳定的 - 这意味着它将在重复的情况下保留任意元素,而不一定是其他方面预期的第一个元素。distinct

链接

在内部,distinct() 操作保留了一个包含以前见过的元素的 Set,但它隐藏在操作中,我们无法从应用程序代码中获取它。

因此,在并行流的情况下,它可能会消耗整个流,或者可能使用CHM(类似)。对于有序的,最有可能的是使用或类似的结构。ConcurrentHashMap.newKeySet()LinkedHashSet


答案 1

粗略地指出文档的相关部分(强调,我的):

中间操作进一步分为无状态操作和有状态操作。无状态操作(如过滤器和映射)在处理新元素时不会保留以前看到的元素的状态 - 每个元素都可以独立于其他元素的操作进行处理。有状态操作(如 distinct 和 sorted)在处理新元素时可能会合并以前见过的元素的状态

有状态操作可能需要在生成结果之前处理整个输入。例如,在看到流的所有元素之前,无法通过对流进行排序产生任何结果。因此,在并行计算下,某些包含有状态中间操作的管道可能需要对数据进行多次传递,或者可能需要缓冲重要数据。包含独占无状态中间操作的管道可以在单次传递中处理,无论是顺序的还是并行的,只需最少的数据缓冲

如果您进一步阅读(关于订购的部分):

流可能具有定义的遭遇顺序,也可能没有。流是否具有遭遇顺序取决于源和中间操作。某些流源(如 List 或数组)本质上是有序的,而其他流源(如 HashSet)则不是。某些中间操作(如 sorted())可能会对其他无序流施加遇到顺序,而其他操作可能会使有序流无序呈现,例如 BaseStream.unordered()。此外,某些终端操作可能会忽略遇到顺序,例如 forEach()。

...

对于并行流,放宽排序约束有时可以实现更有效的执行。如果元素排序不相关,则可以更有效地实现某些聚合操作,例如筛选重复项 (distinct()) 或分组缩减 (Collectors.groupingBy())。类似地,与遭遇顺序有内在联系的操作(如 limit())可能需要缓冲来确保正确的排序,从而破坏了并行性的好处。如果流具有遭遇顺序,但用户并不特别关心该遭遇顺序,则使用 unordered() 显式取消对流的排序可能会提高某些有状态或终端操作的并行性能。但是,大多数流管道(如上面的“块的权重总和”示例)即使在排序约束下也能有效地并行化。

总之,

  • distinct将适用于并行流,但您可能已经知道,它必须在继续之前消耗整个流,这可能会占用大量内存。
  • 如果项的源是无序集合(如 hashset)或流是 ,则不担心对输出进行排序,因此效率很高unordered()distinct

解决方案是添加到流管道,如果您不担心顺序并希望看到更多的性能。.unordered()

List<String> result2 = strList.parallelStream()
                              .unordered()
                              .map(String::toLowerCase)
                              .distinct()
                              .collect(Collectors.toList());

唉,Java中没有(可用的内置)并发哈希集(除非他们在ConcurrentHashMap上变得聪明),所以我只能给你留下一个不幸的可能性,即使用常规Java集以阻塞方式实现不同。在这种情况下,我看不到做任何并行区分的好处。


编辑:我说得太早了。使用具有 distinct 的并行流可能会有一些好处。它看起来比我最初想象的更聪明。看看@Eugene的答案distinct


答案 2

您似乎错过了您提供的文档和实际示例中的许多内容。

如果流操作的行为参数是有状态的,则流管道结果可能是不确定的或不正确的。

在您的示例中,您没有定义任何有状态操作。文档中的 Stateful 是指您定义的那些,而不是那些自己实现的那些 - 就像在你的示例中一样。但无论哪种方式,你都可以定义一个正确的有状态操作,即使是Stuart Marks-在Oracle/Java工作,也提供了这样一个例子jdkdistinct

因此,您在提供的示例中非常合适,无论它是否并行。

(并行)的昂贵部分来自这样一个事实,即内部必须有一个线程安全的数据结构来保留不同的元素;在jdk情况下,它是在顺序无关紧要的情况下使用的,或者在顺序重要时使用a。distinctConcurrentHashMapLinkedHashSet

distinct顺便说一句,这是一个非常聪明的实现,它看你的流源是否已经是不同的(在这种情况下,它是一个no-op),或者看看你的数据是否被排序,在这种情况下,它会对源进行更智能的遍历(因为它知道,如果你看到了一个元素,下一个即将到来的是你刚刚看到的相同或不同的元素), 或在内部使用,等等。ConcurrentHashMap


推荐