为什么 ThreadPoolExecutor 将 BlockingQueue 作为其参数?

2022-09-01 20:44:21

我尝试创建和执行 ThreadPoolExecutor

int poolSize = 2;
int maxPoolSize = 3;
ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(2);

如果我尝试7日,8日...连续任务

  threadPool.execute(task);  

在队列达到最大大小
后,它开始抛出“RejectedExecutionException”。意味着我失去了添加这些任务。

那么,如果 BlockingQueue 缺少任务,它的作用是什么?意味着为什么它不等待?

从阻塞队列的定义

一个队列,它另外支持在检索元素时等待队列变为非空,并在存储元素时等待队列中空间变为可用的操作。


为什么我们不能选择linklist(正常的队列实现而不是阻塞队列)?


答案 1

出现此问题的原因是您的任务队列太小,这由 execute 方法的文档指示:

在将来的某个时间执行给定的任务。该任务可以在新线程或现有池线程中执行。如果由于此执行程序已关闭或已达到其容量而无法提交任务以执行该任务,则该任务将由当前 RejectedExecutionHandler 处理。

因此,第一个问题是您将队列大小设置为非常小的数字:

int poolSize = 2;
int maxPoolSize = 3;
ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(2);

然后你说“如果[我]尝试7th,8th...任务“,那么你会得到一个,因为你超过了队列的容量。有两种方法可以解决您的问题(我建议同时做这两种方法):RejectedExecutionException

  1. 增加队列的大小。
  2. 捕获异常并重新尝试添加任务。

你应该有这样的东西:

public void ExecuteTask(MyRunnableTask task) {
    bool taskAdded = false;
    while(!taskAdded) {
        try {
            executor.execute(task);
            taskAdded = true;
        } catch (RejectedExecutionException ex) {
            taskAdded = false;
        }
    }   
}

现在,为了解决您的其他问题...

那么,如果 BlockingQueue 缺少任务,它的作用是什么?

的作用是完成生产者/消费者模式,如果它足够大,那么你不应该看到你遇到的问题。正如我上面提到的,您需要增加队列大小并捕获异常,然后重试执行任务。BlockingQueue

为什么我们不能选择链接列表?

链表既不是线程安全的,也不是阻塞的。生产者/消费者模式往往最适合阻塞队列。

更新

请不要被以下陈述所冒犯,我故意使用更严格的语言,以强调这样一个事实,即你的第一个假设永远不应该是你正在使用的库有问题(除非你自己写了库,并且你知道它存在一个特定的问题)!

因此,让我们现在就解决这个问题:ThreadPoolExecutor和Java库都不是这里的问题。导致问题完全是您对库的(错误)使用。Javmex有一个很棒的教程,解释了你所看到的确切情况。

可能有几个原因导致您填满队列的速度快于清空队列的速度:

  1. 添加要执行的任务的线程添加它们的速度太快。
  2. 执行任务的时间太长。
  3. 您的队列太小。
  4. 上述3.

还有很多其他原因,但我认为以上是最常见的。

我会给你一个具有无限队列的简单解决方案,但它不会解决你对库的(错误)使用。因此,在我们责怪Java库之前,让我们看一个简洁的例子,它演示了你遇到的确切问题。

更新 2.0

以下是解决该特定问题的其他几个问题:

  1. 线程池执行器阻塞队列已满?
  2. 如果 ThreadPoolExecutor 的 submit() 方法被饱和,如何让它阻塞?

答案 2

阻塞队列主要用于使用者(池中的线程)。线程可以等待新任务在队列上可用,它们将自动被唤醒。一个普通的链表不能达到这个目的。

在生产者端,默认行为是在队列已满的情况下引发异常。这可以通过实现您自己的DempjectExceptionHandler轻松自定义。在处理程序中,您可以控制队列并调用 put 方法,该方法将阻塞,直到有更多空间可用。

但这不是一件好事 - 原因是如果此执行器中存在问题(死锁 ,处理缓慢),则会对系统的其余部分产生连锁反应。例如,如果您从 servlet 调用 execute 方法 - 如果执行方法阻塞,那么所有容器线程都将被搁置,并且您的应用程序将停止。这可能就是默认行为是引发异常而不是等待的原因。此外,也没有DenjectExceptionHandler的实现来做到这一点 - 以阻止人们使用它。

有一个选项(CallersRunPolicy)可以在调用线程中执行,如果您希望进行处理,它可以是另一个选项。

一般规则是 - 最好是处理一个请求失败,而不是使整个系统关闭。您可能希望了解断路器模式。