将 Java Future 转变为一个可实现的未来

2022-08-31 08:31:03

Java 8 引入了 一个新的 Future 实现,它是可组合的(包括一堆 thenXxx 方法)。我想专门使用它,但是我想使用的许多库只返回不可组合的实例。CompletableFutureFuture

有没有办法将返回的实例包装在 里面,以便我可以编写它?FutureCompleteableFuture


答案 1

如果要使用的库除了 Future 样式之外还提供了回调样式方法,则可以为其提供一个处理程序,该处理程序无需任何额外的线程阻塞即可完成 CompletableFuture。这样:

    AsynchronousFileChannel open = AsynchronousFileChannel.open(Paths.get("/some/file"));
    // ... 
    CompletableFuture<ByteBuffer> completableFuture = new CompletableFuture<ByteBuffer>();
    open.read(buffer, position, null, new CompletionHandler<Integer, Void>() {
        @Override
        public void completed(Integer result, Void attachment) {
            completableFuture.complete(buffer);
        }

        @Override
        public void failed(Throwable exc, Void attachment) {
            completableFuture.completeExceptionally(exc);
        }
    });
    completableFuture.thenApply(...)

在没有回调的情况下,我看到解决这个问题的唯一另一种方法是使用一个轮询循环,将所有检查放在一个线程上,然后在 Future 可获取时调用 complete。Future.isDone()


答案 2

有一种方法,但你不会喜欢它。以下方法将 a 转换为 :Future<T>CompletableFuture<T>

public static <T> CompletableFuture<T> makeCompletableFuture(Future<T> future) {
  if (future.isDone())
    return transformDoneFuture(future);
  return CompletableFuture.supplyAsync(() -> {
    try {
      if (!future.isDone())
        awaitFutureIsDoneInForkJoinPool(future);
      return future.get();
    } catch (ExecutionException e) {
      throw new RuntimeException(e);
    } catch (InterruptedException e) {
      // Normally, this should never happen inside ForkJoinPool
      Thread.currentThread().interrupt();
      // Add the following statement if the future doesn't have side effects
      // future.cancel(true);
      throw new RuntimeException(e);
    }
  });
}

private static <T> CompletableFuture<T> transformDoneFuture(Future<T> future) {
  CompletableFuture<T> cf = new CompletableFuture<>();
  T result;
  try {
    result = future.get();
  } catch (Throwable ex) {
    cf.completeExceptionally(ex);
    return cf;
  }
  cf.complete(result);
  return cf;
}

private static void awaitFutureIsDoneInForkJoinPool(Future<?> future)
    throws InterruptedException {
  ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker() {
    @Override public boolean block() throws InterruptedException {
      try {
        future.get();
      } catch (ExecutionException e) {
        throw new RuntimeException(e);
      }
      return true;
    }
    @Override public boolean isReleasable() {
      return future.isDone();
    }
  });
}

显然,这种方法的问题在于,对于每个Future,一个线程将被阻塞以等待Future的结果 - 这与期货的概念相矛盾。在某些情况下,可以做得更好。然而,总的来说,如果不积极等待未来的结果,就没有解决方案。