Spring ThreadPoolTaskExecutor 只运行一个线程

2022-09-01 19:41:13

我们在 JMS 使用者中使用 ThreadPoolExecutor,并将其注入 DefaultMessageListenerContainer。我希望这会为许多消息运行并发线程,但是我们的日志显示线程ID不会更改。我们的日志记录显示,对于不同的消息处理,线程 ID 在 24 处始终相同。

这是该场景中的弹簧配置:

<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer"       
         p:connectionFactory-ref="cachedConnectionFactory"
         p:destination-ref="formsCRRDestination"
         p:messageListener-ref="formServicePojo"
         p:concurrentConsumers="5"
         p:idleTaskExecutionLimit="1"
         p:maxConcurrentConsumers="25"
         p:taskExecutor-ref="threadPoolExecutor"         
         destroy-method="doShutdown"     
    >   


 <bean id="threadPoolExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor" >
        <property name="corePoolSize" value="1"/>
        <property name="maxPoolSize" value="15"/>
        <property name="keepAliveSeconds" value="30"/>
    </bean>

在没有将 threadPoolExectuor bean 注入 DefaultMessageListenerContainer 之后,消息现在正在不同的线程中执行。

这是生成的配置:

<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer"       
             p:connectionFactory-ref="cachedConnectionFactory"
             p:destination-ref="formsCRRDestination"
             p:messageListener-ref="formServicePojo"
             p:concurrentConsumers="5"
             p:idleTaskExecutionLimit="1"
             p:maxConcurrentConsumers="25"       
             destroy-method="doShutdown"     
        >   

我尝试过阅读文档,但我不明白为什么会发生这种情况。有什么解释吗?


答案 1

试试这个:

<bean id="threadPoolTaskExecutor"
        class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
        <property name="corePoolSize" value="10" />
        <property name="maxPoolSize" value="25" />
        <property name="queueCapacity" value="30" />
</bean>
  • 这将在初始化时创建 10 个线程。
  • 如果所有 10 个线程都处于繁忙状态,并且出现新任务,则它将使任务保持在队列中。
  • 如果队列已满,它将创建第 11 个线程,并将一直运行到 25。
  • 然后将抛出任务已拒绝的异常。

答案 2

我认为选择的答案是错误的。IIRC,ThreadPoolTaskExecutor(最终在JDK中为ThreadPoolExecutor)的工作方式是

  1. ThreadPoolTaskExecutor 在启动时创建线程以提升 corePoolSize。
  2. 它需要请求到corePoolSize,并让线程处理任务。
  3. 如果在所有线程都忙时有更多的请求传入,则 ThreadPoolTaskExecutor 将开始将这些请求排队到内部队列中。这可能会有问题,因为如果不指定队列队列容量,则此队列大小将为 Integer.MAX_VALUE 作为默认值。
  4. 当池中存在任何可用线程时,#3 中添加的请求将由线程执行。
  5. 如果请求不断涌入,并且所有线程都忙并且队列已满,ThreadPoolTaskExecutor将开始创建新线程,直到maxPoolSize来处理请求。
  6. 如果请求超过这些(增加的线程数+队列大小),则任务将被拒绝或遵循您指定的策略。

所以我认为这里的问题是,要么1)你的消费者足够快,要么2)你堆叠请求太慢,所以你用corePoolSize指定的一个线程足以处理新的传入请求+排队任务,而不允许ThreadPoolTaskExecutor创建新线程。我很确定,如果你更用力地推动它,或者用小数字(如5~10)设置队列的容量,你将能够看到线程的数量在增加。


推荐