线程池执行器阻塞队列已满?

我正在尝试使用ThreadPoolExecutor执行许多任务。下面是一个假设的例子:

def workQueue = new ArrayBlockingQueue<Runnable>(3, false)
def threadPoolExecutor = new ThreadPoolExecutor(3, 3, 1L, TimeUnit.HOURS, workQueue)
for(int i = 0; i < 100000; i++)
    threadPoolExecutor.execute(runnable)

问题是我很快就得到了,因为任务的数量超过了工作队列的大小。但是,我正在寻找的所需行为是拥有主线程块,直到队列中有空间。实现此目的的最佳方法是什么?java.util.concurrent.RejectedExecutionException


答案 1

在某些非常狭窄的情况下,您可以实现一个java.util.concurrent.RejectedExecutionHandler,它可以满足您的需求。

RejectedExecutionHandler block = new RejectedExecutionHandler() {
  rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
     executor.getQueue().put( r );
  }
};

ThreadPoolExecutor pool = new ...
pool.setRejectedExecutionHandler(block);

现在。这是一个非常糟糕的主意,原因如下

  • 它很容易出现死锁,因为在您放入队列中的内容可见之前,池中的所有线程都可能死亡。通过设置合理的保持活动时间来缓解此问题。
  • 任务未按照执行程序预期的方式进行包装。许多执行器实现在执行之前将其任务包装在某种跟踪对象中。看看你的来源。
  • API 强烈反对通过 getQueue() 添加,并且可能在某些时候被禁止。

一个几乎总是更好的策略是安装ThreadPoolExecutor.CallerRunsPolicy,它将通过在调用 execute() 的线程上运行任务来限制你的应用程序。

但是,有时具有所有固有风险的阻止策略确实是您想要的。我会说在这些条件下

  • 你只有一个线程调用 execute()
  • 您必须(或希望)具有非常小的队列长度
  • 您绝对需要限制运行此工作的线程数(通常是出于外部原因),而调用方运行策略将打破这一点。
  • 您的任务大小不可预测,因此,如果池暂时忙于 4 个短任务,并且您的一个线程调用执行卡在一个大任务中,则调用方运行可能会带来饥饿。

所以,正如我所说。它很少需要,而且可能很危险,但你去了。

祝你好运。


答案 2

您需要做的是将 ThreadPoolExecutor 包装到 Executor 中,这明确限制了其中并发执行操作的数量:

 private static class BlockingExecutor implements Executor {

    final Semaphore semaphore;
    final Executor delegate;

    private BlockingExecutor(final int concurrentTasksLimit, final Executor delegate) {
        semaphore = new Semaphore(concurrentTasksLimit);
        this.delegate = delegate;
    }

    @Override
    public void execute(final Runnable command) {
        try {
            semaphore.acquire();
        } catch (InterruptedException e) {
            e.printStackTrace();
            return;
        }

        final Runnable wrapped = () -> {
            try {
                command.run();
            } finally {
                semaphore.release();
            }
        };

        delegate.execute(wrapped);

    }
}

您可以将并发任务限制调整为委托执行器的线程PoolSize + queueSize,它几乎可以解决您的问题


推荐