如何懒惰地评估嵌套平面图

我试图从两个潜在的无限流中联想到一个笛卡尔积,然后我通过.limit()

到目前为止,这(大约)是我的策略:

@Test
void flatMapIsLazy() {
        Stream.of("a", "b", "c")
            .flatMap(s -> Stream.of("x", "y")
                .flatMap(sd -> IntStream.rangeClosed(0, Integer.MAX_VALUE)
                    .mapToObj(sd::repeat)))
            .map(s -> s + "u")
            .limit(20)
            .forEach(System.out::println);
}

这不起作用。

显然,我的第二个流在第一次在管道上使用时,就会在现场进行终端评估。它不会产生一个懒惰的流,然后我可以按照自己的节奏消费。

我认为在这段代码中来自是罪魁祸首:.forEachReferencePipeline#flatMap

@Override
public void accept(P_OUT u) {
    try (Stream<? extends R> result = mapper.apply(u)) {
        if (result != null) {
            if (!cancellationRequestedCalled) {
               result.sequential().forEach(downstream);
            }
            else {
                var s = result.sequential().spliterator();
                do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstream));
            }
        }
    }
}

我期望上面的代码返回20个元素,如下所示:

a
ax
axx
axxx
axxxx
...
axxxxxxxxxxxxxxxxxxx

但是相反,它崩溃了,因为嵌套的很长被急切地计算(??),并用重复字符串的不必要副本填充了我的记忆。如果提供的值为 3,并将相同的限制保持在 20,则预期输出将为:OutOfMemoryErrorStreamflatMapInteger.MAX_VALUE

a
ax
axx
axxx
a
ay
ayy
ayyy
b
bx
bxx
bxxx
...
(up until 20 lines)

编辑:在这一点上,我刚刚用懒惰的迭代器滚动了自己的实现。不过,我认为应该有一种方法可以用纯Streams做到这一点。

编辑2:这已被承认为Java中的错误票证 https://bugs.java.com/bugdatabase/view_bug.do?bug_id=JDK-8267758%20


答案 1

正如您已经写过的,这已被接受为错误。也许,它将在Java的未来版本中得到解决。

但即使现在也有解决方案。它不是很优雅,只有当外流中的元素数量和极限足够小时,它才可行。但它将在这些限制下工作。

让我首先修改一下您的示例,将外部转换为两个操作(a和a具有标识,仅执行扁平化):flatMapmapflatMap

Stream.of("a", "b", "c")
      .map(s -> Stream.of("x", "y")
            .flatMap(sd -> IntStream.rangeClosed(0, Integer.MAX_VALUE)
                  .mapToObj(sd::repeat)))
      .flatMap(s -> s)
      .map(s -> s + "u")
      .limit(20)
      .forEach(System.out::println);

我们可以很容易地看到,我们只需要来自每个内部流的不超过20个元素。因此,我们可以将每个流限制为此数量的元素。这将起作用(您应该使用可变量或常量作为限制):

Stream.of("a", "b", "c")
      .map(s -> Stream.of("x", "y")
            .flatMap(sd -> IntStream.rangeClosed(0, Integer.MAX_VALUE)
                  .mapToObj(sd::repeat)))
      .flatMap(s -> s.limit(20))            // limit each inner stream
      .map(s -> s + "u")
      .limit(20)
      .forEach(System.out::println);

当然,这仍然会产生太多的中间结果,但在上述限制下可能不是一个大问题。


答案 2