在启动更多线程之前,如何解决队列需要绑定并填满的此限制。ThreadPoolExecutor
我相信我终于找到了一个有点优雅(也许有点黑客)的解决方案来解决这个限制。它涉及扩展以在已经有一些任务排队时返回它。如果当前线程无法跟上排队任务的步伐,TPE 将添加其他线程。如果池已达到最大线程数,则将调用 ,这将进入队列。ThreadPoolExecutor
LinkedBlockingQueue
false
queue.offer(...)
RejectedExecutionHandler
put(...)
编写一个可以返回并且永远不会阻塞的队列当然很奇怪,所以这是黑客部分。但这与TPE对队列的使用效果很好,因此我认为这样做没有任何问题。offer(...)
false
put()
代码如下:
// extend LinkedBlockingQueue to force offer() to return false conditionally
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
private static final long serialVersionUID = -6903933921423432194L;
@Override
public boolean offer(Runnable e) {
// Offer it to the queue if there is 0 items already queued, else
// return false so the TPE will add another thread. If we return false
// and max threads have been reached then the RejectedExecutionHandler
// will be called which will do the put into the queue.
if (size() == 0) {
return super.offer(e);
} else {
return false;
}
}
};
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1 /*core*/, 50 /*max*/,
60 /*secs*/, TimeUnit.SECONDS, queue);
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
// This does the actual put into the queue. Once the max threads
// have been reached, the tasks will then queue up.
executor.getQueue().put(r);
// we do this after the put() to stop race conditions
if (executor.isShutdown()) {
throw new RejectedExecutionException(
"Task " + r + " rejected from " + e);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
});
使用此机制,当我将任务提交到队列时,将:ThreadPoolExecutor
- 最初将线程数扩展到内核大小(此处为 1)。
- 将其提供给队列。如果队列为空,它将排队等待现有线程处理。
- 如果队列中已有 1 个或多个元素,则 将返回 false。
offer(...)
- 如果返回 false,请纵向扩展池中的线程数,直到它们达到最大数目(此处为 50)。
- 如果在最大值,则调用
RejectedExecutionHandler
- 然后将任务放入队列中,由第一个可用线程按 FIFO 顺序进行处理。
RejectedExecutionHandler
尽管在上面的示例代码中,队列是无界的,但您也可以将其定义为有界队列。例如,如果将容量 1000 添加到 中,则它将:LinkedBlockingQueue
- 将线程缩放到最大值
- 然后排队,直到1000个任务满为止
- 然后阻止调用方,直到队列有空位。
另外,如果您需要在 中使用,则可以使用该方法作为超时。offer(...)
RejectedExecutionHandler
offer(E, long, TimeUnit)
Long.MAX_VALUE
警告:
如果您希望在执行器关闭后将任务添加到执行器中,那么您可能希望在执行器服务关闭时更明智地抛弃我们的自定义。感谢@RaduToader指出这一点。RejectedExecutionException
RejectedExecutionHandler
编辑:
对此答案的另一个调整可能是询问 TPE 是否存在空闲线程,并且只有在有空闲线程时才将项目排入队列。您必须为此创建一个真正的类,并在其上添加方法。ourQueue.setThreadPoolExecutor(tpe);
然后,您的方法可能如下所示:offer(...)
- 检查是否在这种情况下只需调用 .
tpe.getPoolSize() == tpe.getMaximumPoolSize()
super.offer(...)
- 否则,如果然后调用,因为似乎有空闲线程。
tpe.getPoolSize() > tpe.getActiveCount()
super.offer(...)
- 否则返回分叉另一个线程。
false
也许这个:
int poolSize = tpe.getPoolSize();
int maximumPoolSize = tpe.getMaximumPoolSize();
if (poolSize >= maximumPoolSize || poolSize > tpe.getActiveCount()) {
return super.offer(e);
} else {
return false;
}
请注意,TPE 上的 get 方法开销很大,因为它们访问字段或(在 的情况下 )锁定 TPE 并遍历线程列表。此外,此处存在争用条件,可能会导致任务排队不正确,或者在存在空闲线程时分叉另一个线程。volatile
getActiveCount()