static ScheduledThreadPoolExecutor in CompletableFuture.Delayer

在java-9中,类中引入了新方法completeOnTimeoutCompletableFuture

public CompletableFuture<T> completeOnTimeout(T value, long timeout,
                                              TimeUnit unit) {
    if (unit == null)
        throw new NullPointerException();
    if (result == null)
        whenComplete(new Canceller(Delayer.delay(
                                       new DelayedCompleter<T>(this, value),
                                       timeout, unit)));
    return this;
}

我不明白的是,为什么它在其实现中使用静态ScheduledThreadPoolExecutor

    static ScheduledFuture<?> delay(Runnable command, long delay,
                                    TimeUnit unit) {
        return delayer.schedule(command, delay, unit);
    }

哪里

    static final ScheduledThreadPoolExecutor delayer;
    static {
        (delayer = new ScheduledThreadPoolExecutor(
            1, new DaemonThreadFactory())).
            setRemoveOnCancelPolicy(true);
    }

对我来说,这是一种非常奇怪的方法,因为它可能成为整个应用程序的瓶颈:唯一一个只有一个线程保留在池内以执行所有可能任务的方法?ScheduledThreadPoolExecutorCompletableFuture

我在这里错过了什么?

附言它看起来像这样:

  1. 此代码的作者不愿意提取此逻辑,而更愿意重用 ,ScheduledThreadPoolExecutor

  2. 这显然导致了一个具有静态变量的此类解决方案,因为为每个执行器创建一个新的执行器非常低效。CompletableFuture

但我的怀疑仍然存在,因为我发现一般的方法很奇怪。


答案 1

你是对的,这可能会成为一个瓶颈,但对于完成本身来说不是,它只是在.这个线程可以在一秒钟内完成数百万个期货。关键的方面是,完成可能会触发对完成线程中依赖阶段的评估。CompletableFuture

所以

Executor neverDone = r -> {};
long t0 = System.nanoTime();
CompletableFuture<String> c11 =
    CompletableFuture.supplyAsync(() -> "foo", neverDone)
        .completeOnTimeout("timeout", 2, TimeUnit.SECONDS)
        .thenApply(s -> {
            System.out.println("long dependent action 1 "+Thread.currentThread());
            LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5));
            return s;
        });
CompletableFuture<String> c12 =
    CompletableFuture.supplyAsync(() -> "bar", neverDone)
        .completeOnTimeout("timeout", 2, TimeUnit.SECONDS)
        .thenApply(s -> {
            System.out.println("long dependent action 2 "+Thread.currentThread());
            LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5));
            return s;
        });
System.out.println("set up");
CompletableFuture.allOf(
    c11.thenAccept(System.out::println),
    c12.thenAccept(System.out::println)
).join();
System.out.println(Math.round((System.nanoTime()-t0)*1e-9)+" s");

将打印

set up
long dependent action 1 Thread[CompletableFutureDelayScheduler,5,main]
timeout
long dependent action 2 Thread[CompletableFutureDelayScheduler,5,main]
timeout
12 s

使用链接方法将消除该问题…Async

Executor neverDone = r -> {};
long t0 = System.nanoTime();
CompletableFuture<String> c11 =
    CompletableFuture.supplyAsync(() -> "foo", neverDone)
        .completeOnTimeout("timeout", 2, TimeUnit.SECONDS)
        .thenApplyAsync(s -> {
            System.out.println("long dependent action 1 "+Thread.currentThread());
            LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5));
            return s;
        });
CompletableFuture<String> c12 =
    CompletableFuture.supplyAsync(() -> "bar", neverDone)
        .completeOnTimeout("timeout", 2, TimeUnit.SECONDS)
        .thenApplyAsync(s -> {
            System.out.println("long dependent action 2 "+Thread.currentThread());
            LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5));
            return s;
        });
System.out.println("set up");
CompletableFuture.allOf(
    c11.thenAccept(System.out::println),
    c12.thenAccept(System.out::println)
).join();
System.out.println(Math.round((System.nanoTime()-t0)*1e-9)+" s");

将打印

set up
long dependent action 2 Thread[ForkJoinPool.commonPool-worker-2,5,main]
long dependent action 1 Thread[ForkJoinPool.commonPool-worker-9,5,main]
timeout
timeout
7 s

结论是,当你有一个潜在的冗长的评估时,你应该总是通过其中一种方法进行链式。鉴于在使用没有“...Async“后缀(也可以是调用链接方法的线程或任何其他调用”完成方法“的线程,另请参阅此答案),这是您始终应该做的。…Async


答案 2

当然,这是作者要回答的问题。无论如何,这是我对此事的看法。

我不明白的是,为什么它在实现中使用静态ScheduledThreadPoolExecutor

...

对我来说,这是一种非常奇怪的方法,因为它可能成为整个应用程序的瓶颈:唯一一个只有一个线程保留在池内以执行所有可能任务的方法?ScheduledThreadPoolExecutorCompletableFuture

你是对的。可以运行任意代码。具体来说,和 将调用 和 ,默认情况下,它同步调用依赖项。ScheduledThreadPoolExecutororTimeout()completeOnTimeout()completeExceptionally()complete()

若要避免此行为,必须使用自己的子类或子类,使非方法始终调用方法。自 Java 9 以来,通过重写 , 这要容易得多。CompletionStageCompletableFuture*Async*AsyncnewIncompleteFuture()

它看起来像这样:

1)此代码的作者不愿意提取此逻辑,而倾向于重用 ,ScheduledThreadPoolExecutor

当出现在Java 7中时,它缺少一个公共线程池。Java 8 引入了静态 ,在引入的和类中默认使用(以及其他)。ForkJoinPoolcommonPool()CompletableFutureStream

他们似乎不愿意暴露一个共同的预定执行程序。这与公共线程池一样有用,可以避免许多很少使用的调度执行程序分散开来。

如果您需要具有静态间隔的延迟任务,那么考虑到包装对象的少量开销,则可能已经足够好了。CompletableFuture.delayedExecutor()

对于可变间隔,每次都有创建包装器的额外开销,但在此过程中已经创建了一些对象,例如内部 、和类的新实例。ExecutorCancellerTimeoutDelayedCompleterTaskSubmitter

我们多久需要以可变的时间间隔延迟许多任务?对于不同的超时,纯异步代码可能一直这样做,但是由于我们没有公开调度的执行器本身,因此我们要么假设此开销,要么使用另一个静态调度程序。

2)这显然导致了使用静态变量的这种解决方案,因为为每个执行器创建一个新的执行器非常低效。CompletableFuture

完全。