优雅地将队列长度指示器实现到执行器服务

为什么,哦,为什么不为其s提供队列长度指标?最近我发现自己在做这样的事情:java.util.concurrentExecutorService

ExecutorService queue = Executors.newSingleThreadExecutor();
AtomicInteger queueLength = new AtomicInteger();
...

public void addTaskToQueue(Runnable runnable) {
    if (queueLength.get() < MAX_QUEUE_LENGTH) {
        queueLength.incrementAndGet(); // Increment queue when submitting task.
        queue.submit(new Runnable() {
            public void run() {
                runnable.run();
                queueLength.decrementAndGet(); // Decrement queue when task done.
            }
        });
    } else {
        // Trigger error: too long queue
    }
}

这工作正常,但是...我认为这确实应该作为.随身携带一个与实际队列分开的计数器是愚蠢且容易出错的,计数器应该指示其长度(让我想起C数组)。但是,s 是通过静态工厂方法获得的,因此无法简单地扩展原本出色的单线程执行器并添加队列计数器。那么我该怎么办:ExecutorServiceExecutorService

  1. 重塑已经在JDK中实现的东西?
  2. 其他聪明的解决方案?

答案 1

还有一种更直接的方法:

ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newSingleThreadExecutor();
// add jobs
// ...
int size = executor.getQueue().size();

虽然你可以考虑不要使用执行器的方便创建方法,而是直接创建执行器来摆脱强制转换,从而确保执行器将始终是一个,即使实现有一天会改变。ThreadPoolExecutorExecutors.newSingleThreadExecutor

ThreadPoolExecutor executor = new ThreadPoolExecutor( 1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>() );

这是从 JDK 1.6 中直接复制的。传递给构造函数的那个实际上是您将从 中获取的对象。Executors.newSingleThreadExecutorLinkedBlockingQueuegetQueue


答案 2

虽然您可以直接检查队列大小。处理队列太长的另一种方法是使内部队列有界。

public static
ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
  return new ThreadPoolExecutor(nThreads, nThreads,
                              5000L, TimeUnit.MILLISECONDS,
                              new ArrayBlockingQueue<Runnable>(queueSize, true));
}

当您超过限制时,这将导致 RejectedExecutionExceptions(请参阅此内容)。

如果要避免异常,可以劫持调用线程以执行函数。有关解决方案,请参阅此 SO 问题

引用


推荐