SQSListener with ThreadpoolExecutor
在下面的示例中,我将最大和核心池大小设置为 1。但是,正在处理任何消息。当我启用调试日志时,我能够看到从SQS中提取的消息,但我想它没有被处理/删除。然而,当我将核心和最大池大小增加到2时,消息似乎被处理了。
编辑
我相信Spring可能会为接收器分配一个线程,该线程从队列中读取数据,因此它无法将线程分配给正在处理消息的侦听器。当我将核心池大小增加到2时,我看到消息正在从队列中读取。当我添加另一个侦听器(用于死信队列)时,我遇到了同样的问题 - 2个线程不够,因为消息没有被处理。当我将核心池大小增加到3时,它开始处理消息。我假设在这种情况下,1个线程被分配从队列中读取消息,2个侦听器被分配1个线程。
@Configuration
public class SqsListenerConfiguration {
@Bean
@ConfigurationProperties(prefix = "aws.configuration")
public ClientConfiguration clientConfiguration() {
return new ClientConfiguration();
}
@Bean
@Primary
public AWSCredentialsProvider awsCredentialsProvider() {
ProfileCredentialsProvider credentialsProvider = new ProfileCredentialsProvider("credential");
try {
credentialsProvider.getCredentials();
System.out.println(credentialsProvider.getCredentials().getAWSAccessKeyId());
System.out.println(credentialsProvider.getCredentials().getAWSSecretKey());
} catch (Exception e) {
throw new AmazonClientException(
"Cannot load the credentials from the credential profiles file. " +
"Please make sure that your credentials file is at the correct " +
"location (~/.aws/credentials), and is in valid format.",
e);
}
return credentialsProvider;
}
@Bean
@Primary
public AmazonSQSAsync amazonSQSAsync() {
return AmazonSQSAsyncClientBuilder.standard().
withCredentials(awsCredentialsProvider()).
withClientConfiguration(clientConfiguration()).
build();
}
@Bean
@ConfigurationProperties(prefix = "aws.queue")
public SimpleMessageListenerContainer simpleMessageListenerContainer(AmazonSQSAsync amazonSQSAsync) {
SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer();
simpleMessageListenerContainer.setAmazonSqs(amazonSQSAsync);
simpleMessageListenerContainer.setMessageHandler(queueMessageHandler());
simpleMessageListenerContainer.setMaxNumberOfMessages(10);
simpleMessageListenerContainer.setTaskExecutor(threadPoolTaskExecutor());
return simpleMessageListenerContainer;
}
@Bean
public QueueMessageHandler queueMessageHandler() {
QueueMessageHandlerFactory queueMessageHandlerFactory = new QueueMessageHandlerFactory();
queueMessageHandlerFactory.setAmazonSqs(amazonSQSAsync());
QueueMessageHandler queueMessageHandler = queueMessageHandlerFactory.createQueueMessageHandler();
return queueMessageHandler;
}
@Bean
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(1);
executor.setMaxPoolSize(1);
executor.setThreadNamePrefix("oaoQueueExecutor");
executor.initialize();
return executor;
}
@Bean
public QueueMessagingTemplate messagingTemplate(@Autowired AmazonSQSAsync amazonSQSAsync) {
return new QueueMessagingTemplate(amazonSQSAsync);
}
}
侦听器配置
@SqsListener(value = "${oao.sqs.url}", deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
public void onMessage(String serviceData, @Header("MessageId") String messageId, @Header("ApproximateFirstReceiveTimestamp") String approximateFirstReceiveTimestamp) {
System.out.println(" Data = " + serviceData + " MessageId = " + messageId);
repository.execute(serviceData);
}