我可以使用 ForkJoinPool 的工作窃取行为来避免线程匮乏死锁吗?

如果池中的所有线程都在等待同一池中的排队任务完成,则在普通线程池中会发生线程匮乏死锁。 通过从调用内部窃取其他线程的工作(而不是简单地等待)来避免此问题。例如:ForkJoinPooljoin()

private static class ForkableTask extends RecursiveTask<Integer> {
    private final CyclicBarrier barrier;

    ForkableTask(CyclicBarrier barrier) {
        this.barrier = barrier;
    }

    @Override
    protected Integer compute() {
        try {
            barrier.await();
            return 1;
        } catch (InterruptedException | BrokenBarrierException e) {
            throw new RuntimeException(e);
        }
    }
}

@Test
public void testForkJoinPool() throws Exception {
    final int parallelism = 4;
    final ForkJoinPool pool = new ForkJoinPool(parallelism);
    final CyclicBarrier barrier = new CyclicBarrier(parallelism);

    final List<ForkableTask> forkableTasks = new ArrayList<>(parallelism);
    for (int i = 0; i < parallelism; ++i) {
        forkableTasks.add(new ForkableTask(barrier));
    }

    int result = pool.invoke(new RecursiveTask<Integer>() {
        @Override
        protected Integer compute() {
            for (ForkableTask task : forkableTasks) {
                task.fork();
            }

            int result = 0;
            for (ForkableTask task : forkableTasks) {
                result += task.join();
            }
            return result;
        }
    });
    assertThat(result, equalTo(parallelism));
}

但是当使用接口进行操作时,工作窃取似乎不会发生。例如:ExecutorServiceForkJoinPool

private static class CallableTask implements Callable<Integer> {
    private final CyclicBarrier barrier;

    CallableTask(CyclicBarrier barrier) {
        this.barrier = barrier;
    }

    @Override
    public Integer call() throws Exception {
        barrier.await();
        return 1;
    }
}

@Test
public void testWorkStealing() throws Exception {
    final int parallelism = 4;
    final ExecutorService pool = new ForkJoinPool(parallelism);
    final CyclicBarrier barrier = new CyclicBarrier(parallelism);

    final List<CallableTask> callableTasks = Collections.nCopies(parallelism, new CallableTask(barrier));
    int result = pool.submit(new Callable<Integer>() {
        @Override
        public Integer call() throws Exception {
            int result = 0;
            // Deadlock in invokeAll(), rather than stealing work
            for (Future<Integer> future : pool.invokeAll(callableTasks)) {
                result += future.get();
            }
            return result;
        }
    }).get();
    assertThat(result, equalTo(parallelism));
}

粗略地看一下 的实现,所有常规 API 都是使用 s 实现的,所以我不确定为什么会发生死锁。ForkJoinPoolExecutorServiceForkJoinTask


答案 1

你几乎在回答你自己的问题。解决方案是“通过从调用内部窃取其他线程的工作来避免此问题”的声明。每当线程由于其他原因被阻止时,除了,此工作窃取不会发生,线程只是等待并且不执行任何操作。ForkJoinPooljoin()ForkJoinPool.join()

这样做的原因是,在Java中,不可能阻止其线程阻塞,而是为它们提供其他工作。线程本身需要避免阻塞,而是要求池进行它应该执行的工作。这仅在方法中实现,而不是在任何其他阻止方法中实现。如果你使用一个里面的,你也会看到饥饿死锁。ForkJoinPoolForkJoinTask.join()FutureForkJoinPool

为什么工作窃取只在Java API中实现,而不是在任何其他阻塞方法中实现?好吧,有很多这样的阻塞方法(,,I / O方法等中的任何并发基元),它们与无关,这只是API中的任意类,因此在所有这些方法中添加特殊情况将是糟糕的设计。它还可能导致非常令人惊讶和不希望的影响。例如,假设一个用户将一个任务传递给等待 a 的 ,然后发现该任务挂起了很长时间,只是因为正在运行的线程窃取了其他(长时间运行的)工作项,而不是等待 并在结果可用后立即继续。一旦一个线程开始处理另一个任务,它就无法返回到原始任务,直到第二个任务完成。因此,其他阻止方法不做偷窃工作实际上是一件好事。对于 a ,这个问题并不存在,因为尽快继续执行主要任务并不重要,重要的是尽可能高效地处理所有任务。ForkJoinTask.join()Object.wait()Future.get()java.util.concurrentForkJoinPoolExecutorServiceFutureFuture.get()FutureForkJoinTask

也不可能实现自己的方法来在 a 中执行工作窃取,因为所有相关部分都不是公开的。ForkJoinPool

但是,实际上还有第二种方法可以防止饥饿死锁。这称为托管阻止。它不使用工作窃取(以避免上面提到的问题),但还需要将要阻塞的线程与线程池积极配合。使用托管阻塞时,线程在调用潜在阻塞方法之前告诉线程池它可能被阻塞,并在阻塞方法完成时通知池。然后,线程池知道存在饥饿死锁的风险,并且如果其所有线程当前都处于某个阻塞操作中,并且仍有其他任务要执行,则可能会生成其他线程。请注意,这比工作窃取效率低,因为附加线程的开销很大。如果您使用普通期货和托管阻塞而不是工作窃取来实现递归并行算法,则附加线程的数量可能会变得非常大(因为在算法的“除法”阶段,将创建许多任务并将其提供给立即阻止并等待子任务结果的线程)。但是,饥饿死锁仍然可以防止,并且它避免了任务必须等待很长时间的问题,因为其线程在此期间开始处理另一个任务。ForkJoinTask

Java 还支持托管阻止。要使用它,需要实现接口 ForkJoinPool.ManagedBlocker,以便从此接口的方法中调用任务要执行的潜在阻塞方法。然后,任务可能不会直接调用阻塞方法,而是需要调用静态方法 ForkJoinPool.managedBlock(ManagedBlocker)。此方法处理阻塞之前和之后与线程池的通信。如果当前任务不在 中执行,则它也有效,然后它只调用阻塞方法。ForkJoinPoolblockForkJoinPool

我在 Java API(用于 Java 7)中发现的唯一实际使用托管阻止的地方是类 Phaser。(此类是像互斥锁和锁存器一样的同步屏障,但更加灵活和强大。因此,与内部任务同步应使用托管阻止,并且可以避免饥饿死锁(但仍然更可取,因为它使用工作窃取而不是托管阻止)。无论您直接使用还是通过其接口使用,这都有效。但是,如果您使用任何其他类(如类创建的内容),则它不起作用,因为这些类不支持托管阻止。PhaserForkJoinPoolForkJoinTask.join()ForkJoinPoolExecutorServiceExecutorServiceExecutors

在Scala中,托管阻塞的使用更为广泛(描述API)。


答案 2

我知道你在做什么,但我不知道为什么。屏障的概念是,独立的线程可以等待彼此到达一个共同点。您没有独立的线程。线程池 F/J 用于数据并行性

您正在执行更符合任务并行性的工作

F/J 继续的原因是,当所有工作线程都在等待时,框架会创建“延续线程”以继续从 deques 获取工作。


推荐