Java并行流:如何等待线程完成并行流?

因此,我有一个列表,从中获取并行流以填写地图,如下所示:

Map<Integer, TreeNode> map = new HashMap<>();
List<NodeData> list = some_filled_list;

//Putting data from the list into the map
list.parallelStream().forEach(d -> {
                TreeNode node = new TreeNode(d);
                map.put(node.getId(), node);
            });

//print out map
map.entrySet().stream().forEach(entry -> {
     System.out.println("Processing node with ID = " + entry.getValue().getId());
                });

此代码的问题在于,当“放置数据”过程仍在进行时,地图正在打印出来(cuz它是并行的),因此,map尚未收到列表中的所有元素。当然,在我的真实代码中,它不仅仅是打印出地图;我使用地图来利用O(1)查找时间。

我的问题是:

  1. 如何让主线程等待,以便在地图打印出来之前完成“放置数据”?我试图将“放置数据”放在线程t中,并执行和,但这无济于事。t.start()t.join()

  2. 也许在这种情况下我不应该使用并行流?这个列表很长,我只想利用并行性来提高效率。


答案 1

这样,您就违反了 Stream 文档中明确声明的属性。list.parallelStream().forEachside-effects

另外,当你说这个代码是当“放置数据”过程仍在进行时,地图正在打印出来(cuz它是并行的),这不是真的,就像终端操作一样,它将等待完成,直到它可以进入下一行的进程。您可能会看到这种情况,因为您正在收集到非线程安全状态,并且某些条目可能不在该映射中...想想其他方式,如果您将来自多个线程的多个条目放入?好吧,很多东西可能会中断,例如缺少条目,不正确/不一致的Map等。forEachHashMapHashMap

当然,将其更改为 a 会起作用,因为它是线程安全的,但是您仍然违反了副作用属性,尽管是以“安全”的方式。ConcurrentHashMap

正确的做法是直接去一个没有:collectMapforEach

Map<Integer, TreeNode> map = list.parallelStream()
        .collect(Collectors.toMap(
                NodeData::getId,
                TreeNode::new
        ));

这样,即使对于并行处理,一切都会很好。请注意,您需要大量(数以万计的元素)才能从并行处理中获得任何可衡量的性能提升。


答案 2

流操作将阻塞,直到完成并行实现和非并行实现。

所以你看到的不是 - 很可能它只是数据损坏,因为不是线程安全的。请尝试改用。the "putting data" process is still going onHashMapConcurrentHashMap