为什么flatMap()之后的filter()在Java流中“不完全”懒惰?

2022-08-31 12:46:42

我有以下示例代码:

System.out.println(
       "Result: " +
        Stream.of(1, 2, 3)
                .filter(i -> {
                    System.out.println(i);
                    return true;
                })
                .findFirst()
                .get()
);
System.out.println("-----------");
System.out.println(
       "Result: " +
        Stream.of(1, 2, 3)
                .flatMap(i -> Stream.of(i - 1, i, i + 1))
                .flatMap(i -> Stream.of(i - 1, i, i + 1))
                .filter(i -> {
                    System.out.println(i);
                    return true;
                })
                .findFirst()
                .get()
);

输出如下:

1
Result: 1
-----------
-1
0
1
0
1
2
1
2
3
Result: -1

从这里我看到在第一种情况下确实表现得很懒惰 - 我们使用,所以一旦我们有第一个元素,我们的过滤lambda就不会被调用。但是,在使用s的第二种情况下,我们看到,尽管找到了满足筛选条件的第一个元素(它只是任何第一个元素,因为lambda总是返回true),但流的进一步内容仍然通过过滤函数提供。streamfindFirst()flatMap

我试图理解为什么它表现得像这样,而不是像第一种情况那样计算第一个元素后放弃。任何有用的信息将不胜感激。


答案 1

TL;DR,这已在 JDK-8075939 中得到解决,并在 Java 10 中修复(并在 JDK-8225328 中向后移植到 Java 8)。

在查看实现()时,我们看到方法[链接ReferencePipeline.java]

@Override
final void forEachWithCancel(Spliterator<P_OUT> spliterator, Sink<P_OUT> sink) {
    do { } while (!sink.cancellationRequested() && spliterator.tryAdvance(sink));
}

这将被调用以进行操作。需要注意的特殊事情是允许在第一场比赛中结束循环。与 [链接] 比较findFirstsink.cancellationRequested()]

@Override
public final <R> Stream<R> flatMap(Function<? super P_OUT, ? extends Stream<? extends R>> mapper) {
    Objects.requireNonNull(mapper);
    // We can do better than this, by polling cancellationRequested when stream is infinite
    return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
            return new Sink.ChainedReference<P_OUT, R>(sink) {
                @Override
                public void begin(long size) {
                    downstream.begin(-1);
                }

                @Override
                public void accept(P_OUT u) {
                    try (Stream<? extends R> result = mapper.apply(u)) {
                        // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
                        if (result != null)
                            result.sequential().forEach(downstream);
                    }
                }
            };
        }
    };
}

用于推进一个项目的方法最终会调用子流,而没有任何提前终止的可能性,并且该方法开头的注释甚至告诉了这个缺失的功能。forEachflatMap

由于这不仅仅是一个优化的事情,因为它意味着当子流是无限的时,代码只是中断,我希望开发人员尽快证明他们“可以做得比这更好”......


为了说明其含义,虽然按预期工作,但最终将处于无限循环中。Stream.iterate(0, i->i+1).findFirst()Stream.of("").flatMap(x->Stream.iterate(0, i->i+1)).findFirst()

关于规范,大部分可以在

包规范的“流操作和管道”一章

...

中间操作返回新流。他们总是懒惰;

...

...懒惰还允许避免在没有必要时检查所有数据;对于诸如“查找长度超过 1000 个字符的第一个字符串”之类的操作,只需检查足够的字符串即可找到具有所需特征的字符串,而无需检查源中可用的所有字符串。(当输入流是无限的而不仅仅是大时,这种行为变得更加重要。

...

此外,某些操作被视为短路操作。如果中间操作具有无限输入,则中间操作可能因此产生有限流,则该操作是短路的。如果终端操作具有无限输入,则终端操作可能在有限的时间内终止,则该操作是短路的。在管道中具有短路操作是无限流的处理在有限时间内正常终止的必要条件,但不是充分条件。

很明显,短路操作并不能保证有限时间终止,例如,当滤波器与任何项目不匹配时,处理无法完成,但是通过简单地忽略操作的短路性质,在有限时间内不支持任何终止的实现与规范相去甚远。


答案 2

输入流的元素被一个接一个地懒惰地使用。第一个元素 , 由两个 s 转换为流,因此整个流只对应于第一个输入元素。嵌套的流被管道急切地具体化,然后压平,然后送入舞台。这解释了您的输出。1flatMap-1, 0, 1, 0, 1, 2, 1, 2, 3filter

上述情况并非源于基本限制,但可能会使嵌套流完全懒惰的情况变得更加复杂。我怀疑要让它具有性能将是一个更大的挑战。

为了进行比较,Clojure的懒惰seqs为每个这样的嵌套级别获得了另一层包装。由于这种设计,当嵌套执行到极致时,操作甚至可能失败。StackOverflowError


推荐