创建动态(增长/收缩)线程池

2022-09-02 21:41:25

我需要在Java中实现一个线程池(java.util.concurrent),其线程数在空闲时处于某个最小值,当作业提交到其中的速度快于完成执行时,它会增长到上限(但永远不会进一步),并且当所有作业都完成并且没有提交更多作业时,它会缩小到下限。

您将如何实现类似的东西?我想这将是一个相当常见的使用场景,但显然工厂方法只能创建固定大小的池和池,当提交许多作业时,这些池和池会无限增长。该类提供了和参数,但它的文档似乎暗示,同时拥有多个线程的唯一方法是使用有界作业队列,在这种情况下,如果您已到达线程,则会遇到作业拒绝,您必须自己处理?我想出了这个:java.util.concurrent.ExecutorsThreadPoolExecutorcorePoolSizemaximumPoolSizecorePoolSizemaximumPoolSize

//pool creation
ExecutorService pool = new ThreadPoolExecutor(minSize, maxSize, 500, TimeUnit.MILLISECONDS,
    new ArrayBlockingQueue<Runnable>(minSize));
...

//submitting jobs
for (Runnable job : ...) {
    while (true) {
        try {
            pool.submit(job);
            System.out.println("Job " + job + ": submitted");
            break;
        } catch (RejectedExecutionException e) {
            // maxSize jobs executing concurrently atm.; re-submit new job after short wait
            System.out.println("Job " + job + ": rejected...");
            try {
                Thread.sleep(300);
            } catch (InterruptedException e1) {
            }
        }
    }
}

我是否忽略了什么?有没有更好的方法来做到这一点?此外,根据一个人的要求,上述代码至少在(我认为)作业完成之前不会完成可能会有问题。因此,如果您希望能够将任意数量的作业提交到池中并立即继续操作,而无需等待其中任何一个完成,那么如果没有一个专用的“作业求和”线程来管理所需的无界队列以保存所有提交的作业,我看不出如何做到这一点。AFAICS,如果您为 ThreadPoolExecutor 本身使用无界队列,则其线程计数永远不会超过 corePoolSize。(total number of jobs) - maxSize


答案 1

当增长和收缩与线程结合在一起时,我脑海中只有一个名字:来自java.util.concurrent package的CachedThreadPool

ExecutorService executor = Executors.newCachedThreadPool();

CachedThreadPool() 可以重用该线程,并在需要时创建新线程。是的,如果一个线程空闲60秒,CachedThreadPool将杀死它。所以这是非常轻量级的 - 用你的话说,增长和收缩!


答案 2

可能对您有帮助的一个技巧是分配一个使用相同线程将作业提交到阻塞队列中的方法。这将阻塞当前线程并消除对某种循环的需求。RejectedExecutionHandler

在这里看到我的答案:

如何让 ThreadPoolExecutor 命令在需要处理太多数据时等待?

下面是从该答案复制的拒绝处理程序。

final BlockingQueue queue = new ArrayBlockingQueue<Runnable>(200);
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(nThreads, nThreads,
       0L, TimeUnit.MILLISECONDS, queue);
// by default (unfortunately) the ThreadPoolExecutor will call the rejected
// handler when you submit the 201st job, to have it block you do:
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
   public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
      // this will block if the queue is full
      executor.getQueue().put(r);
   }
});

然后,只要您意识到在核心线程上方创建任何线程之前,您首先使用的有界阻塞队列就会填满,就应该能够使用核心/最大线程计数。因此,如果您有 10 个核心线程,并且希望第 11 个作业启动第 11 个线程,则很遗憾,您需要有一个大小为 0 的阻塞队列(可能是 a )。我觉得这在原本伟大的课程中是一个真正的限制。SynchronousQueueExecutorService