流有状态计算:累积总和

2022-09-02 04:36:12

假设我有一个Java IntStream,是否可以将其转换为具有累积总和的IntStream?例如,以 [4, 2, 6, ...] 开头的流应转换为 [4, 6, 12, ...]。

更一般地说,应该如何实现有状态流操作?感觉这应该是可能的:

myIntStream.map(new Function<Integer, Integer> {
    int sum = 0; 
    Integer apply(Integer value){ 
        return sum += value; 
    }
);

有明显的限制,即这仅适用于顺序流。但是,Stream.map 显式需要无状态映射函数。我是否错过了 Stream.statefulMap 或 Stream.cumulative 操作,或者缺少 Java 流的要点?

例如,与Haskell进行比较,其中scanl1函数恰好解决了这个例子:

scanl1 (+) [1 2 3 4] = [1 3 6 10]

答案 1

你可以用原子序数来做到这一点。例如:

import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;
import java.util.stream.LongStream;

public class Accumulator {
    public static LongStream toCumulativeSumStream(IntStream ints){
        AtomicLong sum = new AtomicLong(0);
        return ints.sequential().mapToLong(sum::addAndGet);
    }

    public static void main(String[] args){
        LongStream sums = Accumulator.toCumulativeSumStream(IntStream.range(1, 5));
        sums.forEachOrdered(System.out::println);
    }
}

此输出:

1
3
6
10

我使用 Long 来存储总和,因为两个 int 加起来完全有可能超过 ,而 Long 溢出的可能性较小。Integer.MAX_VALUE


答案 2

可以使用收集器,然后创建一个新流:

class Accumulator {
    public static void accept(List<Integer> list, Integer value) {
        list.add(value + (list.isEmpty() ? 0 : list.get(list.size() - 1)));
    }

    public static List<Integer> combine(List<Integer> list1, List<Integer> list2) {
        int total = list1.get(list1.size() - 1);
        list2.stream().map(n -> n + total).forEach(list1::add);
        return list1;
    }
}

这用作:

myIntStream.parallel()
    .collect(ArrayList<Integer>::new, Accumulator::accept, Accumulator::combine)
    .stream();

希望您可以看到此收集器的重要属性是,即使流在组合实例时是并行的,它也会调整总计。Accumulator

这显然不如地图操作有效,因为它收集整个流,然后生成一个新流。但这不仅仅是一个实现细节:它是流旨在潜在并发处理的事实的必要功能。

我已经测试了它,它的功能正确。IntStream.range(0, 10000).parallel()


推荐