单声道与可复杂未来CompletableFuture 是 Async。但它是非阻塞的吗?单声道与可复杂未来总结

CompletableFuture在单独的线程上执行任务(使用线程池),并提供回调函数。假设我在 .这是 API 调用阻塞吗?线程是否会被阻塞,直到它没有得到 API 的响应?(我知道主线程/tomcat 线程将是非阻塞的,但是CompletableFuture任务正在执行的线程呢?CompletableFuture

据我所知,Mono是完全非阻塞的。

请对此有所了解,如果我错了,请纠正我。


答案 1

CompletableFuture 是 Async。但它是非阻塞的吗?

关于ComppletableFuture的一个正确之处在于它是真正的异步,它允许您从调用方线程异步运行您的任务,并且API允许您在结果可用时处理它。另一方面,并不总是非阻塞的。例如,当您运行以下代码时,它将在默认值上异步执行:thenXXXCompletableFutureForkJoinPool

CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(1000);
    }
    catch (InterruptedException e) {

    }

    return 1;
});

很明显,执行任务的 in 最终将被阻止,这意味着我们无法保证调用是非阻塞的。ThreadForkJoinPool

另一方面,公开API允许您使其真正无阻塞。CompletableFuture

例如,您始终可以执行以下操作:

public CompletableFuture myNonBlockingHttpCall(Object someData) {
    var uncompletedFuture = new CompletableFuture(); // creates uncompleted future

    myAsyncHttpClient.execute(someData, (result, exception -> {
        if(exception != null) {
            uncompletedFuture.completeExceptionally(exception);
            return;
        }
        uncompletedFuture.complete(result);
    })

    return uncompletedFuture;
}

如您所见,未来的API为您提供了和方法,这些方法可以在需要时完成执行而不会阻塞任何线程。CompletableFuturecompletecompleteExceptionally

单声道与可复杂未来

在上一节中,我们概述了CF行为,但是CompletableFuture和Mono之间的核心区别是什么?

值得一提的是,我们也可以阻止Mono。没有人阻止我们写下以下内容:

Mono.fromCallable(() -> {
    try {
        Thread.sleep(1000);
    }
    catch (InterruptedException e) {

    }

    return 1;
})

当然,一旦我们订阅了未来,调用线程就会被阻塞。但是,我们始终可以通过提供额外的运算符来解决此问题。然而,更广泛的API并不是关键特性。subscribeOnMono

为了理解 和 之间的主要区别,让我们回到前面提到的方法实现。CompletableFutureMonomyNonBlockingHttpCall

public CompletableFuture myUpperLevelBusinessLogic() {
    var future = myNonBlockingHttpCall();

    // ... some code

    if (something) {
       // oh we don't really need anything, let's just throw an exception
       var errorFuture = new CompletableFuture();
       errorFuture.completeExceptionally(new RuntimeException());

       return errorFuture;
    }

   return future;
}

在 的情况下,一旦调用了该方法,它将热切地执行对另一个服务/资源的 HTTP 调用。尽管在验证了一些前/后条件后,我们并不真正需要执行结果,但它会启动执行,并且将为这项工作分配额外的CPU/DB-Connections/What-Ever-Machine-Resources。CompletableFuture

相反,根据定义,该类型是懒惰的:Mono

public Mono myNonBlockingHttpCallWithMono(Object someData) {
    return Mono.create(sink -> {
            myAsyncHttpClient.execute(someData, (result, exception -> {
                if(exception != null) {
                    sink.error(exception);
                    return;
                }
                sink.success(result);
            })
    });
} 

public Mono myUpperLevelBusinessLogic() {
    var mono = myNonBlockingHttpCallWithMono();

    // ... some code

    if (something) {
       // oh we don't really need anything, let's just throw an exception

       return Mono.error(new RuntimeException());
    }

   return mono;
}

在这种情况下,在订阅最终版本之前不会发生任何事情。因此,只有当该方法返回时,才会被订阅,所提供的逻辑才会被执行。monoMonomyNonBlockingHttpCallWithMonoMono.create(Consumer)

我们可以走得更远。我们可以使我们的执行更加懒惰。如您所知,从反应式流规范扩展而来。反应流尖叫的功能是背压支持。因此,使用API,我们只有在真正需要数据时才能执行,并且我们的订阅者已准备好使用它们:MonoPublisherMono

Mono.create(sink -> {
    AtomicBoolean once = new AtomicBoolean();
    sink.onRequest(__ -> {
        if(!once.get() && once.compareAndSet(false, true) {
            myAsyncHttpClient.execute(someData, (result, exception -> {
                if(exception != null) {
                    sink.error(exception);
                    return;
                }
                sink.success(result);
            });
        }
    });
});

在此示例中,我们仅在订阅者调用时执行数据,方法是它声明其已准备好接收数据。Subscription#request

总结

  • CompletableFuture是异步的,并且可以是非阻塞的
  • CompletableFuture很热心。您不能推迟执行。但是您可以取消它们(这总比没有好)
  • Mono是异步/非阻塞的,可以通过用不同的运算符组合main来轻松地执行任何不同的调用。ThreadMono
  • Mono是真正的懒惰,并允许订阅者存在及其使用数据的准备情况来推迟执行启动。

答案 2

在Oleh的答案的基础上,一个可能的懒惰解决方案将是CompletableFuture

public CompletableFuture myNonBlockingHttpCall(CompletableFuture<ExecutorService> dispatch, Object someData) {
    var uncompletedFuture = new CompletableFuture(); // creates uncompleted future

    dispatch.thenAccept(x -> x.submit(() -> {
        myAsyncHttpClient.execute(someData, (result, exception -> {
            if(exception != null) {
                uncompletedFuture.completeExceptionally(exception);
                return;
            }
            uncompletedFuture.complete(result);
        })
    }));

    return uncompletedFuture;
}

然后,以后你只需做

dispatch.complete(executor);

这将等同于,但我想没有背压。CompletableFutureMono


推荐