Java 8 流是否会导致未绑定数据上的 O(1) 内存减少成为 O(n) 内存,因为底层的 ForkJoin 实现
我已经编写了一个流实现,该实现在文件的行上执行四个简单的缩减(+和<)。
起初我执行了四个流,但我决定编写自己的累加器和组合器,以便我可以在一个流中执行所有四个简化。在小型数据集(10,000,000 行)上,这会将运行时间减少到预期的约 1/4,并在我的硬件上运行 14 秒。
fileIn = new BufferedReader(new InputStreamReader(
new URL(args[0].trim()).openStream()));
final Results results = fileIn.lines()
.parallel()
.skip(1)
.map(User::parse)
.filter(Optional::isPresent)
.map(Optional::get)
.collect(Results::new, Results::accumulate, Results::combine);
Results::accumulate
并正确地将“用户”分别组合到“结果”和“结果与结果”中,并且此实现非常适合小数据集。Results::combine
我也尝试过使用,结果相似,但我试图减少短期对象的创建。.reduce()
.collect()
问题是,当我使用10亿行的真实世界大小的数据时,我遇到了一个问题,表明Java 8流无法完成任务。在 JConsole 中观察到堆内存以大致线性的方式爬升到分配的 12 GB,然后达到 OOM。
我的印象是,收集器或化简器将提供与迭代解决方案相当的性能,迭代解决方案应受CPU和IO的限制,但不受内存的限制,因为缩减步骤会产生不会增长的结果,这是一种减少!
当我取一个堆转储并将其放入jhat时,我看到字符串占用了大约7GB,这些字符串可以清楚地看到是输入文件的行。我觉得它们根本不应该在内存中,但是jhat显示了一个非常大的ForkJoin相关结构,这些结构正在内存中积累:
Static reference from java.util.concurrent.ForkJoinPool.common (from class java.util.concurrent.ForkJoinPool) :
--> java.util.concurrent.ForkJoinPool@0x786d41db0 (76 bytes) (field workQueues:)
--> [Ljava.util.concurrent.ForkJoinPool$WorkQueue;@0x786eda598 (144 bytes) (Element 3 of [Ljava.util.concurrent.ForkJoinPool$WorkQueue;@0x786eda598:)
--> java.util.concurrent.ForkJoinPool$WorkQueue@0x786d41ee8 (96 bytes) (field currentSteal:)
--> java.util.stream.SliceOps$SliceTask@0x7b4ac6cb0 (130 bytes) (field completer:)
--> java.util.stream.SliceOps$SliceTask@0x7b379ad18 (130 bytes) (field completer:)
--> java.util.stream.SliceOps$SliceTask@0x7b25bdb68 (130 bytes) (field leftChild:)
--> java.util.stream.SliceOps$SliceTask@0x7b379acb8 (130 bytes) (field localResult:)
--> java.util.stream.Nodes$SpinedNodeBuilder@0x7b25fdda0 (53 bytes) (field spine:)
--> [[Ljava.lang.Object;@0x7b25ffe48 (144 bytes) (Element 12 of [[Ljava.lang.Object;@0x7b25ffe48:)
--> [Ljava.lang.Object;@0x7b37c4f20 (262160 bytes) (Element 19598 of [Ljava.lang.Object;@0x7b37c4f20:)
--> 31ea87ba876505645342b31928394b3c,2013-11-24T23:02:17+00:00,898,22200,1314,700 (28 bytes) (field value:)
--> [C@0x7b2ffff88 (170 bytes) // <<<< There are thousands of these
在 ApplicationShutdownHooks、Local References 和 System Classes 中还有其他引用,但我展示的这个是问题的症结所在,它会导致内存在以下情况下增长 O(n)
流实现是否通过保存 ForkJoin 类中的所有字符串来使此 O(1) 内存问题成为 O(n) 内存?我喜欢流媒体,我不希望这如此:(