WebClient - 如何获取请求正文?

我已经开始使用WebClient,并且我正在使用过滤器方法添加请求/响应的日志记录:

WebClient.builder()
    .baseUrl(properties.getEndpoint())
    .filter((request, next) -> {
        // logging
        request.body()
    })
    .build();

我能够访问url,http方法,标头,但我在获取原始请求正文作为请求返回方法(.body()BodyInserterBodyInserter<?, ? super ClientHttpRequest> body()

如何转换为请求正文的表示形式?或者,如何正确记录整个请求/响应,同时能够在其中散列潜在的凭据?BodyInserterString


答案 1

您可以围绕 JSON 编码器创建自己的包装器/代理类,并在序列化正文发送到内部管之前截获序列化正文。

这篇博客文章展示了如何记录 WebClient 请求和响应的 JSON 有效负载

具体来说,您将扩展 的方法(或者在流数据的情况下)。然后,您可以根据需要对这些数据进行操作,例如日志记录等。您甚至可以根据环境/配置文件有条件地执行此操作encodeValueencodeValuesJackson2JsonEncoder

此自定义日志记录编码器可以在创建 编解码器时指定:WebClient

 CustomBodyLoggingEncoder bodyLoggingEncoder = new CustomBodyLoggingEncoder();
 WebClient.builder()
          .codecs(clientDefaultCodecsConfigurer -> {
             clientDefaultCodecsConfigurer.defaultCodecs().jackson2JsonEncoder(bodyLoggingEncoder);
             clientDefaultCodecsConfigurer.defaultCodecs().jackson2JsonDecoder(new Jackson2JsonDecoder(new ObjectMapper(), MediaType.APPLICATION_JSON));
          })
          ...

更新2020/7/3:

下面是一个匆忙的例子,应用相同的原理,但对于解码器:

public class LoggingJsonDecoder extends Jackson2JsonDecoder {
    private final Consumer<byte[]> payloadConsumer;

    public LoggingJsonEncoder(final Consumer<byte[]> payloadConsumer) {
        this.payloadConsumer = payloadConsumer;
    }

    @Override
    public Mono<Object> decodeToMono(final Publisher<DataBuffer> input, final ResolvableType elementType, final MimeType mimeType, final Map<String, Object> hints) {
        // Buffer for bytes from each published DataBuffer
        final ByteArrayOutputStream payload = new ByteArrayOutputStream();

        // Augment the Flux, and intercept each group of bytes buffered
        final Flux<DataBuffer> interceptor = Flux.from(input)
                                                 .doOnNext(buffer -> bufferBytes(payload, buffer))
                                                 .doOnComplete(() -> payloadConsumer.accept(payload.toByteArray()));

        // Return the original method, giving our augmented Publisher
        return super.decodeToMono(interceptor, elementType, mimeType, hints);
    }

    private void bufferBytes(final ByteArrayOutputStream bao, final DataBuffer buffer) {
        try {
            bao.write(ByteUtils.extractBytesAndReset(buffer));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}

您可以使用 上的生成器方法将其与编码器一起配置。当然,上述方法仅在假设您的数据被反序列化为 Mono 时才有效。但是,如果需要,请覆盖其他方法。另外,我只是在那里标准地输出生成的JSON,但是您可以传入一个或一些东西,以便解码器将字符串发送到例如,或者只是从那里记录;轮到你了。codecsWebClientConsumer<String>

一个警告,在它当前的形式下,这将使你的内存使用量翻倍,因为它正在缓冲整个响应。如果您可以将该字节数据发送到另一个进程/线程以立即写入日志文件或某些输出流(甚至 Flux),则可以避免缓冲内存中的整个有效负载。


答案 2

尝试了所有答案,但其中一些不符合我的需求或只是不起作用。根据这个答案编写了我自己的解决方案,以拦截请求/响应正文并记录它们。

@Slf4j
@Component
public class LoggingCustomizer implements WebClientCustomizer {

    @Override public void customize(WebClient.Builder webClientBuilder) {
        webClientBuilder.filter((request, next) -> {
            logRequest(request);
            return next
                .exchange(interceptBody(request))
                .doOnNext(this::logResponse)
                .map(this::interceptBody);
        });
    }

    private ClientRequest interceptBody(ClientRequest request) {
        return ClientRequest.from(request)
            .body((outputMessage, context) -> request.body().insert(new ClientHttpRequestDecorator(outputMessage) {
                @Override public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
                    return super.writeWith(Mono.from(body)
                        .doOnNext(dataBuffer -> logRequestBody(dataBuffer)));
                }
            }, context))
            .build();
    }

    private ClientResponse interceptBody(ClientResponse response) {
        return response.mutate()
            .body(data -> data.doOnNext(this::logResponseBody))
            .build();
    }

    private void logRequest(ClientRequest request) {
        log.debug("DOWNSTREAM REQUEST: METHOD {}, URI: {}, HEADERS: {}", request.method(), request.url(), request.headers());
    }

    private void logRequestBody(DataBuffer dataBuffer) {
        log.debug("DOWNSTREAM REQUEST: BODY: {}", dataBuffer.toString(StandardCharsets.UTF_8));
    }

    private void logResponse(ClientResponse response) {
        log.debug("DOWNSTREAM RESPONSE: STATUS: {}, HEADERS: {}", response.rawStatusCode(), response.headers().asHttpHeaders());
    }

    private void logResponseBody(DataBuffer dataBuffer) {
        log.debug("DOWNSTREAM RESPONSE: BODY: {}", dataBuffer.toString(StandardCharsets.UTF_8));
    }

}

更新:添加了用于记录的代码段(首选解决方案)reactor.netty.http.client.HttpClient

@Slf4j
@Component
public class LoggingCustomizer implements WebClientCustomizer {

    @Override public void customize(WebClient.Builder webClientBuilder) {
        HttpClient httpClient = HttpClient.create()
            .doOnRequest((httpClientRequest, connection) -> connection.addHandlerFirst(new LoggingHandler()));
        webClientBuilder.clientConnector(new ReactorClientHttpConnector(httpClient));
    }

    private static class LoggingHandler extends ChannelDuplexHandler {

        @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            if (msg instanceof FullHttpRequest request) {
                log.debug("DOWNSTREAM  REQUEST: METHOD: {}, URI: {}, BODY: {}, HEADERS: {}",
                    request.method(), request.uri(), request.content().toString(defaultCharset()), request.headers());
            } else if (msg instanceof HttpRequest request) {
                log.debug("DOWNSTREAM  REQUEST: METHOD: {}, URI: {}, HEADERS: {}",
                    request.method(), request.uri(), request.headers());
            } else if (msg instanceof FullHttpMessage message) {
                log.debug("DOWNSTREAM  REQUEST: BODY: {}",
                    message.content().toString(defaultCharset()));
            }
            super.write(ctx, msg, promise);
        }

        @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            if (msg instanceof FullHttpResponse response) {
                log.debug("DOWNSTREAM RESPONSE: STATUS: {}, BODY: {}, HEADERS: {}",
                    response.status().code(), response.content().toString(defaultCharset()), response.headers());
            } else if (msg instanceof HttpResponse response) {
                log.debug("DOWNSTREAM RESPONSE: STATUS: {}, HEADERS: {}",
                    response.status().code(), response.headers());
            } else if (!(msg instanceof LastHttpContent) && msg instanceof HttpContent httpContent) {
                log.debug("DOWNSTREAM RESPONSE: BODY: {}",
                    httpContent.content().toString(defaultCharset()));
            }
            super.channelRead(ctx, msg);
        }
    }

}

推荐