Java 8 流是否会导致未绑定数据上的 O(1) 内存减少成为 O(n) 内存,因为底层的 ForkJoin 实现

2022-09-03 16:12:30

我已经编写了一个流实现,该实现在文件的行上执行四个简单的缩减(+和<)。

起初我执行了四个流,但我决定编写自己的累加器和组合器,以便我可以在一个流中执行所有四个简化。在小型数据集(10,000,000 行)上,这会将运行时间减少到预期的约 1/4,并在我的硬件上运行 14 秒。

fileIn = new BufferedReader(new InputStreamReader(
            new URL(args[0].trim()).openStream()));

final Results results = fileIn.lines()
        .parallel()
        .skip(1)
        .map(User::parse)
        .filter(Optional::isPresent)
        .map(Optional::get)
        .collect(Results::new, Results::accumulate, Results::combine);

Results::accumulate并正确地将“用户”分别组合到“结果”和“结果与结果”中,并且此实现非常适合小数据集。Results::combine

我也尝试过使用,结果相似,但我试图减少短期对象的创建。.reduce().collect()

问题是,当我使用10亿行的真实世界大小的数据时,我遇到了一个问题,表明Java 8流无法完成任务。在 JConsole 中观察到堆内存以大致线性的方式爬升到分配的 12 GB,然后达到 OOM。

我的印象是,收集器或化简器将提供与迭代解决方案相当的性能,迭代解决方案应受CPU和IO的限制,但不受内存的限制,因为缩减步骤会产生不会增长的结果,这是一种减少!

当我取一个堆转储并将其放入jhat时,我看到字符串占用了大约7GB,这些字符串可以清楚地看到是输入文件的行。我觉得它们根本不应该在内存中,但是jhat显示了一个非常大的ForkJoin相关结构,这些结构正在内存中积累:

Static reference from java.util.concurrent.ForkJoinPool.common (from class java.util.concurrent.ForkJoinPool) :

--> java.util.concurrent.ForkJoinPool@0x786d41db0 (76 bytes) (field workQueues:)
--> [Ljava.util.concurrent.ForkJoinPool$WorkQueue;@0x786eda598 (144 bytes) (Element 3 of [Ljava.util.concurrent.ForkJoinPool$WorkQueue;@0x786eda598:)
--> java.util.concurrent.ForkJoinPool$WorkQueue@0x786d41ee8 (96 bytes) (field currentSteal:)
--> java.util.stream.SliceOps$SliceTask@0x7b4ac6cb0 (130 bytes) (field completer:)
--> java.util.stream.SliceOps$SliceTask@0x7b379ad18 (130 bytes) (field completer:)
--> java.util.stream.SliceOps$SliceTask@0x7b25bdb68 (130 bytes) (field leftChild:)
--> java.util.stream.SliceOps$SliceTask@0x7b379acb8 (130 bytes) (field localResult:)
--> java.util.stream.Nodes$SpinedNodeBuilder@0x7b25fdda0 (53 bytes) (field spine:)
--> [[Ljava.lang.Object;@0x7b25ffe48 (144 bytes) (Element 12 of [[Ljava.lang.Object;@0x7b25ffe48:)
--> [Ljava.lang.Object;@0x7b37c4f20 (262160 bytes) (Element 19598 of [Ljava.lang.Object;@0x7b37c4f20:)
--> 31ea87ba876505645342b31928394b3c,2013-11-24T23:02:17+00:00,898,22200,1314,700 (28 bytes) (field value:)
--> [C@0x7b2ffff88 (170 bytes) // <<<< There are thousands of these

在 ApplicationShutdownHooks、Local References 和 System Classes 中还有其他引用,但我展示的这个是问题的症结所在,它会导致内存在以下情况下增长 O(n)

流实现是否通过保存 ForkJoin 类中的所有字符串来使此 O(1) 内存问题成为 O(n) 内存?我喜欢流媒体,我不希望这如此:(


答案 1

感谢Marko Topolnik和Holger得出正确答案。虽然两人都没有发布答案让我接受,所以我会试着把它绑起来,供后代:)

在并行流上非常昂贵,因为它需要排序才能完全跳过第一个条目,如 Stream.skip() 的 Javadoc 所示。.skip(1)

在调用 BufferedReader 之前读取它的第一行确实会成功跳过我的实现中的第一行。.lines()

然后删除 解决了内存问题,并且在JConsole中观察到即使程序处理了10亿行,也可以很好地反弹并在每个垃圾回收上返回到<1GB。这是我想要的行为,并且对于我的目的来说,它足够接近O(1)内存。.skip()

与上面的建议相反,相对位置和无关紧要,您无法对它们进行重新排序以使构建器模式“在”之前发生,该模式表明排序很重要,并且它适用于其他中间操作,但不适用于此操作。我记得我的OCP认证材料中的这种微妙之处,但它似乎不在Javadoc中,因此没有参考。然而,我通过实验证实了这一点,方法是进行孤立的更改并观察JConsole中的回归以及相关的OOM。.parallel().skip(1).skip(1).parallel().


答案 2