并行流、收集器和线程安全

请参阅下面的简单示例,该示例计算列表中每个单词的出现次数:

Stream<String> words = Stream.of("a", "b", "a", "c");
Map<String, Integer> wordsCount = words.collect(toMap(s -> s, s -> 1,
                                                      (i, j) -> i + j));

最后,是 。wordsCount{a=2, b=1, c=1}

但是我的流非常大,我想并行化工作,所以我写道:

Map<String, Integer> wordsCount = words.parallel()
                                       .collect(toMap(s -> s, s -> 1,
                                                      (i, j) -> i + j));

但是我注意到这很简单,所以我想知道我是否需要显式请求并发映射以确保线程安全:wordsCountHashMap

Map<String, Integer> wordsCount = words.parallel()
                                       .collect(toConcurrentMap(s -> s, s -> 1,
                                                                (i, j) -> i + j));

非并发收集器是否可以安全地与并行流一起使用,或者从并行流收集时是否应仅使用并发版本?


答案 1

非并发收集器是否可以安全地与并行流一起使用,或者从并行流收集时是否应仅使用并发版本?

在并行流的操作中使用非并发收集器是安全的。collect

接口的规范中,在有六个项目符号的部分中,是这样的:Collector

对于非并发收集器,从结果提供程序、累加器或合路器函数返回的任何结果都必须串行线程限制。这使收集能够并行进行,而收集器无需实现任何其他同步。约简实现必须管理输入是否正确分区,分区单独处理,并且仅在累积完成后进行组合。

这意味着该类提供的各种实现可以与并行流一起使用,即使其中一些实现可能不是并发收集器。这也适用于您可能实现的任何您自己的非并发收集器。它们可以安全地与并行流一起使用,前提是您的收集器不会干扰流源,无副作用,与顺序无关等。Collectors

我还建议阅读 java.util.stream 包文档的可变减少部分。本节中间是一个示例,该示例声明是可并行化的,但它将结果收集到 一个 不是线程安全的 。ArrayList

其工作方式是,以非并发收集器结尾的并行流可确保不同的线程始终在中间结果集合的不同实例上运行。这就是为什么收集器有一个函数,用于创建与线程一样多的中间集合,因此每个线程都可以累积到自己的集合中。当要合并中间结果时,它们将在线程之间安全地传递,并且在任何给定时间,只有一个线程合并任何一对中间结果。Supplier


答案 2

如果所有收集器都遵循规范中的规则,则可以安全地并行或顺序运行。并行就绪性是此处设计的关键部分。

并发和非并发收集器之间的区别与并行化方法有关。

普通(非并发)收集器通过合并子结果来运行。因此,源被划分为一堆块,每个块被收集到一个结果容器(如列表或映射)中,然后将子结果合并到一个更大的结果容器中。这是安全和有序的,但对于某些类型的容器 ( 尤其是地图 - 可能很昂贵,因为按键合并两个地图通常很昂贵。

并发收集器改为创建一个结果容器,其插入操作保证是线程安全的,并从多个线程将元素推送到其中。对于像 ConcurrentHashMap 这样的高并发结果容器,这种方法可能比合并普通的 HashMaps 性能更好。

因此,并发收集器与普通收集器相比进行了严格的优化。而且它们不是没有代价的。由于元素是从许多线程中爆炸的,因此并发收集器通常无法保持遭遇顺序。(但是,通常你并不关心 - 在创建字数直方图时,你并不关心你首先计算了“foo”的哪个实例。


推荐