是否建议将任务添加到 ThreadPoolExecutor 的 BlockingQueue?

用于 ThreadPoolExecutor 的 JavaDoc 不清楚是否可以接受将任务直接添加到支持执行器中。文档说,调用“主要用于调试和监视”。BlockingQueueexecutor.getQueue()

我正在用我自己的.我保留了对队列的引用,以便可以直接向其添加任务。返回相同的队列,因此我假设 中的告诫适用于通过我的手段获取的支持队列的引用。ThreadPoolExecutorBlockingQueuegetQueue()getQueue()

代码的一般模式是:

int n = ...; // number of threads
queue = new ArrayBlockingQueue<Runnable>(queueSize);
executor = new ThreadPoolExecutor(n, n, 1, TimeUnit.HOURS, queue);
executor.prestartAllCoreThreads();
// ...
while (...) {
    Runnable job = ...;
    queue.offer(job, 1, TimeUnit.HOURS);
}
while (jobsOutstanding.get() != 0) {
    try {
        Thread.sleep(...);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}
executor.shutdownNow();

queue.offer()executor.execute()

据我所知,典型的用法是通过 添加任务。我上面示例中的方法具有在队列上阻止的好处,而如果队列已满并拒绝我的任务,则会立即失败。我也喜欢提交作业与阻塞队列交互;这对我来说感觉更“纯粹”的生产者 - 消费者。executor.execute()execute()

直接向队列添加任务的含义:我必须调用,否则没有工作线程正在运行。假设没有与执行器进行其他交互,则不会监视队列(对源的检查证实了这一点)。这也意味着对于直接排队,必须另外为> 0 个核心线程配置,并且不得配置为允许核心线程超时。prestartAllCoreThreads()ThreadPoolExecutorThreadPoolExecutor

tl;博士

给定如下配置:ThreadPoolExecutor

  • 核心线程 > 0
  • 不允许核心线程超时
  • 核心线程已预启动
  • 保持对支持执行程序的引用BlockingQueue

是否可以将任务直接添加到队列而不是调用?executor.execute()

相关

这个问题(生产者/消费者工作队列)是相似的,但没有具体涵盖直接添加到队列中。


答案 1

一个诀窍是实现 ArrayBlockingQueue 的自定义子类,并重写 offer() 方法来调用你的阻塞版本,然后你仍然可以使用正常的代码路径。

queue = new ArrayBlockingQueue<Runnable>(queueSize) {
  @Override public boolean offer(Runnable runnable) {
    try {
      return offer(runnable, 1, TimeUnit.HOURS);
    } catch(InterruptedException e) {
      // return interrupt status to caller
      Thread.currentThread().interrupt();
    }
    return false;
  }
};

(正如您可能猜到的那样,我认为直接在队列上调用offer作为您的正常代码路径可能是一个坏主意)。


答案 2

如果是我,我宁愿使用而不是,仅仅是因为我已经在使用其他所有内容。Executor#execute()Queue#offer()java.util.concurrent

你的问题很好,它激起了我的兴趣,所以我看了一下来源:ThreadPoolExecutor#execute()

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
        if (runState == RUNNING && workQueue.offer(command)) {
            if (runState != RUNNING || poolSize == 0)
                ensureQueuedTaskHandled(command);
        }
        else if (!addIfUnderMaximumPoolSize(command))
            reject(command); // is shutdown or saturated
    }
}

我们可以看到,在工作队列上执行自身调用,但在必要时进行一些漂亮,美味的池操作之前不会。出于这个原因,我认为建议使用 ;不使用它可能会(尽管我不确定)导致游泳池以非最佳方式运行。但是,我不认为使用会破坏执行器 - 看起来任务是使用以下方法从队列中提取的(也来自ThreadPoolExecutor):offer()execute()offer()

Runnable getTask() {
    for (;;) {
        try {
            int state = runState;
            if (state > SHUTDOWN)
                return null;
            Runnable r;
            if (state == SHUTDOWN)  // Help drain queue
                r = workQueue.poll();
            else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
                r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
            else
                r = workQueue.take();
            if (r != null)
                return r;
            if (workerCanExit()) {
                if (runState >= SHUTDOWN) // Wake up others
                    interruptIdleWorkers();
                return null;
            }
            // Else retry
        } catch (InterruptedException ie) {
            // On interruption, re-check runState
        }
    }
}

此方法只是从循环中调用的,因此,如果执行器未关闭,它将阻塞,直到将新任务提供给队列(无论它来自何处)。getTask()

注意:即使我在这里发布了源代码的代码片段,我们也不能依靠它们来获得明确的答案 - 我们应该只编码到API。我们不知道 的实施将如何随时间变化。execute()


推荐