block()/blockFirst()/blockLast() 在 exchange() 之后调用 bodyToMono 时出现阻塞错误

我正在尝试使用Webflux将生成的文件流式传输到另一个位置,但是,如果文件的生成遇到错误,api将返回成功,但是在生成文件而不是文件本身时,DTO会详细说明错误。这是使用一个非常古老且设计不佳的api,因此请原谅使用post和api设计。

来自 api 调用 (exchange()) 的响应是 ClientResponse。从这里,我可以使用bodyToMono转换为ByteArrayResource,它可以流式传输到文件,或者,如果在创建文件时出错,那么我也可以使用bodyToMono转换为DTO。但是,我似乎不能做任何事情,也不能根据客户端响应标头的内容。

在运行时,我得到一个非法状态异常由

block()/blockFirst()/blockLast() 是阻塞的,这在线程 reactor-http-client-epoll-12 中不受支持

我认为我的问题是我不能在同一个函数链中调用block()两次。

我的代码片段是这样的:

webClient.post()
        .uri(uriBuilder -> uriBuilder.path("/file/")
                                      .queryParams(params).build())
        .exchange()
        .doOnSuccess(cr -> {
                if (MediaType.APPLICATION_JSON_UTF8.equals(cr.headers().contentType().get())) {
                    NoPayloadResponseDto dto = cr.bodyToMono(NoPayloadResponseDto.class).block();
                    createErrorFile(dto);
                }
                else {
                    ByteArrayResource bAr = cr.bodyToMono(ByteArrayResource.class).block();
                    createSpreadsheet(bAr);
                }
            }
        )
        .block();

基本上,我想根据标头中定义的MediaType以不同的方式处理客户端响应。

这可能吗?


答案 1

首先,有几件事可以帮助您理解解决此用例的代码片段。

  1. 永远不要在返回反应式类型的方法中调用阻塞方法;您将阻止应用程序的少数线程之一,这对应用程序非常不利
  2. 无论如何,从 Reactor 3.2 开始,在反应式管道内阻塞会引发错误
  3. 正如评论中建议的那样,打电话也不是一个好主意。这或多或少就像在单独的线程中将该作业作为任务启动。完成后,您将获得一个回调(可以给定这些方法),但实际上您正在将当前管道与该任务解耦。在这种情况下,可以先关闭客户端 HTTP 响应并清理资源,然后再有机会读取完整的响应正文以将其写入文件。subscribesubscribe
  4. 如果您不想在内存中缓冲整个响应,Spring提供了(想想可以池化的ByteBuffer实例)。DataBuffer
  5. 如果您正在实现的方法本身是阻塞(例如返回),则可以调用block,例如在测试用例中。void

下面是一个可用于执行此操作的代码片段:

Mono<Void> fileWritten = WebClient.create().post()
        .uri(uriBuilder -> uriBuilder.path("/file/").build())
        .exchange()
        .flatMap(response -> {
            if (MediaType.APPLICATION_JSON_UTF8.equals(response.headers().contentType().get())) {
                Mono<NoPayloadResponseDto> dto = response.bodyToMono(NoPayloadResponseDto.class);
                return createErrorFile(dto);
            }
            else {
                Flux<DataBuffer> body = response.bodyToFlux(DataBuffer.class);
                return createSpreadsheet(body);
            }
        });
// Once you get that Mono, you should give plug it into an existing
// reactive pipeline, or call block on it, depending on the situation

如您所见,我们没有阻止任何地方,并且处理I / O的方法正在返回,这是反应式的回调,当事情完成以及发生错误时发出信号。Mono<Void>done(error)

由于我不确定该方法应该做什么,因此我提供了一个示例,该示例仅将正文字节写入文件。请注意,由于数据缓冲区可能会被回收/池化,因此我们需要在完成后释放它们。createErrorFilecreateSpreadsheet

private Mono<Void> createSpreadsheet(Flux<DataBuffer> body) {
    try {
        Path file = //...
        WritableByteChannel channel = Files.newByteChannel(file, StandardOpenOption.WRITE);
        return DataBufferUtils.write(body, channel).map(DataBufferUtils::release).then();
    } catch (IOException exc) {
        return Mono.error(exc);
    }
}

通过此实现,您的应用程序将在给定时间将几个实例保存在内存中(反应式运算符出于性能原因预取值),并将在字节以反应式方式出现时写入字节。DataBuffer


答案 2

[2021/10/19更新]

toProcessor()现已弃用。

考虑使用

myMono.toFuture().get();

正如投票最多的答案所述,人们永远不应该阻止。在我的情况下,这是唯一的选择,因为我们在命令式代码段中使用反应式库。阻塞可以通过将单声道包装在处理器中来完成:

myMono.toProcessor().block()

推荐