CompletableFuture 是 Async。但它是非阻塞的吗?
关于ComppletableFuture的一个正确之处在于它是真正的异步,它允许您从调用方线程异步运行您的任务,并且API允许您在结果可用时处理它。另一方面,并不总是非阻塞的。例如,当您运行以下代码时,它将在默认值上异步执行:thenXXX
CompletableFuture
ForkJoinPool
CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
}
catch (InterruptedException e) {
}
return 1;
});
很明显,执行任务的 in 最终将被阻止,这意味着我们无法保证调用是非阻塞的。Thread
ForkJoinPool
另一方面,公开API允许您使其真正无阻塞。CompletableFuture
例如,您始终可以执行以下操作:
public CompletableFuture myNonBlockingHttpCall(Object someData) {
var uncompletedFuture = new CompletableFuture(); // creates uncompleted future
myAsyncHttpClient.execute(someData, (result, exception -> {
if(exception != null) {
uncompletedFuture.completeExceptionally(exception);
return;
}
uncompletedFuture.complete(result);
})
return uncompletedFuture;
}
如您所见,未来的API为您提供了和方法,这些方法可以在需要时完成执行而不会阻塞任何线程。CompletableFuture
complete
completeExceptionally
单声道与可复杂未来
在上一节中,我们概述了CF行为,但是CompletableFuture和Mono之间的核心区别是什么?
值得一提的是,我们也可以阻止Mono。没有人阻止我们写下以下内容:
Mono.fromCallable(() -> {
try {
Thread.sleep(1000);
}
catch (InterruptedException e) {
}
return 1;
})
当然,一旦我们订阅了未来,调用线程就会被阻塞。但是,我们始终可以通过提供额外的运算符来解决此问题。然而,更广泛的API并不是关键特性。subscribeOn
Mono
为了理解 和 之间的主要区别,让我们回到前面提到的方法实现。CompletableFuture
Mono
myNonBlockingHttpCall
public CompletableFuture myUpperLevelBusinessLogic() {
var future = myNonBlockingHttpCall();
// ... some code
if (something) {
// oh we don't really need anything, let's just throw an exception
var errorFuture = new CompletableFuture();
errorFuture.completeExceptionally(new RuntimeException());
return errorFuture;
}
return future;
}
在 的情况下,一旦调用了该方法,它将热切地执行对另一个服务/资源的 HTTP 调用。尽管在验证了一些前/后条件后,我们并不真正需要执行结果,但它会启动执行,并且将为这项工作分配额外的CPU/DB-Connections/What-Ever-Machine-Resources。CompletableFuture
相反,根据定义,该类型是懒惰的:Mono
public Mono myNonBlockingHttpCallWithMono(Object someData) {
return Mono.create(sink -> {
myAsyncHttpClient.execute(someData, (result, exception -> {
if(exception != null) {
sink.error(exception);
return;
}
sink.success(result);
})
});
}
public Mono myUpperLevelBusinessLogic() {
var mono = myNonBlockingHttpCallWithMono();
// ... some code
if (something) {
// oh we don't really need anything, let's just throw an exception
return Mono.error(new RuntimeException());
}
return mono;
}
在这种情况下,在订阅最终版本之前不会发生任何事情。因此,只有当该方法返回时,才会被订阅,所提供的逻辑才会被执行。mono
Mono
myNonBlockingHttpCallWithMono
Mono.create(Consumer)
我们可以走得更远。我们可以使我们的执行更加懒惰。如您所知,从反应式流规范扩展而来。反应流尖叫的功能是背压支持。因此,使用API,我们只有在真正需要数据时才能执行,并且我们的订阅者已准备好使用它们:Mono
Publisher
Mono
Mono.create(sink -> {
AtomicBoolean once = new AtomicBoolean();
sink.onRequest(__ -> {
if(!once.get() && once.compareAndSet(false, true) {
myAsyncHttpClient.execute(someData, (result, exception -> {
if(exception != null) {
sink.error(exception);
return;
}
sink.success(result);
});
}
});
});
在此示例中,我们仅在订阅者调用时执行数据,方法是它声明其已准备好接收数据。Subscription#request
总结
-
CompletableFuture
是异步的,并且可以是非阻塞的
-
CompletableFuture
很热心。您不能推迟执行。但是您可以取消它们(这总比没有好)
-
Mono
是异步/非阻塞的,可以通过用不同的运算符组合main来轻松地执行任何不同的调用。Thread
Mono
-
Mono
是真正的懒惰,并允许订阅者存在及其使用数据的准备情况来推迟执行启动。