这个答案已经过时了 - 请阅读这个答案!
快速回答问题:观察到的行为是有意的!没有错误,一切都根据文档发生。但要说的是,这种行为应该被记录下来,并更好地传达。应该更清楚地说明如何忽略排序。forEach
我将首先介绍允许观察到的行为的概念。这为剖析问题中给出的一个例子提供了背景。我将在较高级别上执行此操作,然后在非常低的级别上再次执行此操作。
[TL;DR:单独阅读,高层次的解释会给出一个粗略的答案。
概念
与其讨论 s(即由流相关方法操作或返回的类型),不如讨论流操作和流管道。方法调用 ,并且是构建流流管道的流操作[1],并且 - 正如其他人所指出的 - 当终端操作被调用时,管道被作为一个整体处理[2]。Stream
lines
skip
parallel
forEach
管道可以被认为是一系列操作,这些操作一个接一个地在整个流上执行(例如,过滤所有元素,将剩余元素映射到数字,对所有数字求和)。但这是误导性的!一个更好的比喻是,终端操作通过每个操作拉动单个元素[3](例如,获取下一个未过滤的元素,映射它,将其添加到总和中,请求下一个元素)。一些中间操作可能需要遍历多个(例如)甚至所有(例如)元素,然后才能返回请求的下一个元素,这是操作中状态的来源之一。skip
sort
每个操作都通过以下 StreamOpFlag
发出其特征信号:
DISTINCT
SORTED
ORDERED
SIZED
SHORT_CIRCUIT
它们在流源、中间操作和终端操作中组合在一起,构成了管道(作为一个整体)的特征,然后用于优化[4]。同样,管道是否并行执行是整个管道的属性[5]。
因此,每当您对这些特征进行假设时,都必须仔细查看构建管道的所有操作,无论它们的应用顺序如何,以及它们做出的保证是什么。执行此操作时,请记住终端操作如何通过管道拉取每个单独的元素。
例
我们来看看这个特例:
BufferedReader fooBarReader = new BufferedReader(new StringReader("Foo\nBar"));
fooBarReader.lines()
.skip(1L)
.parallel()
.forEach(System.out::println);
高级别
无论你的流源是否是有序的(它是),通过调用(而不是)你声明顺序对你来说无关紧要[6],这有效地从“跳过前n个元素”减少到“跳过任何n个元素”[7](因为没有顺序,前者变得毫无意义)。forEach
forEachOrdered
skip
因此,您赋予管道忽略顺序的权利,如果这有望加速。对于并行执行,它显然是这样认为的,这就是为什么你得到观察到的输出。因此,您观察到的是预期的行为,并且没有错误。
请注意,这与有状态不冲突!如上所述,有状态并不意味着它以某种方式缓存整个流(减去跳过的元素),并且随后的所有内容都在这些元素上执行。它只是意味着操作具有一些状态 - 即跳过的元素的数量(好吧,这实际上并不那么容易,但是由于我对正在发生的事情的有限理解,我会说这是一个公平的简化)。skip
低水平
让我们更详细地看一下:
-
BufferedReader.lines
创建 ,让我们称之为:Stream
_lines
-
.skip
创建了一个新的,让我们这样称呼它:Stream
_skip
- 调用
ReferencePipeline.skip
- 它使用
SliceOps.makeRef
构造一个“slice”操作(skip & limit的泛化)。
- 这将创建 一个匿名实例,该实例引用作为其源
ReferencePipeline.StatefulOp
_lines
-
.parallel
如上所述设置整个管道的并行标志
-
.forEach
实际开始执行
因此,让我们看看管道是如何执行的:
- 调用
_skip.forEach
会创建一个ForEachOp
(让我们称之为),并将其交给_skip.evaluate
,它做两件事:_forEach
- 调用
sourceSpliterator
以围绕此管道阶段的源创建拆分器:
- 调用
_forEach.evaluateParallel
,它创建一个 ForEachTask
(因为它是无序的;让我们调用它)并调用它_forEachTask
- 在
_forEachTask.compute
中,任务从前1024行中分离出来,为其创建一个新任务(我们称之为),实现没有剩余的行并完成。_forEachTask2
- 在分叉连接池中,
_forEachTask2.compute
被调用,徒劳地尝试再次拆分,最后通过调用 _skip.copyInto
开始将其元素复制到接收器(周围的流感知包装器)中。System.out.println
- 这实质上是将任务委托给指定的拆分器。这是上面创建的
_sliceSpliterator
!因此,_sliceSpliterator.forEachRemaining
负责将未跳过的元素传递给 println-sink:
- 它将一大块(在本例中为全部)行放入缓冲区并对其进行计数
- 它试图通过
acquirePermits
请求尽可能多的许可证(我假设是由于并行化)
- 源中有两个元素和一个要跳过的元素,它只获得一个许可证(一般来说,假设n)
- 它允许缓冲区将前n个元素(因此在本例中只有第一个元素)放入接收器中
因此,UnorderedSliceSpliterator.OfRef.forEachRemaining
是最终真正忽略顺序的地方。我没有将其与有序变体进行比较,但这是我的假设,为什么它以这种方式完成:
- 在并行化下,将分离器的元素铲入缓冲区可能会与其他执行相同任务交错
- 这将使跟踪他们的订单变得非常困难
- 这样做或防止交错会降低性能,如果顺序无关紧要,则毫无意义
- 如果订单丢失,除了处理前 n 个允许的元素之外,几乎别无他法
有问题吗?;)很抱歉持续了这么长时间。也许我应该省略细节,并写一篇博客文章.
来源
[1] java.util.stream
- 流操作和管道:
流操作分为中间操作和终端操作,并组合形成流管道。
[2] java.util.stream
- Stream operations and pipelines:
在执行管道的终端操作之前,不会开始遍历管道源。
[3] 这个比喻代表了我对流的理解。除了代码之外,主要来源是来自java.util.stream
的这段话 - 流操作和管道(突出显示我的):
懒惰地处理流可以显着提高效率;在管道(如上面的 filter-map-sum 示例)中,筛选、映射和求和可以融合到对数据的单个传递中,中间状态最少。懒惰还允许避免在没有必要时检查所有数据;对于诸如“查找长度超过 1000 个字符的第一个字符串”之类的操作,只需检查足够的字符串即可找到具有所需特征的字符串,而无需检查源中可用的所有字符串。
[4] java.util.stream.StreamOpFlag
:
在管道的每个阶段,可以计算出组合的流和操作标志[...jadda,jadda,jadda关于标志如何跨源,中间和终端操作组合...]以生成管道输出的标志。然后,这些标志可用于应用优化。
在代码中,您可以在 AbstractPipeline.combinedFlags
中看到这一点,它是在构造期间(以及其他一些实例)通过组合前一个操作和新操作的标志来设置的。
[5] java.util.stream
- 并行性(我无法直接链接到它 - 向下滚动一点):
启动终端操作时,流管道将按顺序或并行执行,具体取决于调用它的流的方向。
在代码中,您可以看到这是在 AbstractPipeline.sequential
、parallel
和 isParallel
中,它们在流源上设置/检查布尔标志,使得在构造流时调用 setter 时无关紧要。
[6] java.util.stream.Stream.forEach:
对此流的每个元素执行一个操作。[...]此操作的行为是显式非确定性的。
将此与java.util.stream.Stream.forEachOrdered进行对比:
如果流具有定义的遭遇顺序,则按流的遭遇顺序对此流的每个元素执行操作。
[7] 这也没有明确的记录,但我对Stream.skip
上的这个评论的解释(由我大大缩短):
[...]skip() [...] 在有序并行管道上可能非常昂贵 [...] 因为 skip(n) 被限制为不仅跳过任何 n 个元素,而且跳过遇到顺序中的前 n 个元素。[...][R]移动排序约束 [...] 可能会导致并行管道中 skip() 的显著加速