SQSListener with ThreadpoolExecutor

在下面的示例中,我将最大和核心池大小设置为 1。但是,正在处理任何消息。当我启用调试日志时,我能够看到从SQS中提取的消息,但我想它没有被处理/删除。然而,当我将核心和最大池大小增加到2时,消息似乎被处理了。

编辑

我相信Spring可能会为接收器分配一个线程,该线程从队列中读取数据,因此它无法将线程分配给正在处理消息的侦听器。当我将核心池大小增加到2时,我看到消息正在从队列中读取。当我添加另一个侦听器(用于死信队列)时,我遇到了同样的问题 - 2个线程不够,因为消息没有被处理。当我将核心池大小增加到3时,它开始处理消息。我假设在这种情况下,1个线程被分配从队列中读取消息,2个侦听器被分配1个线程。

@Configuration
public class SqsListenerConfiguration {

    @Bean
    @ConfigurationProperties(prefix = "aws.configuration")
    public ClientConfiguration clientConfiguration() {
        return new ClientConfiguration();
    }


    @Bean
    @Primary
    public AWSCredentialsProvider awsCredentialsProvider() {

        ProfileCredentialsProvider credentialsProvider = new ProfileCredentialsProvider("credential");
        try {
            credentialsProvider.getCredentials();
            System.out.println(credentialsProvider.getCredentials().getAWSAccessKeyId());
            System.out.println(credentialsProvider.getCredentials().getAWSSecretKey());

        } catch (Exception e) {
            throw new AmazonClientException(
                    "Cannot load the credentials from the credential profiles file. " +
                            "Please make sure that your credentials file is at the correct " +
                            "location (~/.aws/credentials), and is in valid format.",
                    e);
        }
        return credentialsProvider;
    }


    @Bean
    @Primary
    public AmazonSQSAsync amazonSQSAsync() {
        return AmazonSQSAsyncClientBuilder.standard().
                withCredentials(awsCredentialsProvider()).
                withClientConfiguration(clientConfiguration()).
                build();
    }


    @Bean
    @ConfigurationProperties(prefix = "aws.queue")
    public SimpleMessageListenerContainer simpleMessageListenerContainer(AmazonSQSAsync amazonSQSAsync) {
        SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer();
        simpleMessageListenerContainer.setAmazonSqs(amazonSQSAsync);
        simpleMessageListenerContainer.setMessageHandler(queueMessageHandler());
        simpleMessageListenerContainer.setMaxNumberOfMessages(10);
        simpleMessageListenerContainer.setTaskExecutor(threadPoolTaskExecutor());
        return simpleMessageListenerContainer;
    }


    @Bean
    public QueueMessageHandler queueMessageHandler() {
        QueueMessageHandlerFactory queueMessageHandlerFactory = new QueueMessageHandlerFactory();
        queueMessageHandlerFactory.setAmazonSqs(amazonSQSAsync());
        QueueMessageHandler queueMessageHandler = queueMessageHandlerFactory.createQueueMessageHandler();
        return queueMessageHandler;
    }


    @Bean
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(1);
        executor.setMaxPoolSize(1);
        executor.setThreadNamePrefix("oaoQueueExecutor");
        executor.initialize();
        return executor;
    }


    @Bean
    public QueueMessagingTemplate messagingTemplate(@Autowired AmazonSQSAsync amazonSQSAsync) {
        return new QueueMessagingTemplate(amazonSQSAsync);
    }


}

侦听器配置

    @SqsListener(value = "${oao.sqs.url}", deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
    public void onMessage(String serviceData, @Header("MessageId") String messageId, @Header("ApproximateFirstReceiveTimestamp") String approximateFirstReceiveTimestamp) {

        System.out.println(" Data = " + serviceData + " MessageId = " + messageId);

        repository.execute(serviceData);
}

答案 1

通过设置和相同,您可以创建一个 .这里记录了对规则的很好的解释corePoolSizemaximumPoolSizefixed-size thread pool

隐式设置允许删除任务。但是,默认队列容量为 ,出于实际目的,它是无穷大的。maxPoolSizeInteger.MAX_VALUE

需要注意的是,它使用 underneath,它有一种有点不寻常的排队方法,在文档中进行了描述:ThreadPoolTaskExecutorThreadPoolExecutor

如果正在运行或多个线程,则执行程序始终首选对请求进行排队,而不是添加新线程。corePoolSize

这意味着只有当队列已满时才相关,否则线程数将永远不会超过 。例如,如果我们将从未完成的任务提交到线程池:maxPoolSizecorePoolSize

  • 第一个提交将开始一个新的线程;corePoolSize
  • 之后,所有提交都将进入队列;
  • 如果队列是有限的并且其容量已耗尽,则每次提交都会启动一个新线程,最多maxPoolSize;
  • 当池和队列都已满时,新提交将被拒绝。

排队 - 阅读文档

任何任务都可用于转移和保留已提交的任务。此队列的使用与池大小调整交互:BlockingQueue

  • 如果运行的线程少于 corePoolSize 线程,则执行程序始终倾向于添加新线程而不是排队。
  • 如果 corePoolSize 或更多线程正在运行,则执行程序始终倾向于对请求进行排队,而不是添加新线程。
  • 如果请求无法排队,则会创建一个新线程,除非该线程超过 maxPoolSize,在这种情况下,任务将被拒绝。

Unbounded queues.使用无界队列(例如,没有预定义容量的队列)将导致新任务在所有 corePoolSize 线程都忙的情况下排队。因此,将创建不超过线程。(因此,值没有任何影响。LinkedBlockingQueuecorePoolSizemaximumPoolSize

  1. 如果线程数小于 ,请创建一个新线程来运行新任务。corePoolSize
  2. 如果线程数等于(或大于),则将任务放入队列中。corePoolSize
  3. 如果队列已满,并且线程数小于 ,请创建一个新线程来运行任务。maxPoolSize
  4. 如果队列已满,并且线程数大于或等于 ,则拒绝该任务。maxPoolSize

答案 2

推荐