Java 8 流合路器从不调用

2022-09-01 18:46:10

我正在编写一个自定义java 8收集器,它应该计算具有方法的POJO的平均值。代码如下:getValue()

public static Collector<BoltAggregationData, BigDecimal[], BigDecimal> avgCollector = new Collector<BoltAggregationData, BigDecimal[], BigDecimal>() {

        @Override
        public Supplier<BigDecimal[]> supplier() {
            return () -> {
                BigDecimal[] start = new BigDecimal[2];
                start[0] = BigDecimal.ZERO;
                start[1] = BigDecimal.ZERO;
                return start;
            };
        }

        @Override
        public BiConsumer<BigDecimal[], BoltAggregationData> accumulator() {
            return (a,b) ->  {
                a[0] = a[0].add(b.getValue());
                a[1] = a[1].add(BigDecimal.ONE);
            };
        }

        @Override
        public BinaryOperator<BigDecimal[]> combiner() {
            return (a,b) -> {
                a[0] = a[0].add(b[0]);
                a[1] = a[1].add(b[1]);
                return a;
            };
        }

        @Override
        public Function<BigDecimal[], BigDecimal> finisher() {
            return (a) -> {
                return a[0].divide(a[1], 6 , RoundingMode.HALF_UP);
            };
        }

        private final Set<Characteristics> CHARACTERISTICS = new HashSet<Characteristics>(Arrays.asList(Characteristics.CONCURRENT, Characteristics.UNORDERED));

        @Override
        public Set<Characteristics> characteristics() {
            return CHARACTERISTICS;
        }

    };

在非并行情况下,这一切都很有效。但是,当我使用时,它有时不起作用。例如,给定从 1 到 10 的值,它计算 ( 53/9 而不是 55/10)。调试时,调试器永远不会命中 combiner() 函数中的断点。我需要设置某种标志吗?parallelStream()


答案 1

看起来问题出在特征上,它的作用比你想象的要多:CONCURRENT

指示此收集器是并发的,这意味着结果容器可以支持与来自多个线程的同一结果容器并发调用累加器函数。

不是调用合并器,而是并发调用累加器,对所有线程使用相同的累加器。对 的访问不是原子的,所以它出错了:BigDecimal[] aa

Thread1 -> retrieves value of a[0]: 3
Thread2 -> retrieves value of a[0]: 3
Thread1 -> adds own value: 3 + 3 = 6
Thread2 -> adds own value: 3 + 4 = 7
Thread1 -> writes 6 to a[0]
Thread2 -> writes 7 to a[0]

当值应为 10 时,将值设为 7。同样的事情也可能发生在 中,因此结果可能不一致。a[0]a[1]


如果删除该特性,则将使用合并器。CONCURRENT


答案 2

好吧,这正是您在指定特征时请求的。CONCURRENT

指示此收集器是并发的,这意味着结果容器可以支持与来自多个线程的同一结果容器并发调用累加器函数。

如果不是这种情况,就像您的 一样,您不应该指定该标志。Collector


作为旁注,对于指定特征非常低效。你可以只使用.删除错误的并发特征时,可以使用 或 ,但 a 绝对过分。new HashSet<Characteristics>(Arrays.asList(Characteristics.CONCURRENT, Characteristics.UNORDERED));EnumSet.of(Characteristics.CONCURRENT, Characteristics.UNORDERED)EnumSet.of(Characteristics.UNORDERED)Collections.singleton(Characteristics.UNORDERED)HashSet


推荐