执行器服务,避免任务队列过满的标准方法

2022-09-01 06:01:14

我正在使用并发多线程程序的便利性。取以下代码:ExecutorService

while(xxx) {
    ExecutorService exService = Executors.newFixedThreadPool(NUMBER_THREADS);
    ...  
    Future<..> ... = exService.submit(..);
    ...
}

在我的情况下,问题是如果所有人都被占用,就不会阻塞。结果是 Task 队列被许多任务淹没。这样做的结果是,关闭执行服务需要很长时间(在很长一段时间内都是错误的)。原因是任务队列仍然很满。submit()NUMBER_THREADSExecutorService.shutdown()ExecutorService.isTerminated()

现在,我的解决方法是使用信号量来禁止在任务队列中有许多条目:ExecutorService

...
Semaphore semaphore=new Semaphore(NUMBER_THREADS);

while(xxx) {
    ExecutorService exService = Executors.newFixedThreadPool(NUMBER_THREADS); 
    ...
    semaphore.aquire();  
    // internally the task calls a finish callback, which invokes semaphore.release()
    // -> now another task is added to queue
    Future<..> ... = exService.submit(..); 
    ...
}

我确信有更好的更封装的解决方案?


答案 1

诀窍是使用固定的队列大小,并且:

new ThreadPoolExecutor.CallerRunsPolicy()

我还建议使用番石榴的ListingExecutorService。下面是一个使用者/生产者队列示例。

private ListeningExecutorService producerExecutorService = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
private ListeningExecutorService consumerExecutorService = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));

private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  5000L, TimeUnit.MILLISECONDS,
                                  new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
}

任何更好的东西,你可能想要考虑像RabbitMQ或ActiveMQ这样的MQ,因为它们具有QoS技术。


答案 2

您可以致电了解等待队列的大小。如果队列太长,您可以执行操作。我建议在当前线程中运行任务,如果队列太长而无法减慢生产者的速度(如果合适)。ThreadPoolExecutor.getQueue().size()


推荐