如何正确读取 Flux<DataBuffer>并将其转换为单个输入流

我正在为我的 spring-boot 应用程序使用和自定义类WebClientBodyExtractor

WebClient webLCient = WebClient.create();
webClient.get()
   .uri(url, params)
   .accept(MediaType.APPLICATION.XML)
   .exchange()
   .flatMap(response -> {
     return response.body(new BodyExtractor());
   })

身体提取器.java

@Override
public Mono<T> extract(ClientHttpResponse response, BodyExtractor.Context context) {
  Flux<DataBuffer> body = response.getBody();
  body.map(dataBuffer -> {
    try {
      JaxBContext jc = JaxBContext.newInstance(SomeClass.class);
      Unmarshaller unmarshaller = jc.createUnmarshaller();

      return (T) unmarshaller.unmarshal(dataBuffer.asInputStream())
    } catch(Exception e){
       return null;
    }
  }).next();
}

上面的代码适用于小有效载荷,但不适用于大有效载荷,我认为这是因为我只读取单个通量值,我不确定如何组合和读取所有值。nextdataBuffer

我是反应堆的新手,所以我不知道通量/单声道的很多技巧。


答案 1

这真的不像其他答案所暗示的那么复杂。

流式传输数据而不在内存中缓冲所有数据的唯一方法是使用管道,正如@jin-kwon所建议的那样。但是,通过使用Spring的BodyExtractorsDataBufferUtils实用程序类,可以非常简单地完成它。

例:

private InputStream readAsInputStream(String url) throws IOException {
    PipedOutputStream osPipe = new PipedOutputStream();
    PipedInputStream isPipe = new PipedInputStream(osPipe);

    ClientResponse response = webClient.get().uri(url)
        .accept(MediaType.APPLICATION.XML)
        .exchange()
        .block();
    final int statusCode = response.rawStatusCode();
    // check HTTP status code, can throw exception if needed
    // ....

    Flux<DataBuffer> body = response.body(BodyExtractors.toDataBuffers())
        .doOnError(t -> {
            log.error("Error reading body.", t);
            // close pipe to force InputStream to error,
            // otherwise the returned InputStream will hang forever if an error occurs
            try(isPipe) {
              //no-op
            } catch (IOException ioe) {
                log.error("Error closing streams", ioe);
            }
        })
        .doFinally(s -> {
            try(osPipe) {
              //no-op
            } catch (IOException ioe) {
                log.error("Error closing streams", ioe);
            }
        });

    DataBufferUtils.write(body, osPipe)
        .subscribe(DataBufferUtils.releaseConsumer());

    return isPipe;
}

如果不关心检查响应代码或为失败状态代码引发异常,则可以使用block()ClientResponse

flatMap(r -> r.body(BodyExtractors.toDataBuffers()))

相反。


答案 2

Bk Santiago的答案的略微修改版本使用而不是.非常相似,但不需要额外的类:reduce()collect()

爪哇岛:

body.reduce(new InputStream() {
    public int read() { return -1; }
  }, (s: InputStream, d: DataBuffer) -> new SequenceInputStream(s, d.asInputStream())
).flatMap(inputStream -> /* do something with single InputStream */

或 Kotlin:

body.reduce(object : InputStream() {
  override fun read() = -1
}) { s: InputStream, d -> SequenceInputStream(s, d.asInputStream()) }
  .flatMap { inputStream -> /* do something with single InputStream */ }

与使用相比,这种方法的好处是,您不需要使用其他类来收集内容。collect()

我创建了一个新的空,但是如果该语法令人困惑,您也可以将其替换为,而是创建一个空作为初始值。InputStream()ByteArrayInputStream("".toByteArray())ByteArrayInputStream


推荐