如何取消Java 8可完成的未来?

我正在玩Java 8可兼容的未来。我有以下代码:

CountDownLatch waitLatch = new CountDownLatch(1);

CompletableFuture<?> future = CompletableFuture.runAsync(() -> {
    try {
        System.out.println("Wait");
        waitLatch.await(); //cancel should interrupt
        System.out.println("Done");
    } catch (InterruptedException e) {
        System.out.println("Interrupted");
        throw new RuntimeException(e);
    }
});

sleep(10); //give it some time to start (ugly, but works)
future.cancel(true);
System.out.println("Cancel called");

assertTrue(future.isCancelled());

assertTrue(future.isDone());
sleep(100); //give it some time to finish

使用 runAsync,我计划执行在闩锁上等待的代码。接下来,我取消了未来,期望在内部抛出中断的异常。但是,线程似乎在 await 调用时仍然被阻塞,并且即使取消了未来(断言通过),也永远不会引发 InterruptedException。使用 ExecutorService 的等效代码按预期工作。它是CompletableFuture中的错误还是我的例子中的错误?


答案 1

当您调用 时,您只会停止链的下游部分。上游部分,即最终会调用或的东西,不会得到任何信号,表明不再需要结果。CompletableFuture#cancelcomplete(...)completeExceptionally(...)

那些“上游”和“下游”的东西是什么?

让我们考虑以下代码:

CompletableFuture
        .supplyAsync(() -> "hello")               //1
        .thenApply(s -> s + " world!")            //2
        .thenAccept(s -> System.out.println(s));  //3

在这里,数据从上到动 - 从供应商创建,到按功能修改,再到被使用。上述特定步骤的部分称为上游,下面的部分称为下游。例如,步骤 1 和 2 是步骤 3 的上游。println

以下是幕后发生的事情。这并不精确,而是一个方便的思维模型。

  1. 正在执行供应商(步骤 1)(在 JVM 的通用内)。ForkJoinPool
  2. 然后,供应商的结果被传递到下一个下游。complete(...)CompletableFuture
  3. 收到结果后,这将调用下一步 - 一个函数(步骤2),该函数采用上一步的结果并返回将进一步传递给下游的东西。CompletableFutureCompletableFuturecomplete(...)
  4. 收到步骤 2 结果后,步骤 3 调用使用者 。消费者完成后,下游将获得它的价值,CompletableFutureSystem.out.println(s)CompletableFuture(Void) null

正如我们所看到的,这个链中的每个人都必须知道下游谁在那里等待将值传递给他们的(或)。但是,不必知道任何关于它的上游(或上游 - 可能有几个)。CompletableFuturecomplete(...)completeExceptionally(...)CompletableFuture

因此,在步骤 3 调用 cancel() 不会中止步骤 1 和 2,因为从步骤 3 到步骤 2 没有链接。

应该说,如果你正在使用,那么你的步骤足够小,所以如果执行几个额外的步骤,就不会有坏处。CompletableFuture

如果要将取消传播到上游,您有两种选择:

  • 自己实现它 - 创建一个专用的(命名方式),每一步后都会检查(类似于CompletableFuturecancelledstep.applyToEither(cancelled, Function.identity()))
  • 使用响应式堆栈,如 RxJava 2、ProjectReactor/Flux 或 Akka Streams

答案 2

显然,这是故意的。方法CompletableFuture::cancel的Javadoc状态:

[Parameters:] mayInterruptIfRunning - 此值在此实现中没有效果,因为中断不用于控制处理。

有趣的是,ForkJoinTask::cancel 方法对参数 mayInterruptIfRunning 使用了几乎相同的措辞。

我对这个问题有一个猜测:

  • 中断旨在与阻塞操作一起使用,如睡眠等待或 I/O 操作,
  • 但是CompletableFutureForkJoinTask都不打算用于阻塞操作。

CompletableFuture 应该创建一个新的 FinishStage,而不是阻塞,并且 CPU 密集型任务是分叉连接模型的先决条件。因此,对它们中的任何一个使用中断都会破坏它们的目的。另一方面,它可能会增加复杂性,如果按预期使用,则不需要。