在执行其他操作之前查找流大小

2022-09-04 20:20:08

在我的程序中,我反复收集了 1 个 Java 8 ,以将一个对象集合减少到一个对象集合。在整个执行过程中,此集合的大小可能会有很大差异:从 3 个对象到数百个对象。

public void findInterestingFoo(Stream<Foo> foos) {
    internalState.update(foos.collect(customCollector()));
}

在优化代码和搜索瓶颈的过程中,我使流在某些时候并行。这在当时是有效的,因为收藏品都相当大。后来,在更改了程序的其他部分和参数后,集合变得更小了。我意识到不使流并行更有效。这是有道理的:为4个对象在多个线程上分配工作的开销根本不值得。不过,对于数百个对象来说,这是值得的

如果我只能使大流并行,那将非常方便:

public void findInterestingFoo(Stream<Foo> foos) {
    if (isSmall(foos)) {
        internalState.update(foos.collect(customCollector()));
    } else {
        internalState.update(foos.parallel().collect(customCollector()));
    }
}

当然,当从数组集合或手动创建流时,可以手动执行此操作。也就是说,我们知道流中有哪些元素,因此可以对其进行跟踪。然而,我有兴趣以通用的方式解决这个问题,以便无论将哪种流传递给,它都能得到适当和尽可能有效的处理。findInterestingFoo

count()这样的东西可能会有所帮助,除了它在我收集它之前终止了流。

我很清楚流被设计为没有固定大小,特别是:

  • 可能是无限的。虽然集合的大小有限,但流不需要。短路操作(如 或 可以允许无限流的计算在有限的时间内完成)。— java.util.stream package descriptionlimit(n)findFirst()

不过,我想知道在对流执行任何操作之前,是否有任何方法可以确定流中有多少个元素。流真的不知道它是从有限集合创建的吗?

__________
1 千次。在我的情况下,优化这一点导致总运行时间从大约1.5秒加速到0.5秒。


答案 1

从理论上讲,你可以做这样的事情:

public void findInterestingFoo(Stream<Foo> foos) {
    Spliterator<Foo> sp = foos.spliterator();
    long size = sp.getExactSizeIfKnown();// returns -1 if not known
          // or sp.estimateSize(); // Long.MAX_VALUE means "unknown"
    internalState.update(
        StreamSupport.stream(sp, size > PARALLEL_THRESHOLD)
                     .collect(customCollector()));
}

spliterator() 是使用输入流的终端操作,但您可以将 Spliterator 传递给 StreamSupport.stream 以构造具有完全相同属性的流。第二个参数已经告诉流是否应为并行流。

从理论上讲。

实际上,当前流实现将返回不同的实现,具体取决于流是否并行。这意味着,将流重新创建为并行流可能最终得到一个无法执行并行处理的流,而原始流在调用 之前尚未并行。Spliteratorspliterator()

但是,如果没有中间操作,例如,当您直接从集合或数组传入创建的操作时,它确实可以很好地工作。Stream

在之前调用 parallel() 以获得一个支持并行的流,如果您决定这样做,该流仍可能按顺序运行,这在很多情况下都有效。但是,如果存在像输入流中这样的有状态中间操作,则即使您按顺序执行(反之亦然),它们也可能会固定为并行运行。spliterator()sorted()collect


另一个问题是根本性的。元素的数量实际上并没有说明并行处理是否会有好处。这确实取决于每个元素的工作负载,这不仅取决于您的终端操作,还取决于在进入方法之前已经链接到流的操作。即使您得出结论,收集器的工作负载已经足够高,值得并行处理,也可能是传入流具有跳过限制不同(在有序流上)等操作,这些操作通常在并行运行得更糟,并且需要完全不同的阈值。collect

一个更简单的解决方案是让调用方决定,因为调用方知道一些关于流的大小和性质的知识。您甚至不需要向方法的签名添加选项,因为调用方已经可以通过调用或在将流传递给您的方法之前在流上做出决定,并且您可以通过简单地不更改模式来遵守这一点。parallel()sequential()


答案 2

推荐