收集器的合路器函数是否可以用于顺序流?

2022-09-01 06:10:16

示例程序:

public final class CollectorTest
{
    private CollectorTest()
    {
    }

    private static <T> BinaryOperator<T> nope()
    {
        return (t, u) -> { throw new UnsupportedOperationException("nope"); };
    }

    public static void main(final String... args)
    {
        final Collector<Integer, ?, List<Integer>> c
            = Collector.of(ArrayList::new, List::add, nope());

        IntStream.range(0, 10_000_000).boxed().collect(c);
    }
}

因此,为了简化这里的事情,没有最终的转换,因此生成的代码非常简单。

现在,生成顺序流。我只是简单地将结果框入s,然后我精心策划将它们收集到一个.非常简单。IntStream.range()IntegerCollectorList<Integer>

无论我运行此示例程序多少次,永远不会命中,这意味着我的虚拟组合器永远不会被调用。UnsupportedOperationException

我有点期待这一点,但后来我已经误解了流,以至于我不得不问这个问题......

当流保证是顺序的时,是否可以调用 的合并器?Collector


答案 1

仔细阅读 ReduceOps 中的流实现代码.java,可以看出,组合函数仅在完成时才调用,并且仅在并行评估管道时使用实例。因此,在当前实现中,在评估顺序管道时从不调用合并器。ReduceTaskReduceTask

但是,规范中没有任何内容可以保证这一点。A 是对其实现提出要求的接口,并且没有为顺序流授予任何豁免。就个人而言,我发现很难想象为什么顺序管道评估可能需要调用组合器,但是比我更有想象力的人可能会找到它的聪明用途,并实现它。规范允许这样做,即使今天的实现没有这样做,你仍然需要考虑它。Collector

这并不奇怪。流 API 的设计中心是支持与顺序执行同等地位的并行执行。当然,程序可以观察它是按顺序执行还是并行执行。但是API的设计是支持一种允许两者的编程风格。

如果您正在编写收集器,并且发现编写关联组合器函数是不可能的(或不方便的或困难的),从而导致您想要将流限制为顺序执行,则可能意味着您正朝着错误的方向前进。现在是时候退后一步,考虑以不同的方式解决问题了。

不需要关联合并器函数的常见缩减式操作称为左折叠。主要特征是折叠函数严格地从左到右应用,一次进行一个。我不知道有什么方法可以并行化向左折叠。

当人们试图像我们一直在谈论的那样扭曲收藏家时,他们通常在寻找像左折这样的东西。Streams API 没有对此操作的直接 API 支持,但它很容易编写。例如,假设您要使用此操作来减少字符串列表:重复第一个字符串,然后追加第二个字符串。很容易证明此操作不是关联的:

List<String> list = Arrays.asList("a", "b", "c", "d", "e");

System.out.println(list.stream()
    .collect(StringBuilder::new,
             (a, b) -> a.append(a.toString()).append(b),
             (a, b) -> a.append(a.toString()).append(b))); // BROKEN -- NOT ASSOCIATIVE

按顺序运行,这将生成所需的输出:

aabaabcaabaabcdaabaabcaabaabcde

但是,当并行运行时,它可能会产生如下结果:

aabaabccdde

由于它按顺序“工作”,因此我们可以通过调用来强制执行,并通过让合并器引发异常来备份它。此外,供应商必须只叫一次。没有办法合并中间结果,所以如果供应商被叫了两次,我们已经有麻烦了。但是,由于我们“知道”供应商在顺序模式下只被调用一次,因此大多数人并不担心这一点。事实上,我见过人们写“供应商”,返回一些现有的对象,而不是创建一个新的对象,这违反了供应商合同。sequential()

在 3-arg 形式的使用中,我们有三个函数中的两个打破了它们的合约。这难道不应该告诉我们以不同的方式做事吗?collect()

这里的主要工作是由累加器功能完成的。为了实现折叠式约简,我们可以使用 以严格的从左到右的顺序应用此函数。我们必须在之前和之后进行一些设置和完成代码,但这没有问题:forEachOrdered()

StringBuilder a = new StringBuilder();
list.parallelStream()
    .forEachOrdered(b -> a.append(a.toString()).append(b));
System.out.println(a.toString());

当然,这在并行中工作正常,尽管并行运行的性能优势可能会被 的排序要求所否定。forEachOrdered()

总而言之,如果您发现自己想要执行可变约简,但缺少关联合路器函数,从而导致您将流限制为顺序执行,请将问题重铸为向左折叠操作,并在累加器函数上使用。forEachRemaining()


答案 2

正如在前面的@MarkoTopolnik和@Duncan注释中观察到的那样,不能保证在顺序模式下调用以产生减少的结果。事实上,Java文档在这一点上有点主观,这可能会导致不适当的解释。Collector.combiner()

(...)并行实现将对输入进行分区,为每个分区创建一个结果容器,将每个分区的内容累积到该分区的子结果中,然后使用组合器函数将子结果合并为组合结果

根据NoBlogDefFound,组合器仅在并行模式下使用。请参阅下面的部分报价:

combiner() 用于将两个累加器连接成一个累加器。当并行执行收集器,拆分输入流并首先独立收集部件时,使用它。

为了更清楚地说明这个问题,我重写了第一个代码,并提出了两种方法(串行和并行)。


public final class CollectorTest
{
    private CollectorTest()
    {
    }

    private static <T> BinaryOperator<T> nope()
    {
        return (t, u) -> { throw new UnsupportedOperationException("nope"); };
    }

    public static void main(final String... args)
    {

        final Collector<Integer, ?, List<Integer>> c =
                Collector
                    .of(ArrayList::new, List::add, nope());

        // approach sequential
        Stream<Integer> sequential = IntStream
                .range(0, 10_000_000)
                .boxed();

        System.out.println("isParallel:" + sequential.isParallel());
        sequential
                .collect(c);

        // approach parallel
        Stream<Integer> parallel = IntStream
                .range(0, 10_000_000)
                .parallel()
                .boxed();

        System.out.println("isParallel:" + parallel.isParallel());
        parallel
                .collect(c);
    }
}

运行此代码后,我们可以获得输出:

isParallel:false
isParallel:true
Exception in thread "main" java.lang.UnsupportedOperationException: nope
    at com.stackoverflow.lambda.CollectorTest.lambda$nope$0(CollectorTest.java:18)
    at com.stackoverflow.lambda.CollectorTest$$Lambda$3/2001049719.apply(Unknown Source)
    at java.util.stream.ReduceOps$3ReducingSink.combine(ReduceOps.java:174)
    at java.util.stream.ReduceOps$3ReducingSink.combine(ReduceOps.java:160)

因此,根据这个结果,我们可以推断出只能由并行执行调用。Collector's combiner


推荐