这是 Files.lines() 中的一个错误,还是我对并行流有误解?

环境:Ubuntu x86_64 (14.10), Oracle JDK 1.8u25

我尝试使用Files.lines()的并行流,但我想在第一行(它是一个带有标头的CSV文件)中.skip()。因此,我尝试这样做:

try (
    final Stream<String> stream = Files.lines(thePath, StandardCharsets.UTF_8)
        .skip(1L).parallel();
) {
    // etc
}

但是,一列未能解析为整数...

所以我尝试了一些简单的代码。文件是问题非常简单:

$ cat info.csv 
startDate;treeDepth;nrMatchers;nrLines;nrChars;nrCodePoints;nrNodes
1422758875023;34;54;151;4375;4375;27486
$

代码同样简单:

public static void main(final String... args)
{
    final Path path = Paths.get("/home/fge/tmp/dd/info.csv");
    Files.lines(path, StandardCharsets.UTF_8).skip(1L).parallel()
        .forEach(System.out::println);
}

系统地得到了以下结果(好吧,我只运行了大约20次):

startDate;treeDepth;nrMatchers;nrLines;nrChars;nrCodePoints;nrNodes

我在这里错过了什么?


编辑似乎问题或误解比这更根深蒂固(下面的两个例子是由FreeNode的##java上的一个人编造的):

public static void main(final String... args)
{
    new BufferedReader(new StringReader("Hello\nWorld")).lines()
        .skip(1L).parallel()
        .forEach(System.out::println);

    final Iterator<String> iter
        = Arrays.asList("Hello", "World").iterator();
    final Spliterator<String> spliterator
        = Spliterators.spliteratorUnknownSize(iter, Spliterator.ORDERED);
    final Stream<String> s
        = StreamSupport.stream(spliterator, true);

    s.skip(1L).forEach(System.out::println);
}

这打印:

Hello
Hello

呃。

@Holger建议,对于与此其他示例无关的任何流,都会发生这种情况:ORDEREDSIZED

Stream.of("Hello", "World")
    .filter(x -> true)
    .parallel()
    .skip(1L)
    .forEach(System.out::println);

此外,它源于已经发生的所有讨论,即问题(如果它是一个?)是(正如@SotiriosDelimanolis首先指出的那样)。.forEach()


答案 1

由于该问题的当前状态与此处的早期陈述完全相反,因此应该注意的是,Brian Goetz现在明确声明了无序特征的反向传播,而操作被认为是一个错误。还有人说,它现在被认为根本没有终端操作的有序性的反向传播。skip

还有一个相关的错误报告,JDK-8129120,其状态为“在Java 9中修复”,并且它向后移植到Java 8,更新60

我做了一些测试,似乎现在的实现确实表现出更直观的行为。jdk1.8.0_60


答案 2

这个答案已经过时了 - 请阅读这个答案!


快速回答问题:观察到的行为是有意的!没有错误,一切都根据文档发生。但要说的是,这种行为应该被记录下来,并更好地传达。应该更清楚地说明如何忽略排序。forEach

我将首先介绍允许观察到的行为的概念。这为剖析问题中给出的一个例子提供了背景。我将在较高级别上执行此操作,然后在非常低的级别上再次执行此操作。

[TL;DR:单独阅读,高层次的解释会给出一个粗略的答案。

概念

与其讨论 s(即由流相关方法操作或返回的类型),不如讨论流操作流管道。方法调用 ,并且是构建流流管道的流操作[1],并且 - 正如其他人所指出的 - 当终端操作被调用时,管道被作为一个整体处理[2]。StreamlinesskipparallelforEach

管道可以被认为是一系列操作,这些操作一个接一个地在整个流上执行(例如,过滤所有元素,将剩余元素映射到数字,对所有数字求和)。但这是误导性的!一个更好的比喻是,终端操作通过每个操作拉动单个元素[3](例如,获取下一个未过滤的元素,映射它,将其添加到总和中,请求下一个元素)。一些中间操作可能需要遍历多个(例如)甚至所有(例如)元素,然后才能返回请求的下一个元素,这是操作中状态的来源之一。skipsort

每个操作都通过以下 StreamOpFlag发出其特征信号:

  • DISTINCT
  • SORTED
  • ORDERED
  • SIZED
  • SHORT_CIRCUIT

它们在流源、中间操作和终端操作中组合在一起,构成了管道(作为一个整体)的特征,然后用于优化[4]。同样,管道是否并行执行是整个管道的属性[5]。

因此,每当您对这些特征进行假设时,都必须仔细查看构建管道的所有操作,无论它们的应用顺序如何,以及它们做出的保证是什么。执行此操作时,请记住终端操作如何通过管道拉取每个单独的元素。

我们来看看这个特例:

BufferedReader fooBarReader = new BufferedReader(new StringReader("Foo\nBar"));
fooBarReader.lines()
        .skip(1L)
        .parallel()
        .forEach(System.out::println);

高级别

无论你的流源是否是有序的(它是),通过调用(而不是)你声明顺序对你来说无关紧要[6],这有效地从“跳过前n个元素”减少到“跳过任何n个元素”[7](因为没有顺序,前者变得毫无意义)。forEachforEachOrderedskip

因此,您赋予管道忽略顺序的权利,如果这有望加速。对于并行执行,它显然是这样认为的,这就是为什么你得到观察到的输出。因此,您观察到的是预期的行为,并且没有错误。

请注意,这与有状态不冲突!如上所述,有状态并不意味着它以某种方式缓存整个流(减去跳过的元素),并且随后的所有内容都在这些元素上执行。它只是意味着操作具有一些状态 - 即跳过的元素的数量(好吧,这实际上并不那么容易,但是由于我对正在发生的事情的有限理解,我会说这是一个公平的简化)。skip

低水平

让我们更详细地看一下:

  1. BufferedReader.lines创建 ,让我们称之为:Stream_lines
  2. .skip创建了一个新的,让我们这样称呼它:Stream_skip
    • 调用ReferencePipeline.skip
    • 它使用SliceOps.makeRef构造一个“slice”操作(skip & limit的泛化)。
    • 这将创建 一个匿名实例,该实例引用作为其源ReferencePipeline.StatefulOp_lines
  3. .parallel如上所述设置整个管道的并行标志
  4. .forEach实际开始执行

因此,让我们看看管道是如何执行的:

  1. 调用_skip.forEach会创建一个ForEachOp(让我们称之为),并将其交给_skip.evaluate,它做两件事:_forEach
    1. 调用 sourceSpliterator 以围绕此管道阶段的源创建拆分器:
    2. 调用_forEach.evaluateParallel,它创建一个 ForEachTask(因为它是无序的;让我们调用它)并调用它_forEachTask
  2. _forEachTask.compute中,任务从前1024行中分离出来,为其创建一个新任务(我们称之为),实现没有剩余的行并完成。_forEachTask2
  3. 在分叉连接池中,_forEachTask2.compute 被调用,徒劳地尝试再次拆分,最后通过调用 _skip.copyInto 开始将其元素复制到接收器(周围的流感知包装器)中。System.out.println
  4. 这实质上是将任务委托给指定的拆分器。这是上面创建的_sliceSpliterator因此,_sliceSpliterator.forEachRemaining 负责将未跳过的元素传递给 println-sink:
    1. 它将一大块(在本例中为全部)行放入缓冲区并对其进行计数
    2. 它试图通过 acquirePermits 请求尽可能多的许可证(我假设是由于并行化)
    3. 源中有两个元素和一个要跳过的元素,它只获得一个许可证(一般来说,假设n)
    4. 它允许缓冲区将前n个元素(因此在本例中只有第一个元素)放入接收器中

因此,UnorderedSliceSpliterator.OfRef.forEachRemaining是最终真正忽略顺序的地方。我没有将其与有序变体进行比较,但这是我的假设,为什么它以这种方式完成:

  • 在并行化下,将分离器的元素铲入缓冲区可能会与其他执行相同任务交错
  • 这将使跟踪他们的订单变得非常困难
  • 这样做或防止交错会降低性能,如果顺序无关紧要,则毫无意义
  • 如果订单丢失,除了处理前 n 个允许的元素之外,几乎别无他法

有问题吗?;)很抱歉持续了这么长时间。也许我应该省略细节,并写一篇博客文章.

来源

[1] java.util.stream - 流操作和管道

流操作分为中间操作和终端操作,并组合形成流管道

[2] java.util.stream - Stream operations and pipelines

在执行管道的终端操作之前,不会开始遍历管道源。

[3] 这个比喻代表了我对流的理解。除了代码之外,主要来源是来自java.util.stream的这段话 - 流操作和管道(突出显示我的):

懒惰地处理流可以显着提高效率;在管道(如上面的 filter-map-sum 示例)中,筛选、映射和求和可以融合到对数据的单个传递中,中间状态最少。懒惰还允许避免在没有必要时检查所有数据;对于诸如“查找长度超过 1000 个字符的第一个字符串”之类的操作,只需检查足够的字符串即可找到具有所需特征的字符串,而无需检查源中可用的所有字符串。

[4] java.util.stream.StreamOpFlag

在管道的每个阶段,可以计算出组合的流和操作标志[...jadda,jadda,jadda关于标志如何跨源,中间和终端操作组合...]以生成管道输出的标志。然后,这些标志可用于应用优化。

在代码中,您可以在 AbstractPipeline.combinedFlags 中看到这一点,它是在构造期间(以及其他一些实例)通过组合前一个操作和新操作的标志来设置的。

[5] java.util.stream - 并行性(我无法直接链接到它 - 向下滚动一点):

启动终端操作时,流管道将按顺序或并行执行,具体取决于调用它的流的方向。

在代码中,您可以看到这是在 AbstractPipeline.sequentialparallelisParallel 中,它们在流源上设置/检查布尔标志,使得在构造流时调用 setter 时无关紧要。

[6] java.util.stream.Stream.forEach

对此流的每个元素执行一个操作。[...]此操作的行为是显式非确定性的。

将此与java.util.stream.Stream.forEachOrdered进行对比:

如果流具有定义的遭遇顺序,则按流的遭遇顺序对此流的每个元素执行操作。

[7] 这也没有明确的记录,但我对Stream.skip上的这个评论的解释(由我大大缩短):

[...]skip() [...] 在有序并行管道上可能非常昂贵 [...] 因为 skip(n) 被限制为不仅跳过任何 n 个元素,而且跳过遇到顺序中的前 n 个元素。[...][R]移动排序约束 [...] 可能会导致并行管道中 skip() 的显著加速


推荐