注册流“完成”挂钩

2022-09-01 11:59:48

使用Java 8 API,我想注册一个“完成钩子”,大致如下:Stream

Stream<String> stream = Stream.of("a", "b", "c");

// additional filters / mappings that I don't control
stream.onComplete((Completion c) -> {
    // This is what I'd like to do:
    closeResources();

    // This might also be useful:
    Optional<Throwable> exception = c.exception();
    exception.ifPresent(e -> throw new ExceptionWrapper(e));
});

我想这样做的原因是,我想将资源包装在API客户端中以供使用,并且我希望它在资源被消耗后自动清理。如果这是可能的,那么客户端可以调用:StreamStream

Collected collectedInOneGo =
Utility.something()
       .niceLookingSQLDSL()
       .moreDSLFeatures()
       .stream()
       .filter(a -> true)
       .map(c -> c)
       .collect(collector);

而不是目前需要的:

try (Stream<X> meh = Utility.something()
                            .niceLookingSQLDSL()
                            .moreDSLFeatures()
                            .stream()) {

    Collected collectedWithUglySyntacticDissonance =
    meh.filter(a -> true)
       .map(c -> c)
       .collect(collector);
}

理想情况下,我想进入 的各种方法,例如:java.util.stream.ReferencePipeline

@Override
final void forEachWithCancel(Spliterator<P_OUT> spliterator, Sink<P_OUT> sink) {
    try {

        // Existing loop
        do { } while (!sink.cancellationRequested() && spliterator.tryAdvance(sink));
    }

    // These would be nice:
    catch (Throwable t) {
        completion.onFailure(t);
    }
    finally {
        completion.onSuccess();
    }
}

有没有一种简单的方法可以使用现有的JDK 8 API来做到这一点?


答案 1

除基于解决方案(如@Holger所建议)之外,任何拦截终端操作的解决方案对于以下代码都是脆弱的:flatMap

Stream<String> stream = getAutoCloseableStream();
if(stream.iterator().hasNext()) {
    // do something if stream is non-empty
}

根据规范,这种用法是绝对合法的。不要忘记这一点,并且是终端流操作,但在它们执行后,您仍然需要访问流源。此外,放弃 或 在任何状态下都是完全有效的,所以你只是不知道它是否会被进一步使用。iterator()spliterator()IteratorSpliterator

您可以考虑建议用户不要使用 和 ,但是此代码呢?iterator()spliterator()

Stream<String> stream = getAutoCloseableStream();
Stream.concat(stream, Stream.of("xyz")).findFirst();

这在内部用于第一个流,然后放弃它(但如果显式调用生成的流,则关闭)。您还需要要求用户不要使用。据我所知,在您的图书馆内部,您正在使用/非常频繁,因此您需要重新访问所有这些地方以解决可能的问题。而且,当然还有很多其他库也使用/并且在那之后可能会短路:所有这些都会与您的功能不兼容。spliterator().tryAdvance()close()Stream.concatiterator()spliterator()iterator()spliterator()

为什么基于解决方案在这里工作?因为在第一次调用 时,它会将整个流内容转储到中间缓冲区中并关闭原始流源。因此,根据流的大小,您可能会浪费很多中间内存,甚至拥有.flatMaphasNext()tryAdvance()OutOfMemoryError

您也可以考虑将 s 保留在对象中并监视 .在这种情况下,完成将由垃圾回收器触发(这也有一些缺点)。PhantomReferenceStreamReferenceQueue

总之,我的建议是继续尝试资源。


答案 2

最简单的解决方案是将一个流包装在另一个流中,并将其平映射到自身:

// example stream
Stream<String> original=Stream.of("bla").onClose(()->System.out.println("close action"));

// this is the trick
Stream<String> autoClosed=Stream.of(original).flatMap(Function.identity());

//example op
int sum=autoClosed.mapToInt(String::length).sum();
System.out.println(sum);

它之所以有效,在于 flatMap 操作

每个映射流在其内容放入此流后都将关闭。

但是当前的实现并不像使用flatMap时那样懒惰。此问题已在 Java 10 中修复。


我的建议是在需要关闭返回的流时,继续使用标准解决方案和文档。毕竟,在终端操作后关闭资源的流是不安全的,因为不能保证客户端实际调用终端操作。改变主意并放弃流是一种有效的用法,而当文档指定需要该方法时,不调用该方法则不是。try(…)close()


推荐