Stream reduce() 要求究竟需要什么?

在并行流上使用该操作时,OCP考试手册指出,论点必须遵守某些原则。这些原则如下:reduce()reduce()

  1. 必须定义标识,以便对于流 u 中的所有元素,combiner.apply(标识,u) 等于你。
  2. 累加器运算符 op 必须是关联且无状态的,因此等于 。(a op b) op ca op (b op c)
  3. 组合器运算符还必须是关联和无状态的,并且与标识兼容,以便对于 all of 和 等于 。utcombiner.apply(u, accumulator.apply(identity, t))accumulator.apply(u,t)

本书给出了两个例子来说明这些原则,请参阅下面的代码:

关联示例:

System.out.println(
        Arrays.asList(1, 2, 3, 4, 5, 6)
                .parallelStream()
                .reduce(0, (a, b) -> (a - b)));

书中对此有何描述:

它可能会输出 -21、3 或某些其他值,因为累加器函数违反了关联性属性。

身份要求的示例:

System.out.println(
        Arrays.asList("w", "o", "l", "f")
                .parallelStream()
                .reduce("X", String::concat));

书中对此有何描述:

如果我们使用不是真正标识值的标识参数,则可以看到其他问题。它可以输出.作为并行过程的一部分,标识应用于流中的多个元素,从而产生非常意外的数据。XwXoXlXf

我不明白这些例子。以累加器为例,累加器从哪个开始,然后是 哪个是 ,然后依此类推一直到 。我理解,因为生成的数组列表不同步,结果可能是不可预测的,因为可能存在争用条件等,但为什么累加器不是关联的呢?也不会造成不可预测的结果吗?我真的看不出示例中使用的累加器有什么问题,以及为什么它不是结合的,但话说回来,我仍然不完全理解“结合原理”是什么意思。0 - 1-1-1 - 2-3-6-21(a+b)

我也不明白身份的例子。我知道结果可能是如果4个单独的线程同时开始与标识一起累积,但这与标识参数本身有什么关系?那么,究竟使用什么才是合适的身份呢?XwXoXlXf

我想知道是否有人能就这些原则对我有更多的启发。

谢谢


答案 1

为什么累加器不是关联式的?

它不是关联的,因为减法运算的顺序决定了最终结果。

如果运行串行 ,您将获得以下预期结果:Stream

0 - 1 - 2 - 3 - 4 - 5 - 6 = -21

另一方面,对于并行 s,工作被拆分为多个线程。例如,如果在 6 个线程上并行执行,然后合并中间结果,则可以得到不同的结果:Streamreduce

0 - 1   0 - 2   0 - 3      0 - 4     0 - 5    0 - 6
  -1     -2      -3         -4        -5        -6

  -1 - (-2)         -3 - (-4)          -5 - (-6)
      1                 1                  1
           1   -   1
               0            -     1

                        -1

或者,缩短一个长示例:

(1 - 2) - 3 = -4
1 - (2 - 3) =  2

因此,减法不是结合的。

另一方面,不会导致相同的问题,因为加法是一个关联运算符(即 )。a+b(a+b)+c == a+(b+c)

标识示例的问题在于,当在多个线程上并行执行 reduce 时,“X”将附加到每个中间结果的开头。

那么,究竟使用什么才是合适的身份呢?

如果将标识值更改为 :""

System.out.println(Arrays.asList("w","o","l","f"))
.parallelStream()
.reduce("", String::concat));

你会得到“wolf”而不是“XwXoXlXf”。


答案 2

让我举两个例子。首先,身份被打破的地方:

int result = Stream.of(1, 2, 3, 4, 5, 6)
        .parallel()
        .reduce(10, (a, b) -> a + b);

System.out.println(result); // 81 on my run

基本上你已经打破了这个规则:.The identity value must be an identity for the accumulator function.  This means that for all u, accumulator(identity, u) is equal to u

或者更简单,让我们看看该规则是否适用于Stream中的一些随机数据:

 Integer identity = 10;
 BinaryOperator<Integer> combiner = (x, y) -> x + y;
 boolean identityRespected = combiner.apply(identity, 1) == 1;
 System.out.println(identityRespected); // prints false

第二个例子:

/**
 * count letters, adding a bit more all the time
 */
private static int howMany(List<String> tokens) {
    return tokens.stream()
            .parallel()
            .reduce(0, // identity
                    (i, s) -> { // accumulator
                        return s.length() + i;
                    }, (left, right) -> { // combiner
                        return left + right + left; // notice the extra left here
                    });
}

你用以下命令调用它:

List<String> left = Arrays.asList("aa", "bbb", "cccc", "ddddd", "eeeeee");
List<String> right = Arrays.asList("aa", "bbb", "cccc", "ddddd", "eeeeee", "");

System.out.println(howMany(left));  // 38 on my run
System.out.println(howMany(right)); // 50 on my run

基本上你已经打破了这个规则:或在代码中:Additionally, the combiner function must be compatible with the accumulator function

// this must hold!
// combiner.apply(u, accumulator.apply(identity, t)) == accumulator.apply(u, t)

Integer identity = 0;
String t = "aa";
Integer u = 3; // "bbb"
BiFunction<Integer, String, Integer> accumulator = (Integer i, String s) -> i + s.length();
BinaryOperator<Integer> combiner = (left, right) -> left + right + left;

int first = accumulator.apply(identity, t); // 2
int second = combiner.apply(u, first); // 3 + 2 + 3 = 8

Integer shouldBe8 = accumulator.apply(u, t);

System.out.println(shouldBe8 == second); // false