WebClient

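WebClient is the non-blocking, reactive HTTP client provided by Spring WebFlux. It runs on Reactor Netty by default and returns Mono / Flux (see Reactive Programming). For synchronous use on Spring Boot 3.2+, RestClient is recommended; for migrating legacy code, see RestTemplate.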


Dependencies

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

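An MVC project can also add spring-webflux on its own, without switching the whole stack. In that case a reactive HTTP client library such as Reactor Netty (reactor-netty-http) also has to be on the classpath for the Reactor Netty configuration used on this page.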


Building an instance

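import java.time.Duration;

import io.netty.channel.ChannelOption;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.netty.http.client.HttpClient;
import reactor.netty.resources.ConnectionProvider;

@Configuration
public class WebClientConfig {

    @Bean
    public WebClient webClient() {
        // Connection pool settings (Reactor Netty)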
        ConnectionProvider provider = ConnectionProvider.builder("custom")
            .maxConnections(200)
            .maxIdleTime(Duration.ofSeconds(30))
            .maxLifeTime(Duration.ofMinutes(5))
            .pendingAcquireTimeout(Duration.ofSeconds(5))
            .build();
 
        HttpClient httpClient = HttpClient.create(provider)
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
            .responseTimeout(Duration.ofSeconds(10))
            .doOnConnected(conn ->
                conn.addHandlerLast(new ReadTimeoutHandler(10))
                    .addHandlerLast(new WriteTimeoutHandler(5)));
 
        return WebClient.builder()
            .baseUrl("https://api.example.com")
            .clientConnector(new ReactorClientHttpConnector(httpClient))
            .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
            .defaultHeader(HttpHeaders.ACCEPT, MediaType.APPLICATION_JSON_VALUE)
            .codecs(config -> config.defaultCodecs()
                .maxInMemorySize(10 * 1024 * 1024))  // in-memory buffer limit for response bodies: 10 MB
            .build();
    }
}

Basic requests

GET (single object, list, query parameters)

@Service
@RequiredArgsConstructor
public class UserService {
 
    private final WebClient webClient;
 
    public Mono<User> getById(Long id) {
        return webClient.get()
            .uri("/users/{id}", id)
            .retrieve()
            .bodyToMono(User.class);
    }
 
    // List of items, received as a Flux
    public Flux<User> listAll() {
        return webClient.get()
            .uri("/users")
            .retrieve()
            .bodyToFlux(User.class);
    }
 
    // With query parameters
    public Flux<User> search(String keyword, int page, int size) {
        return webClient.get()
            .uri(uriBuilder -> uriBuilder
                .path("/users/search")
                .queryParam("keyword", keyword)
                .queryParam("page", page)
                .queryParam("size", size)
                .build())
            .retrieve()
            .bodyToFlux(User.class);
    }
}

POST

public Mono<Order> createOrder(CreateOrderRequest request) {
    return webClient.post()
        .uri("/orders")
        .contentType(MediaType.APPLICATION_JSON)
        .bodyValue(request)
        .retrieve()
        .bodyToMono(Order.class);
}

PUT / PATCH / DELETE

public Mono<User> updateUser(Long id, User user) {
    return webClient.put()
        .uri("/users/{id}", id)
        .bodyValue(user)
        .retrieve()
        .bodyToMono(User.class);
}
 
public Mono<Void> deleteUser(Long id) {
    return webClient.delete()
        .uri("/users/{id}", id)
        .retrieve()
        .bodyToMono(Void.class);
}

Getting the full response (status code and headers)

public Mono<ResponseEntity<User>> getWithMeta(Long id) {
    return webClient.get()
        .uri("/users/{id}", id)
        .retrieve()
        .toEntity(User.class);
}

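Error handling

By default, retrieve() signals a WebClientResponseException for 4xx/5xx responses when the body is extracted. Use onStatus to map status codes to your own exceptions: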

public Mono<User> getById(Long id) {
    return webClient.get()
        .uri("/users/{id}", id)
        .retrieve()
        .onStatus(HttpStatusCode::is4xxClientError, response ->
            response.bodyToMono(ErrorResponse.class)
                .map(err -> new BusinessException(err.getMessage()))
        )
        .onStatus(HttpStatusCode::is5xxServerError, response ->
            Mono.error(new ServiceUnavailableException("Downstream service unavailable"))
        )
        .bodyToMono(User.class);
}

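Errors can also be handled on the returned Mono with Reactor's error operators (for cross-cutting handling in a filter, see the Filters section below):

// Return a fallback value for 404; wrap any other HTTP error in a ServiceException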
webClient.get()
    .uri("/users/{id}", id)
    .retrieve()
    .bodyToMono(User.class)
    .onErrorReturn(WebClientResponseException.NotFound.class, User.empty())
    .onErrorMap(WebClientResponseException.class, e ->
        new ServiceException("User service call failed: " + e.getStatusCode()));

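Filters

Filters play the role that ClientHttpRequestInterceptor plays for RestTemplate. Use them for cross-cutting concerns such as authentication, logging, and rate limiting:

// Authentication filter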
ExchangeFilterFunction authFilter = ExchangeFilterFunction.ofRequestProcessor(request ->
    Mono.just(ClientRequest.from(request)
        .header(HttpHeaders.AUTHORIZATION, "Bearer " + tokenProvider.getToken())
        .build())
);
 
// Request logging filter
ExchangeFilterFunction logFilter = ExchangeFilterFunction.ofRequestProcessor(request -> {
    log.info("[HTTP] {} {}", request.method(), request.url());
    return Mono.just(request);
});
 
// Response logging (includes the status code)
ExchangeFilterFunction responseLogFilter = (request, next) ->
    next.exchange(request)
        .doOnNext(response ->
            log.info("[HTTP] {} {} → {}", request.method(),
                     request.url(), response.statusCode()));
 
WebClient client = WebClient.builder()
    .filter(logFilter)
    .filter(authFilter)
    .filter(responseLogFilter)
    .build();
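
Filters run in the order they are registered, with the first one outermost. In the builder above, logFilter therefore sees the request before authFilter adds the Authorization header, while responseLogFilter sees the header-carrying request. The sketch below is a dependency-free model of that wrapping, not Spring code: the Filter interface, the chain helper, and the string-based request and response are all hypothetical stand-ins for ExchangeFilterFunction and ExchangeFunction.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Dependency-free model (hypothetical types) of how a filter chain wraps the final exchange.
final class FilterOrderDemo {

    /** A filter receives the request and the rest of the chain. */
    interface Filter {
        String apply(String request, Function<String, String> next);
    }

    /** Composes filters so that the first one in the list is the outermost. */
    static Function<String, String> chain(List<Filter> filters, Function<String, String> exchange) {
        Function<String, String> current = exchange;
        // Wrap from the last filter inwards so that filters.get(0) ends up on the outside.
        for (int i = filters.size() - 1; i >= 0; i--) {
            Filter filter = filters.get(i);
            Function<String, String> next = current;
            current = request -> filter.apply(request, next);
        }
        return current;
    }

    /** Runs a log -> auth -> responseLog chain and returns what the logging filters observed. */
    static List<String> run() {
        List<String> trace = new ArrayList<>();
        Filter log = (req, next) -> {
            trace.add("log sees: " + req);
            return next.apply(req);
        };
        Filter auth = (req, next) -> next.apply(req + " +Authorization");
        Filter responseLog = (req, next) -> {
            String res = next.apply(req);
            trace.add("response for: " + req + " -> " + res);
            return res;
        };
        chain(List.of(log, auth, responseLog), req -> "200 OK").apply("GET /users/1");
        return trace;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Registering the logging filter before the authentication filter, as the builder above does, keeps the bearer token out of the request log.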

Filter-based retry

ExchangeFilterFunction retryFilter = (request, next) ->
    next.exchange(request)
        .flatMap(response -> {
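            if (response.statusCode().is5xxServerError()) {
                // Release the unread body so the connection returns to the pool before retrying
                return response.releaseBody()
                    .then(Mono.<ClientResponse>error(new RetryableException()));
            }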
            return Mono.just(response);
        })
        .retryWhen(Retry.backoff(3, Duration.ofMillis(500))
            .filter(e -> e instanceof RetryableException));

Retry and timeout

public Mono<User> getByIdWithRetry(Long id) {
    return webClient.get()
        .uri("/users/{id}", id)
        .retrieve()
        .bodyToMono(User.class)
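        // Per-attempt timeout on the reactive chain; client-level timeouts still apply, so the shorter one wins
        .timeout(Duration.ofSeconds(5))
        // Retry policy: exponential backoff, at most 3 retries, only for WebClientRequestException
        // (connection and I/O failures); the TimeoutException raised by timeout() above is not retried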
        .retryWhen(Retry.backoff(3, Duration.ofMillis(300))
            .maxBackoff(Duration.ofSeconds(3))
            .filter(e -> e instanceof WebClientRequestException)
            .onRetryExhaustedThrow((spec, signal) ->
                new ServiceException("Retries exhausted: " + signal.failure().getMessage())))
        // Fallback value for any remaining error
        .onErrorReturn(User.empty());
}
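
To make the backoff settings concrete, the sketch below computes the nominal delay before each retry for a doubling backoff with a cap. It is a plain-Java model rather than Reactor code: BackoffSchedule and nominalDelays are hypothetical names, and the model ignores the random jitter that Retry.backoff applies by default, so real delays will vary around these values.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Reactor: models jitter-free exponential backoff delays.
final class BackoffSchedule {

    /** Returns the nominal delay before each retry: minBackoff doubled each time, capped at maxBackoff. */
    static List<Duration> nominalDelays(int maxRetries, Duration minBackoff, Duration maxBackoff) {
        List<Duration> delays = new ArrayList<>();
        Duration next = minBackoff;
        for (int i = 0; i < maxRetries; i++) {
            // Cap first, then double the capped value so the duration never grows unbounded.
            Duration capped = next.compareTo(maxBackoff) > 0 ? maxBackoff : next;
            delays.add(capped);
            next = capped.multipliedBy(2);
        }
        return delays;
    }

    public static void main(String[] args) {
        // Same numbers as Retry.backoff(3, Duration.ofMillis(300)).maxBackoff(Duration.ofSeconds(3))
        System.out.println(nominalDelays(3, Duration.ofMillis(300), Duration.ofSeconds(3)));
    }
}
```

With the settings used above (3 retries, 300 ms minimum, 3 s cap) the nominal delays are 300 ms, 600 ms, and 1200 ms, so the cap only comes into play from the fifth retry onward.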

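exchange: low-level access

Use exchangeToMono / exchangeToFlux when you need to work with the raw response body or decode it yourself based on the status or headers. These methods release any unconsumed response body once the callback completes; the deprecated exchange() method left that to the caller, and a body that was never consumed meant the connection was never released: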

public Mono<String> getRaw(String path) {
    return webClient.get()
        .uri(path)
        .exchangeToMono(response -> {
            if (response.statusCode().is2xxSuccessful()) {
                return response.bodyToMono(String.class);
            }
            // createError() reads the body and signals a WebClientResponseException
            return response.createError();
        });
}

Advanced request bodies

BodyInserters: form data and file upload

// Form submission
public Mono<String> submitForm(String name, String email) {
    MultiValueMap<String, String> form = new LinkedMultiValueMap<>();
    form.add("name", name);
    form.add("email", email);
 
    return webClient.post()
        .uri("/form")
        .contentType(MediaType.APPLICATION_FORM_URLENCODED)
        .body(BodyInserters.fromFormData(form))
        .retrieve()
        .bodyToMono(String.class);
}
 
// File upload (multipart)
public Mono<String> uploadFile(String filename, byte[] content) {
    MultipartBodyBuilder builder = new MultipartBodyBuilder();
    builder.part("file", content)
           .header("Content-Disposition",
               "form-data; name=\"file\"; filename=\"" + filename + "\"");
    builder.part("name", filename);
 
    return webClient.post()
        .uri("/upload")
        .contentType(MediaType.MULTIPART_FORM_DATA)
        .body(BodyInserters.fromMultipartData(builder.build()))
        .retrieve()
        .bodyToMono(String.class);
}

Streaming request body (large files)

public Mono<Void> streamUpload(Flux<DataBuffer> dataStream) {
    return webClient.post()
        .uri("/stream-upload")
        .body(dataStream, DataBuffer.class)
        .retrieve()
        .bodyToMono(Void.class);
}

Receiving an SSE stream

public Flux<ServerSentEvent<String>> subscribeToEvents(String clientId) {
    ParameterizedTypeReference<ServerSentEvent<String>> type =
        new ParameterizedTypeReference<>() {};
 
    return webClient.get()
        .uri("/sse/subscribe/{clientId}", clientId)
        .accept(MediaType.TEXT_EVENT_STREAM)
        .retrieve()
        .bodyToFlux(type);
}

The SSE Server Push page covers the server-side implementation; WebClient is the matching reactive client.


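Blocking calls in an MVC project

A reactive pipeline can be awaited synchronously from imperative code with .block(). Do not call it on a Reactor non-blocking thread such as a Netty event-loop thread: Reactor typically rejects the call with an IllegalStateException, and blocking an event loop would stall every request it serves.

// Use only in non-reactive contexts (for example an MVC controller or service)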
User user = webClient.get()
    .uri("/users/{id}", id)
    .retrieve()
    .bodyToMono(User.class)
    .block(Duration.ofSeconds(5));

In MVC projects, prefer RestClient (synchronous, with clearer semantics) and keep WebClient for cases that really need reactive streams.


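Managing multiple instances

Keep one WebClient instance per target service. The instances can share the underlying connection resources while each has its own baseUrl and default headers: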

@Bean("paymentClient")
public WebClient paymentWebClient(WebClient.Builder builder) {
    return builder.baseUrl("https://payment.internal").build();
}
 
@Bean("inventoryClient")
public WebClient inventoryWebClient(WebClient.Builder builder) {
    return builder.baseUrl("https://inventory.internal").build();
}

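Spring Boot auto-configures a WebClient.Builder bean (pre-configured with codecs and other customizations), and creating instances through it is recommended. The bean is prototype-scoped, so each injection point receives a fresh builder and settings applied for one client do not leak into another.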


WebClient vs RestTemplate vs RestClient

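Feature             WebClient                  RestClient                RestTemplate
Programming model   Reactive (Mono/Flux)       Synchronous, blocking     Synchronous, blocking
Performance         High (non-blocking I/O)    Medium (blocking I/O)     Medium (blocking I/O)
Streaming support   Native Flux                Not supported             Not supported
API style           Fluent chaining            Fluent chaining           Overloaded methods
Available since     Boot 2.x+                  Boot 3.2+                 All versions
Status              Recommended (reactive)     Recommended (sync)        Maintenance mode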
