在Spring Webflux中执行阻塞JDBC调用

我正在使用Spring Webflux和Spring data jpa,使用PostgreSql作为后端db。我不想在进行像和这样的db调用时阻塞主线程。为了实现同样的目标,我在课堂上有一个主调度程序和一个服务类。findsaveControllerjdbcScheduler

我定义它们的方式是:

@Configuration
@EnableJpaAuditing
public class CommonConfig {

    @Value("${spring.datasource.hikari.maximum-pool-size}")
    int connectionPoolSize;

    @Bean
    public Scheduler scheduler() {
        return Schedulers.parallel();
    }

    @Bean
    public Scheduler jdbcScheduler() {
        return Schedulers.fromExecutor(Executors.newFixedThreadPool(connectionPoolSize));
    }

    @Bean
    public TransactionTemplate transactionTemplate(PlatformTransactionManager transactionManager) {
        return new TransactionTemplate(transactionManager);
    }
}

现在,在我的服务层中执行获取/保存调用时,我执行:

    @Override
    public Mono<Config> getConfigByKey(String key) {
        return Mono.defer(
            () -> Mono.justOrEmpty(configRepository.findByKey(key)))
            .subscribeOn(jdbcScheduler)
            .publishOn(scheduler);
    }

    @Override
    public Flux<Config> getAllConfigsAfterAppVersion(int appVersion) {
        return Flux
            .fromIterable(configRepository.findAllByMinAppVersionIsGreaterThanEqual(appVersion))
            .subscribeOn(jdbcScheduler)
            .publishOn(scheduler);
    }

    @Override
    public Flux<Config> addConfigs(List<Config> configList) {
        return Flux.fromIterable(configRepository.saveAll(configList))
            .subscribeOn(jdbcScheduler)
            .publishOn(scheduler);
    }

在控制器中,我做:

    @PostMapping
    @ResponseStatus(HttpStatus.CREATED)
    Mono<ResponseDto<List<Config>>> addConfigs(@Valid @RequestBody List<Config> configs) {
        return configService.addConfigs(configs).collectList()
            .map(configList -> new ResponseDto<>(HttpStatus.CREATED.value(), configList, null))
            .subscribeOn(scheduler);
    }

这是正确的吗?和/或有更好的方法来做到这一点?

我的理解是:

.subscribeOn(jdbcScheduler)
.publishOn(scheduler);

是该任务将在线程上运行,稍后的结果将发布在我的主并行上。这种理解是否正确?jdbcSchedulerscheduler


答案 1

您对 和 的理解是正确的(参见反应堆项目中关于这些运营商的参考文档)。publishOnsubscribeOn

如果您调用阻塞库而不调度在特定调度程序上工作,则这些调用将阻塞少数可用线程之一(默认情况下为 Netty 事件循环),并且您的应用程序只能同时为几个请求提供服务。

现在我不确定你这样做想要达到什么目的。

首先,并行调度程序是为CPU密集型任务而设计的,这意味着您将拥有很少的任务,与CPU内核一样多(或更多)。在这种情况下,这就像将线程池大小设置为常规 Servlet 容器上的内核数。你的应用将无法处理大量并发请求。

即使你选择了一个更好的替代方案(比如弹性调度程序),它仍然不如Netty事件循环那么好,Netty事件循环是Spring WebFlux中本地调度请求处理的地方。

如果您的最终目标是性能和可伸缩性,那么在反应式应用程序中包装阻塞调用的性能可能比常规的 Servlet 容器更差。

你可以改用 Spring MVC 和:

  • 在处理阻塞库(如 JPA)时,使用常用的阻塞返回类型
  • 在不绑定到此类库时使用和返回类型MonoFlux

这不会是非阻塞的,但这仍然是异步的,您将能够并行完成更多工作,而无需处理复杂性。


答案 2

恕我直言,有一种方法可以执行此操作,从而更好地利用机器中的资源。以下文档,您可以将调用包装在其他线程中,并使用此代码继续执行。


推荐