并行无限 Java 流耗尽内存

我试图理解为什么下面的Java程序给出了一个,而没有的相应程序没有。OutOfMemoryError.parallel()

System.out.println(Stream
    .iterate(1, i -> i+1)
    .parallel()
    .flatMap(n -> Stream.iterate(n, i -> i+n))
    .mapToInt(Integer::intValue)
    .limit(100_000_000)
    .sum()
);

我有两个问题:

  1. 该程序的预期输出是什么?

    如果没有,这似乎只是简单地输出,这意味着它只是“卡住”在flatMap中的第一个流,这是有道理的。.parallel()sum(1+2+3+...)

    对于并行,我不知道是否存在预期的行为,但我的猜测是,它以某种方式交错了第一个左右的流,并行工作者的数量在哪里。根据分块/缓冲行为,它也可能略有不同。nn

  2. 是什么原因导致内存不足?我特别试图了解这些流是如何在引擎盖下实现的。

    我猜有些东西阻塞了流,所以它永远不会完成,并且能够摆脱生成的值,但我不太清楚事情的评估顺序以及缓冲发生的位置。

编辑:如果相关,我正在使用Java 11。

编辑 2:显然,即使对于简单的程序,也会发生同样的事情,因此它可能与懒惰有关,而不是。IntStream.iterate(1,i->i+1).limit(1000_000_000).parallel().sum()limitflatMap


答案 1

你说“但我不太清楚事情的评估顺序以及缓冲发生的位置”,这正是并行流的意义所在。评估顺序未指定。

您的示例的一个关键方面是 .这意味着实现不能只求和任意值,而必须求和前 100,000,000 个数字。请注意,在参考实现中,不会更改结果,这表示无序情况没有特殊实现,但这是实现细节。.limit(100_000_000).unordered().limit(100_000_000)

现在,当工作线程处理元素时,它们不能只是将它们汇总起来,因为它们必须知道允许它们使用哪些元素,这取决于在其特定工作负载之前有多少个元素。由于此流不知道大小,因此只有在处理了前缀元素时才知道这一点,对于无限流,这种情况永远不会发生。因此,工作线程暂时保持缓冲,此信息变得可用。

原则上,当工作线程知道它处理最左边¹的工作块时,它可以立即汇总元素,对它们进行计数,并在达到极限时发出结束信号。因此,Stream可能会终止,但这取决于很多因素。

在你的例子中,一个合理的场景是,其他工作线程在分配缓冲区方面比最左边的作业计数更快。在这种情况下,对计时的细微更改可能会使流偶尔返回一个值。

当我们减慢除处理最左侧块的工作线程之外的所有工作线程时,我们可以使流终止(至少在大多数运行中):

System.out.println(IntStream
    .iterate(1, i -> i+1)
    .parallel()
    .peek(i -> { if(i != 1) LockSupport.parkNanos(1_000_000_000); })
    .flatMap(n -> IntStream.iterate(n, i -> i+n))
    .limit(100_000_000)
    .sum()
);

¹ 我正在遵循斯图尔特·马克斯(Stuart Marks)的建议,在谈论遭遇顺序而不是处理顺序时,使用从左到右的顺序。


答案 2

我最好的猜测是,添加会改变内部行为,其内部行为之前已经懒惰地评估了问题parallel()flatMap()

[JDK-8202307] 获取 java.lang.OutOfMemoryError:在 flatMap 中使用无限/非常大的流的流上调用 Stream.iterator().next() 时,您遇到的错误已报告。如果您查看票证,它或多或少与您获得的堆栈跟踪相同。票证已作为“不会修复”关闭,原因如下:OutOfMemoryError

和 方法是“逃生舱口”,用于无法使用其他操作时使用。它们有一些限制,因为它们将流实现的推送模型转换为拉模型。在某些情况下,这种转换需要缓冲,例如当一个元素被(平面)映射到两个或多个元素时。这将使流实现变得非常复杂,可能是以牺牲常见情况为代价的,以支持背压的概念来传达要通过元素生产的嵌套层提取多少元素。iterator()spliterator()