限制流和无序流的内部更改

2022-09-02 21:31:33

基本上,这是在试图回答另一个问题时出现的。假设以下代码:

AtomicInteger i = new AtomicInteger(0);
AtomicInteger count = new AtomicInteger(0);
IntStream.generate(() -> i.incrementAndGet())
        .parallel()
        .peek(x -> count.incrementAndGet())
        .limit(5)
        .forEach(System.out::println);

System.out.println("count = " + count);

我理解这是一个无序无限流的事实,并且要完成它,必须有一个短路操作(在这种情况下)。我还理解,在 Stream 实现达到该限制之前,可以随意调用 它的次数。IntStream#generatelimitSupplier

在java-8下运行它,将始终打印(可能并不总是如此,但在我的机器上是如此)。count512

相比之下,在java-10下运行此内容很少超过 。所以我的问题是内部发生了什么变化,以至于短路发生得更好(我试图通过拥有来源并试图做一些差异来自己回答这个问题......5


答案 1

这种变化发生在Java 9,beta 103和Java 9,beta 120(JDK-8154387)之间。

责任类是,或者说是它的超类。StreamSpliterators.UnorderedSliceSpliterator.OfIntStreamSpliterators.UnorderedSliceSpliterator

该类的旧版本看起来像

abstract static class UnorderedSliceSpliterator<T, T_SPLITR extends Spliterator<T>> {
    static final int CHUNK_SIZE = 1 << 7;

    // The spliterator to slice
    protected final T_SPLITR s;
    protected final boolean unlimited;
    private final long skipThreshold;
    private final AtomicLong permits;

    UnorderedSliceSpliterator(T_SPLITR s, long skip, long limit) {
        this.s = s;
        this.unlimited = limit < 0;
        this.skipThreshold = limit >= 0 ? limit : 0;
        this.permits = new AtomicLong(limit >= 0 ? skip + limit : skip);
    }

    UnorderedSliceSpliterator(T_SPLITR s,
                              UnorderedSliceSpliterator<T, T_SPLITR> parent) {
        this.s = s;
        this.unlimited = parent.unlimited;
        this.permits = parent.permits;
        this.skipThreshold = parent.skipThreshold;
    }

...

        @Override
        public void forEachRemaining(Consumer<? super T> action) {
            Objects.requireNonNull(action);

            ArrayBuffer.OfRef<T> sb = null;
            PermitStatus permitStatus;
            while ((permitStatus = permitStatus()) != PermitStatus.NO_MORE) {
                if (permitStatus == PermitStatus.MAYBE_MORE) {
                    // Optimistically traverse elements up to a threshold of CHUNK_SIZE
                    if (sb == null)
                        sb = new ArrayBuffer.OfRef<>(CHUNK_SIZE);
                    else
                        sb.reset();
                    long permitsRequested = 0;
                    do { } while (s.tryAdvance(sb) && ++permitsRequested < CHUNK_SIZE);
                    if (permitsRequested == 0)
                        return;
                    sb.forEach(action, acquirePermits(permitsRequested));
                }
                else {
                    // Must be UNLIMITED; let 'er rip
                    s.forEachRemaining(action);
                    return;
                }
            }
        }

正如我们所看到的,它试图缓冲每个分路器中的元素,这些元素可能最终达到“CPU内核数”×128个元素。CHUNK_SIZE = 1 << 7

相比之下,新版本看起来像

abstract static class UnorderedSliceSpliterator<T, T_SPLITR extends Spliterator<T>> {
    static final int CHUNK_SIZE = 1 << 7;

    // The spliterator to slice
    protected final T_SPLITR s;
    protected final boolean unlimited;
    protected final int chunkSize;
    private final long skipThreshold;
    private final AtomicLong permits;

    UnorderedSliceSpliterator(T_SPLITR s, long skip, long limit) {
        this.s = s;
        this.unlimited = limit < 0;
        this.skipThreshold = limit >= 0 ? limit : 0;
        this.chunkSize = limit >= 0 ? (int)Math.min(CHUNK_SIZE,
            ((skip + limit) / AbstractTask.LEAF_TARGET) + 1) : CHUNK_SIZE;
        this.permits = new AtomicLong(limit >= 0 ? skip + limit : skip);
    }

    UnorderedSliceSpliterator(T_SPLITR s,
                              UnorderedSliceSpliterator<T, T_SPLITR> parent) {
        this.s = s;
        this.unlimited = parent.unlimited;
        this.permits = parent.permits;
        this.skipThreshold = parent.skipThreshold;
        this.chunkSize = parent.chunkSize;
    }

...

        @Override
        public void forEachRemaining(Consumer<? super T> action) {
            Objects.requireNonNull(action);

            ArrayBuffer.OfRef<T> sb = null;
            PermitStatus permitStatus;
            while ((permitStatus = permitStatus()) != PermitStatus.NO_MORE) {
                if (permitStatus == PermitStatus.MAYBE_MORE) {
                    // Optimistically traverse elements up to a threshold of chunkSize
                    if (sb == null)
                        sb = new ArrayBuffer.OfRef<>(chunkSize);
                    else
                        sb.reset();
                    long permitsRequested = 0;
                    do { } while (s.tryAdvance(sb) && ++permitsRequested < chunkSize);
                    if (permitsRequested == 0)
                        return;
                    sb.forEach(action, acquirePermits(permitsRequested));
                }
                else {
                    // Must be UNLIMITED; let 'er rip
                    s.forEachRemaining(action);
                    return;
                }
            }
        }

所以现在有一个实例字段 。当有一个定义的极限并且表达式的计算结果小于 时,将使用该较小的值。因此,当限制很小时,将小得多。在限制为 的情况下,块大小将始终为 。chunkSize((skip + limit) / AbstractTask.LEAF_TARGET) + 1CHUNK_SIZEchunkSize51


答案 2

推荐