动态调整 java.util.concurrent.ThreadPoolExecutor 在等待任务时的大小

2022-09-02 11:34:19

我正在与 一起并行处理多个项目。尽管线程本身工作正常,但有时由于线程中发生的操作,我们会遇到其他资源限制,这使我们想要调低池中的线程数。java.util.concurrent.ThreadPoolExecutor

我想知道是否有办法在线程实际工作时调低线程的数量。我知道您可以调用和/或,但这些只是在线程空闲时调整池的大小,但是在队列中没有等待任务之前,它们不会变得空闲。setMaximumPoolSize()setCorePoolSize()


答案 1

你绝对可以。调用将更改池的核心大小。对此方法的调用是线程安全的,并重写提供给 构造函数的设置。如果要修剪池大小,则剩余的线程将在当前作业队列完成后关闭(如果它们处于空闲状态,它们将立即关闭)。如果要增加池大小,将尽快分配新线程。分配新线程的时间范围是未记录的 — 但在实现中,每次调用该方法时都会执行新线程的分配。setCorePoolSize(int)ThreadPoolExecutorexecute

要将其与运行时可调整的作业场配对,可以将此属性(通过包装器或使用动态 MBean 导出器)公开为读写 JMX 属性,以创建一个相当好的、动态可调的批处理处理器。

若要在运行时(这是您的请求)强制减小池大小,必须对 方法进行子类化并添加中断。中断线程并不是一种充分的中断,因为它只与等待状态交互,并且在处理过程中,任务线程不会进入可中断状态。ThreadPoolExecutorbeforeExecute(Thread,Runnable)ThreadPoolExecutor

我最近遇到了同样的问题,试图让线程池在执行所有提交的任务之前强制终止。为了实现这一点,我通过在将线程替换为需要我的特定异常并丢弃它的线程后,才通过引发运行时异常来中断线程。UncaughtExceptionHandler

/**
 * A runtime exception used to prematurely terminate threads in this pool.
 */
static class ShutdownException
extends RuntimeException {
    ShutdownException (String message) {
        super(message);
    }
}

/**
 * This uncaught exception handler is used only as threads are entered into
 * their shutdown state.
 */
static class ShutdownHandler 
implements UncaughtExceptionHandler {
    private UncaughtExceptionHandler handler;

    /**
     * Create a new shutdown handler.
     *
     * @param handler The original handler to deligate non-shutdown
     * exceptions to.
     */
    ShutdownHandler (UncaughtExceptionHandler handler) {
        this.handler = handler;
    }
    /**
     * Quietly ignore {@link ShutdownException}.
     * <p>
     * Do nothing if this is a ShutdownException, this is just to prevent
     * logging an uncaught exception which is expected.  Otherwise forward
     * it to the thread group handler (which may hand it off to the default
     * uncaught exception handler).
     * </p>
     */
    public void uncaughtException (Thread thread, Throwable throwable) {
        if (!(throwable instanceof ShutdownException)) {
            /* Use the original exception handler if one is available,
             * otherwise use the group exception handler.
             */
            if (handler != null) {
                handler.uncaughtException(thread, throwable);
            }
        }
    }
}
/**
 * Configure the given job as a spring bean.
 *
 * <p>Given a runnable task, configure it as a prototype spring bean,
 * injecting any necessary dependencices.</p>
 *
 * @param thread The thread the task will be executed in.
 * @param job The job to configure.
 *
 * @throws IllegalStateException if any error occurs.
 */
protected void beforeExecute (final Thread thread, final Runnable job) {
    /* If we're in shutdown, it's because spring is in singleton shutdown
     * mode.  This means we must not attempt to configure the bean, but
     * rather we must exit immediately (prematurely, even).
     */
    if (!this.isShutdown()) {
        if (factory == null) {
            throw new IllegalStateException(
                "This class must be instantiated by spring"
                );
        }

        factory.configureBean(job, job.getClass().getName());
    }
    else {
        /* If we are in shutdown mode, replace the job on the queue so the
         * next process will see it and it won't get dropped.  Further,
         * interrupt this thread so it will no longer process jobs.  This
         * deviates from the existing behavior of shutdown().
         */
        workQueue.add(job);

        thread.setUncaughtExceptionHandler(
            new ShutdownHandler(thread.getUncaughtExceptionHandler())
            );

        /* Throwing a runtime exception is the only way to prematurely
         * cause a worker thread from the TheadPoolExecutor to exit.
         */
        throw new ShutdownException("Terminating thread");
    }
}

在您的例子中,您可能希望创建一个没有允许的信号量(仅用于用作线程安全计数器),并且在关闭线程时,向它释放许多与先前核心池大小和新池大小的增量相对应的允许(需要重写该方法)。这将允许您在线程当前任务完成后终止线程。setCorePoolSize(int)

private Semaphore terminations = new Semaphore(0);

protected void beforeExecute (final Thread thread, final Runnable job) {
    if (terminations.tryAcquire()) {
        /* Replace this item in the queue so it may be executed by another
         * thread
         */
        queue.add(job);

        thread.setUncaughtExceptionHandler(
            new ShutdownHandler(thread.getUncaughtExceptionHandler())
            );

        /* Throwing a runtime exception is the only way to prematurely
         * cause a worker thread from the TheadPoolExecutor to exit.
         */
        throw new ShutdownException("Terminating thread");
    }
}

public void setCorePoolSize (final int size) {
    int delta = getActiveCount() - size;

    super.setCorePoolSize(size);

    if (delta > 0) {
        terminations.release(delta);
    }
}

这应该会中断 f(n) = 活动 - 请求的 n 个线程。如果存在任何问题,s 分配策略是相当持久的。它使用保证执行的块来保持过早终止。因此,即使您终止了太多线程,它们也会重新填充。ThreadPoolExecutorfinally


答案 2

据我所知,这不可能以一种漂亮干净的方式进行。

您可以实现 beforeExecute 方法来检查某些布尔值并强制线程暂时停止。请记住,它们将包含一个任务,在重新启用之前不会执行该任务。

或者,您可以实现 afterExecute 以在饱和时引发 RuntimeException。这将有效地导致线程死亡,并且由于执行器将高于最大值,因此不会创建新的执行器。

我也不建议你这样做。相反,请尝试找到一些其他方法来控制并发执行导致您问题的任务。可能是通过在单独的线程池中执行它们,其工作线程数量更有限。


推荐