Java ExecutorService: await所有递归创建的任务的终结

我使用 来执行任务。此任务可以递归方式创建提交到同一任务的其他任务,这些子任务也可以执行此操作。ExecutorServiceExecutorService

我现在有一个问题,我想等到所有任务都完成(即,所有任务都已完成,他们没有提交新任务)之后再继续。

我无法在主线程中调用,因为这会阻止新任务被.ExecutorService.shutdown()ExecutorService

如果没有被召唤,召唤似乎无济于事。ExecutorService.awaitTermination()shutdown

所以我有点被困在这里。看到所有工人都闲置并不难,不是吗?我能想到的唯一不优雅的解决方案是直接使用 a 并每隔一段时间查询一次。真的没有更好的方法来做到这一点吗?ExecutorServiceThreadPoolExecutorgetPoolSize()


答案 1

这确实是相位器的理想候选者。Java 7即将推出这个新类。它是一个灵活的CountdonwLatch/CyclicBarrier。您可以在 JSR 166 兴趣站点上获得稳定版本。

它是一个更灵活的CountdownLatch/CyclicBarrier的方式是因为它不仅能够支持未知数量的参与方(线程),而且还可以重用(这就是阶段部分进来的地方)

对于您提交的每个任务,您将注册,当该任务完成后,您将到达。这可以递归方式完成。

Phaser phaser = new Phaser();
ExecutorService e = //

Runnable recursiveRunnable = new Runnable(){
   public void run(){
      //do work recursively if you have to

      if(shouldBeRecursive){
           phaser.register();
           e.submit(recursiveRunnable);
      }

      phaser.arrive();
   }
}

public void doWork(){
   int phase = phaser.getPhase();

   phaser.register();
   e.submit(recursiveRunnable);

   phaser.awaitAdvance(phase);
}

编辑:感谢@depthofreality指出了我前面的示例中的竞争条件。我正在更新它,以便执行线程仅等待当前阶段的推进,因为它阻止了递归函数的完成。

相位数在 s == s 的数目之前不会跳变。由于在每个递归调用之前,当所有调用完成时,将发生阶段增量。arriveregisterregister


答案 2

如果递归任务树中的任务数量最初是未知的,也许最简单的方法是实现自己的同步基元,某种“逆信号量”,并在任务之间共享它。在提交每个任务之前,您递增一个值,当任务完成时,它会递减该值,然后等待该值为 0。

将其实现为从任务中显式调用的单独基元,可将此逻辑与线程池实现分离,并允许您将递归任务的多个独立树提交到同一池中。

像这样:

public class InverseSemaphore {
    private int value = 0;
    private Object lock = new Object();

    public void beforeSubmit() {
        synchronized(lock) {
            value++;
        }
    }

    public void taskCompleted() {
        synchronized(lock) {
            value--;
            if (value == 0) lock.notifyAll();
        }
    }

    public void awaitCompletion() throws InterruptedException {
        synchronized(lock) {
            while (value > 0) lock.wait();
        }
    }
}

请注意,应在块内调用,以使其不受可能的异常影响。taskCompleted()finally

另请注意,在提交任务之前,应由提交线程调用,而不是由任务本身调用,以避免在旧任务完成而新任务尚未启动时可能的“错误完成”。beforeSubmit()

编辑:修复了使用模式的重要问题。


推荐