Java 8 CompletableFuture 中具有默认值的超时

2022-09-01 08:50:12

假设我有一些异步计算,例如:

CompletableFuture
        .supplyAsync(() -> createFoo())
        .thenAccept(foo -> doStuffWithFoo(foo));

有没有一种很好的方法可以在异步供应商根据某些指定的超时超时时时为 foo 提供默认值?理想情况下,此类功能也会尝试取消运行缓慢的供应商。例如,是否存在类似于以下假设代码的标准库功能:

CompletableFuture
        .supplyAsync(() -> createFoo())
        .acceptEither(
                CompletableFuture.completedAfter(50, TimeUnit.MILLISECONDS, DEFAULT_FOO),
                foo -> doStuffWithFoo(foo));

或者甚至更好:

CompletableFuture
        .supplyAsync(() -> createFoo())
        .withDefault(DEFAULT_FOO, 50, TimeUnit.MILLISECONDS)
        .thenAccept(foo -> doStuffWithFoo(foo));

我知道 ,但我想知道是否有更好的标准方法来以异步和反应式方式应用超时,如上面的代码所示。get(timeout, unit)

编辑:这是一个受Java 8启发的解决方案:lambda表达式中的强制检查异常处理。为什么是强制性的,而不是可选的?,但不幸的是,它阻止了一个线程。如果我们依靠 createFoo() 异步检查超时并抛出自己的超时异常,它将在不阻塞线程的情况下工作,但会给供应商的创建者带来更多负担,并且仍然会产生创建异常的成本(如果没有“快速抛出”,这可能会很昂贵)

static <T> Supplier<T> wrapped(Callable<T> callable) {
    return () -> {
        try {
            return callable.call();
        } catch (RuntimeException e1) {
            throw e1;
        } catch (Throwable e2) {
            throw new RuntimeException(e2);
        }
    };
}
CompletableFuture
        .supplyAsync(wrapped(() -> CompletableFuture.supplyAsync(() -> createFoo()).get(50, TimeUnit.MILLISECONDS)))
        .exceptionally(e -> "default")
        .thenAcceptAsync(s -> doStuffWithFoo(foo));

答案 1

CompletableFuture.supplyAsync 只是一个帮助器方法,它为您创建一个 CompletableFuture,并将任务提交到 ForkJoin Pool。

您可以创建自己的供应符合您的要求,如下所示:

private static final ScheduledExecutorService schedulerExecutor = 
                                 Executors.newScheduledThreadPool(10);
private static final ExecutorService executorService = 
                                 Executors.newCachedThreadPool();


public static <T> CompletableFuture<T> supplyAsync(
        final Supplier<T> supplier, long timeoutValue, TimeUnit timeUnit,
        T defaultValue) {

    final CompletableFuture<T> cf = new CompletableFuture<T>();

    // as pointed out by Peti, the ForkJoinPool.commonPool() delivers a 
    // ForkJoinTask implementation of Future, that doesn't interrupt when cancelling
    // Using Executors.newCachedThreadPool instead in the example
    // submit task
    Future<?> future = executorService.submit(() -> {
        try {
            cf.complete(supplier.get());
        } catch (Throwable ex) {
            cf.completeExceptionally(ex);
        }
    });

    //schedule watcher
    schedulerExecutor.schedule(() -> {
        if (!cf.isDone()) {
            cf.complete(defaultValue);
            future.cancel(true);
        }

    }, timeoutValue, timeUnit);

    return cf;
}

使用该帮助程序创建ComppletableFuture就像在ComppletableFuture中使用静态方法一样简单:

    CompletableFuture<String> a = supplyAsync(() -> "hi", 1,
            TimeUnit.SECONDS, "default");

要测试它:

    a = supplyAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e1) {
            // ignore
        }
        return "hi";
    }, 1, TimeUnit.SECONDS, "default");

答案 2

在Java 9中,将有completeOnTimeout(T值,长超时,TimeUnit单位),它可以做你想要的,尽管它不会取消慢速供应商。

还有一个orTimeout(长超时,TimeUnit单位),在超时的情况下例外地完成。


推荐