将 Java Future 转变为一个可实现的未来
Java 8 引入了 一个新的 Future 实现,它是可组合的(包括一堆 thenXxx 方法)。我想专门使用它,但是我想使用的许多库只返回不可组合的实例。CompletableFuture
Future
有没有办法将返回的实例包装在 里面,以便我可以编写它?Future
CompleteableFuture
Java 8 引入了 一个新的 Future 实现,它是可组合的(包括一堆 thenXxx 方法)。我想专门使用它,但是我想使用的许多库只返回不可组合的实例。CompletableFuture
Future
有没有办法将返回的实例包装在 里面,以便我可以编写它?Future
CompleteableFuture
如果要使用的库除了 Future 样式之外还提供了回调样式方法,则可以为其提供一个处理程序,该处理程序无需任何额外的线程阻塞即可完成 CompletableFuture。这样:
AsynchronousFileChannel open = AsynchronousFileChannel.open(Paths.get("/some/file"));
// ...
CompletableFuture<ByteBuffer> completableFuture = new CompletableFuture<ByteBuffer>();
open.read(buffer, position, null, new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer result, Void attachment) {
completableFuture.complete(buffer);
}
@Override
public void failed(Throwable exc, Void attachment) {
completableFuture.completeExceptionally(exc);
}
});
completableFuture.thenApply(...)
在没有回调的情况下,我看到解决这个问题的唯一另一种方法是使用一个轮询循环,将所有检查放在一个线程上,然后在 Future 可获取时调用 complete。Future.isDone()
有一种方法,但你不会喜欢它。以下方法将 a 转换为 :Future<T>
CompletableFuture<T>
public static <T> CompletableFuture<T> makeCompletableFuture(Future<T> future) {
if (future.isDone())
return transformDoneFuture(future);
return CompletableFuture.supplyAsync(() -> {
try {
if (!future.isDone())
awaitFutureIsDoneInForkJoinPool(future);
return future.get();
} catch (ExecutionException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
// Normally, this should never happen inside ForkJoinPool
Thread.currentThread().interrupt();
// Add the following statement if the future doesn't have side effects
// future.cancel(true);
throw new RuntimeException(e);
}
});
}
private static <T> CompletableFuture<T> transformDoneFuture(Future<T> future) {
CompletableFuture<T> cf = new CompletableFuture<>();
T result;
try {
result = future.get();
} catch (Throwable ex) {
cf.completeExceptionally(ex);
return cf;
}
cf.complete(result);
return cf;
}
private static void awaitFutureIsDoneInForkJoinPool(Future<?> future)
throws InterruptedException {
ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker() {
@Override public boolean block() throws InterruptedException {
try {
future.get();
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
return true;
}
@Override public boolean isReleasable() {
return future.isDone();
}
});
}
显然,这种方法的问题在于,对于每个Future,一个线程将被阻塞以等待Future的结果 - 这与期货的概念相矛盾。在某些情况下,可以做得更好。然而,总的来说,如果不积极等待未来的结果,就没有解决方案。