如何让 ThreadPoolExecutor 在排队前将线程增加到最大值?

一段时间以来,我对默认行为感到沮丧,它支持我们许多人使用的线程池。引用Javadocs的话:ThreadPoolExecutorExecutorService

如果运行的线程数超过 corePoolSize 但小于最大值池大小线程数,则仅当队列已满时,才会创建新线程。

这意味着,如果使用以下代码定义线程池,则它永远不会启动第二个线程,因为 第二个线程是无限的。LinkedBlockingQueue

ExecutorService threadPool =
   new ThreadPoolExecutor(1 /*core*/, 50 /*max*/, 60 /*timeout*/,
      TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(/* unlimited queue */));

只有当您有界队列并且队列已满时,才会启动高于核心编号的任何线程。我怀疑大量的初级Java多线程程序员都不知道.ThreadPoolExecutor

现在我有特定的用例,这不是最佳的。我正在寻找方法,无需编写自己的TPE类,即可解决它。

我的要求是针对向可能不可靠的第三方进行回调的 Web 服务。

  • 我不想与 Web 请求同步进行回调,因此我想使用线程池。
  • 我通常每分钟得到几个这样的线程,所以我不想有一个有大量的线程,这些线程大多处于休眠状态。newFixedThreadPool(...)
  • 每隔一段时间,我就会收到这种流量的突发,我想将线程数量扩展到某个最大值(假设50)。
  • 我需要尽最大努力完成所有回调,因此我想将50以上的任何其他回调排队。我不想通过使用.newCachedThreadPool()

在启动更多线程之前,如何解决队列需要限定和填满的此限制?如何让它在排队任务之前启动更多线程?ThreadPoolExecutor

编辑:

@Flavio很好地说明了使用 让核心线程超时并退出。我考虑过,但我仍然想要核心线程功能。如果可能的话,我不希望池中的线程数低于内核大小。ThreadPoolExecutor.allowCoreThreadTimeOut(true)


答案 1

在启动更多线程之前,如何解决队列需要绑定并填满的此限制。ThreadPoolExecutor

我相信我终于找到了一个有点优雅(也许有点黑客)的解决方案来解决这个限制。它涉及扩展以在已经有一些任务排队时返回它。如果当前线程无法跟上排队任务的步伐,TPE 将添加其他线程。如果池已达到最大线程数,则将调用 ,这将进入队列。ThreadPoolExecutorLinkedBlockingQueuefalsequeue.offer(...)RejectedExecutionHandlerput(...)

编写一个可以返回并且永远不会阻塞的队列当然很奇怪,所以这是黑客部分。但这与TPE对队列的使用效果很好,因此我认为这样做没有任何问题。offer(...)falseput()

代码如下:

// extend LinkedBlockingQueue to force offer() to return false conditionally
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
    private static final long serialVersionUID = -6903933921423432194L;
    @Override
    public boolean offer(Runnable e) {
        // Offer it to the queue if there is 0 items already queued, else
        // return false so the TPE will add another thread. If we return false
        // and max threads have been reached then the RejectedExecutionHandler
        // will be called which will do the put into the queue.
        if (size() == 0) {
            return super.offer(e);
        } else {
            return false;
        }
    }
};
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1 /*core*/, 50 /*max*/,
        60 /*secs*/, TimeUnit.SECONDS, queue);
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try {
            // This does the actual put into the queue. Once the max threads
            //  have been reached, the tasks will then queue up.
            executor.getQueue().put(r);
            // we do this after the put() to stop race conditions
            if (executor.isShutdown()) {
                throw new RejectedExecutionException(
                    "Task " + r + " rejected from " + e);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
    }
});

使用此机制,当我将任务提交到队列时,将:ThreadPoolExecutor

  1. 最初将线程数扩展到内核大小(此处为 1)。
  2. 将其提供给队列。如果队列为空,它将排队等待现有线程处理。
  3. 如果队列中已有 1 个或多个元素,则 将返回 false。offer(...)
  4. 如果返回 false,请纵向扩展池中的线程数,直到它们达到最大数目(此处为 50)。
  5. 如果在最大值,则调用RejectedExecutionHandler
  6. 然后将任务放入队列中,由第一个可用线程按 FIFO 顺序进行处理。RejectedExecutionHandler

尽管在上面的示例代码中,队列是无界的,但您也可以将其定义为有界队列。例如,如果将容量 1000 添加到 中,则它将:LinkedBlockingQueue

  1. 将线程缩放到最大值
  2. 然后排队,直到1000个任务满为止
  3. 然后阻止调用方,直到队列有空位。

另外,如果您需要在 中使用,则可以使用该方法作为超时。offer(...)RejectedExecutionHandleroffer(E, long, TimeUnit)Long.MAX_VALUE

警告:

如果您希望在执行器关闭将任务添加到执行器中,那么您可能希望在执行器服务关闭时更明智地抛弃我们的自定义。感谢@RaduToader指出这一点。RejectedExecutionExceptionRejectedExecutionHandler

编辑:

对此答案的另一个调整可能是询问 TPE 是否存在空闲线程,并且只有在有空闲线程时才将项目排入队列。您必须为此创建一个真正的类,并在其上添加方法。ourQueue.setThreadPoolExecutor(tpe);

然后,您的方法可能如下所示:offer(...)

  1. 检查是否在这种情况下只需调用 .tpe.getPoolSize() == tpe.getMaximumPoolSize()super.offer(...)
  2. 否则,如果然后调用,因为似乎有空闲线程。tpe.getPoolSize() > tpe.getActiveCount()super.offer(...)
  3. 否则返回分叉另一个线程。false

也许这个:

int poolSize = tpe.getPoolSize();
int maximumPoolSize = tpe.getMaximumPoolSize();
if (poolSize >= maximumPoolSize || poolSize > tpe.getActiveCount()) {
    return super.offer(e);
} else {
    return false;
}

请注意,TPE 上的 get 方法开销很大,因为它们访问字段或(在 的情况下 )锁定 TPE 并遍历线程列表。此外,此处存在争用条件,可能会导致任务排队不正确,或者在存在空闲线程时分叉另一个线程。volatilegetActiveCount()


答案 2

关于这个问题,我已经有了另外两个答案,但我怀疑这个答案是最好的。

它基于当前接受的答案的技术,即:

  1. 重写队列的方法以(有时)返回 false,offer()
  2. 这会导致 生成新线程或拒绝任务,以及ThreadPoolExecutor
  3. 将 设置为在拒绝时实际对任务进行排队。RejectedExecutionHandler

问题是什么时候应该返回 false。当队列上有几个任务时,当前接受的答案返回 false,但正如我在评论中指出的那样,这会导致不良影响。或者,如果您总是返回 false,即使您有线程在队列上等待,您也会继续生成新线程。offer()

解决方案是使用Java 7 LinkedTransferQueue并调用。当存在等待的使用者线程时,任务将只传递到该线程。否则,将返回 false,并且 将生成一个新线程。offer()tryTransfer()offer()ThreadPoolExecutor

    BlockingQueue<Runnable> queue = new LinkedTransferQueue<Runnable>() {
        @Override
        public boolean offer(Runnable e) {
            return tryTransfer(e);
        }
    };
    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 50, 60, TimeUnit.SECONDS, queue);
    threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            try {
                executor.getQueue().put(r);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    });