Java 8 流:计算进入终端操作的所有元素

2022-09-02 22:31:15

我想知道是否有更好(或只是另一种)方法来获取进入流终端操作的所有项目的计数,而不是以下内容:

Stream<T> stream = ... // given as parameter
AtomicLong count = new AtomicLong();
stream.filter(...).map(...)
      .peek(t -> count.incrementAndGet())

其中为我提供了该阶段已处理项目的实际计数。count.get()

我故意跳过了终端操作,因为它可能会在 , 或 之间切换。我确实已经知道.count,但似乎只有当我用a交换a并使用as终端操作时,它才能正常工作。但在我看来,这似乎被误用了。.forEach.reduce.collect.forEach.map.count.map

我不太喜欢上述解决方案:如果在它之后添加一个过滤器,它只计算该特定阶段的元素,而不是进入终端操作的元素。

我想到的另一种方法是将过滤和映射的值放入列表中,然后对其进行操作,然后调用以获取计数。但是,如果收集流会导致错误,这将不起作用,而使用上述解决方案,如果适当的解决方案,我可以对到目前为止的所有已处理项目进行计数。然而,这并不是一个硬性要求。collectlist.size()try/catch


答案 1

似乎您已经通过终端操作IMO之前拥有最干净的解决方案。我认为需要这样做的唯一原因是出于调试目的 - 如果是这样的话,那么就比为此而设计的要好。为此包装 Stream 并提供单独的实现实在是太多了 - 除了大量的时间和后来对添加到 .peekpeekStreams

对于如果添加了另一个过滤器的部分,该怎么办?好吧,提供一个代码注释(我们很多人都这样做)和一些测试用例,否则这些测试用例会失败。


只是我的0.02$


答案 2

最好的想法是在自身上使用映射,同时计算映射例程的调用。

steam.map(object -> {counter.incrementAndGet(); return object;});

由于此 lambda 可以重用,并且您可以将任何 lambda 替换为对象,因此您可以创建如下所示的计数器对象:

class StreamCounter<T> implements Function<? super T,? extends T> {
  int counter = 0;
  public T apply(T object) { counter++; return object;}
  public int get() { return counter;}
}

所以使用:

StreamCounter<String> myCounter = new ...;
stream.map(myCounter)...
int count = myCounter.get();

由于映射调用只是另一个重用点,因此可以通过扩展 Stream 并包装普通流来提供映射方法。

通过这种方式,您可以创建类似以下内容的内容:

AtomicLong myValue = new AtomicLong();
...
convert(stream).measure(myValue).map(...).measure(mySecondValue).filter(...).measure(myThirdValue).toList(...);

通过这种方式,您可以简单地拥有自己的Stream包装器,该包装器将每个流透明地包装在自己的版本中(没有性能或内存开销),并测量任何此类测量点的基数。

这在创建 map/reduce 解决方案时分析算法的复杂性时通常会这样做。通过不采用原子长实例进行计数,而仅采用测量点的名称来扩展流实现,流实现可以容纳无限数量的测量点,同时提供打印报表的灵活方式。

这样的实现可以记住流方法的具体顺序以及每个测量点的位置,并带来如下输出:

list ->  (32k)map -> (32k)filter -> (5k)map -> avg(). 

这样的流实现编写一次,既可用于测试,也可用于报告。

内置到日常实现中,可以收集某些处理的统计信息,并允许使用不同的操作排列进行动态优化。例如,这将是一个查询优化器。

因此,在您的情况下,最好的方法是重用第一个,并根据使用频率,计数器数量和对DRY原则的亲和力,最终在以后实现更复杂的解决方案。StreamCounter

PS:使用int值并且不是线程安全的,因此在并行流设置中,可以用实例替换。StreamCounterintAtomicInteger


推荐