Java:执行器服务,在提交达到一定队列大小后阻止提交

我正在尝试编写一个解决方案,其中单个线程生成可以并行执行的 I/O 密集型任务。每个任务都有重要的内存中数据。因此,我希望能够限制当前挂起的任务数。

如果我像这样创建ThreadPoolExecutor:

    ThreadPoolExecutor executor = new ThreadPoolExecutor(numWorkerThreads, numWorkerThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(maxQueue));

然后,当队列填满并且所有线程都已忙时,将引发。executor.submit(callable)RejectedExecutionException

当队列已满且所有线程都忙时,我该怎么做才能使阻塞?executor.submit(callable)

编辑:我试过这个

executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

它在某种程度上实现了我想要达到的效果,但以一种不优雅的方式(基本上被拒绝的线程在调用线程中运行,因此这会阻止调用线程提交更多)。

编辑:(提出问题5年后)

对于阅读此问题及其答案的任何人,请不要将接受的答案视为正确的解决方案。请仔细阅读所有答案和评论。


答案 1

我也做过同样的事情。诀窍是创建一个BlocktingQueue,其中offer()方法实际上是一个put()。(您可以使用所需的任何基本BlocktingQueue impl)。

public class LimitedQueue<E> extends LinkedBlockingQueue<E> 
{
    public LimitedQueue(int maxSize)
    {
        super(maxSize);
    }

    @Override
    public boolean offer(E e)
    {
        // turn offer() and add() into a blocking calls (unless interrupted)
        try {
            put(e);
            return true;
        } catch(InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
        return false;
    }

}

请注意,这仅适用于线程池,因此请小心(请参阅注释)。corePoolSize==maxPoolSize


答案 2

以下是我如何解决这个问题:

(注意:此解决方案确实会阻止提交可调用的线程,因此可以防止抛出 RejectedExecutionException)

public class BoundedExecutor extends ThreadPoolExecutor{

    private final Semaphore semaphore;

    public BoundedExecutor(int bound) {
        super(bound, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
        semaphore = new Semaphore(bound);
    }

    /**Submits task to execution pool, but blocks while number of running threads 
     * has reached the bound limit
     */
    public <T> Future<T> submitButBlockIfFull(final Callable<T> task) throws InterruptedException{

        semaphore.acquire();            
        return submit(task);                    
    }


    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);

        semaphore.release();
    }
}

推荐