复制流以避免“流已作或已关闭”

2022-08-31 07:22:04

我想复制一个Java 8流,这样我就可以处理它两次。我可以作为一个列表,并从中获取新的流;collect

// doSomething() returns a stream
List<A> thing = doSomething().collect(toList());
thing.stream()... // do stuff
thing.stream()... // do other stuff

但我认为应该有一种更有效/更优雅的方式。

有没有办法复制流而不将其转换为集合?

我实际上正在处理一个s流,所以想要在移动到右侧投影之前以一种方式处理左投影,并以另一种方式处理它。有点像这样(到目前为止,我被迫使用这个技巧)。EithertoList

List<Either<Pair<A, Throwable>, A>> results = doSomething().collect(toList());

Stream<Pair<A, Throwable>> failures = results.stream().flatMap(either -> either.left());
failures.forEach(failure -> ... );

Stream<A> successes = results.stream().flatMap(either -> either.right());
successes.forEach(success -> ... );

答案 1

我认为你对效率的假设有点倒退。如果您只打算使用一次数据,则可以获得巨大的效率回报,因为您不必存储它,而流为您提供了强大的“循环融合”优化,使您可以有效地通过管道传输整个数据。

如果要重用相同的数据,那么根据定义,您必须生成两次(确定性)或存储它。如果它已经碰巧在收藏中,那就太好了;然后迭代两次很便宜。

我们确实在设计中尝试了“分叉流”。我们发现,支持这一点有实际成本;它以牺牲不常见情况为代价,给常见情况(使用一次)带来了负担。最大的问题是处理“当两个管道不以相同的速率消耗数据时会发生什么”。现在,无论如何,您又回到了缓冲状态。这是一个显然没有分量的功能。

如果要对相同的数据进行重复操作,请存储它,或者以使用者身份构建操作,然后执行以下操作:

stream()...stuff....forEach(e -> { consumerA(e); consumerB(e); });

您还可以查看RxJava库,因为它的处理模型更适合这种“流分叉”。


答案 2

您可以将局部变量与供应商一起使用来设置流管道的公共部分。

http://winterbe.com/posts/2014/07/31/java8-stream-tutorial-examples/

重用流

Java 8 流不能重用。一旦您调用任何终端操作,流就会关闭:

Stream<String> stream = Stream.of("d2", "a2", "b1", "b3", "c")
    .filter(s -> s.startsWith("a"));
stream.anyMatch(s -> true);    // ok
stream.noneMatch(s -> true);   // exception

Calling `noneMatch` after `anyMatch` on the same stream results in the following exception:
java.lang.IllegalStateException: stream has already been operated upon or closed
at 
java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:229)
at 
java.util.stream.ReferencePipeline.noneMatch(ReferencePipeline.java:459)
at com.winterbe.java8.Streams5.test7(Streams5.java:38)
at com.winterbe.java8.Streams5.main(Streams5.java:28)

为了克服这个限制,我们必须为我们要执行的每个终端操作创建一个新的流链,例如,我们可以创建一个流供应商来构建一个新的流,其中所有中间操作都已设置:

Supplier<Stream<String>> streamSupplier =
    () -> Stream.of("d2", "a2", "b1", "b3", "c")
            .filter(s -> s.startsWith("a"));

streamSupplier.get().anyMatch(s -> true);   // ok
streamSupplier.get().noneMatch(s -> true);  // ok

每次调用都会构造一个新流,我们保存该流以调用所需的终端操作。get()


推荐