响应式编程

响应式编程基于异步数据流,通过非阻塞的方式处理数据,适合高并发 I/O 密集型场景。Spring 生态的响应式实现基于 Project Reactor,核心类型是 MonoFlux

依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

Mono 与 Flux

Mono<T>Flux<T>
元素数量0 或 1 个0 到 N 个
类比Optional / 单个异步结果Stream / 异步序列
典型场景查询单条记录、POST 返回查询列表、事件流

Mono

创建

// 有值
Mono<String> m1 = Mono.just("hello");
 
// 空
Mono<String> empty = Mono.empty();
 
// 错误
Mono<String> error = Mono.error(new RuntimeException("出错了"));
 
// 延迟执行(订阅时才调用)
Mono<String> deferred = Mono.fromSupplier(() -> fetchFromDb());
 
// 包装 Future
Mono<String> fromFuture = Mono.fromFuture(completableFuture);
 
// 包装 Callable
Mono<String> fromCallable = Mono.fromCallable(() -> blockingCall());

转换操作

Mono<String> mono = Mono.just("hello");
 
// map:同步转换值
Mono<Integer> length = mono.map(String::length);
 
// flatMap:异步转换(返回新的 Mono)
Mono<User> user = mono.flatMap(name -> userService.findByName(name));
 
// flatMapMany:Mono → Flux
Flux<Order> orders = mono.flatMapMany(name -> orderService.findByUser(name));
 
// zipWith:合并两个 Mono
Mono<String> zipped = mono.zipWith(Mono.just(" world"),
    (a, b) -> a + b);  // "hello world"
 
// defaultIfEmpty:为空时提供默认值
Mono<String> withDefault = mono.defaultIfEmpty("default");
 
// switchIfEmpty:为空时切换到另一个 Mono
Mono<User> result = findInCache().switchIfEmpty(findInDb());
 
// filter:不满足条件则变为 empty
Mono<String> filtered = mono.filter(s -> s.length() > 3);

错误处理

Mono<String> mono = Mono.error(new RuntimeException("失败"));
 
// 捕获异常,返回默认值
mono.onErrorReturn("默认值");
 
// 捕获异常,切换到另一个 Mono
mono.onErrorResume(e -> Mono.just("备用数据"));
 
// 捕获并包装异常
mono.onErrorMap(e -> new BusinessException("业务错误", e));
 
// 无论成功失败都执行(类似 finally)
mono.doFinally(signalType -> log.info("结束: {}", signalType));

副作用(do 系列,不改变流)

mono.doOnNext(val -> log.info("值: {}", val))
    .doOnError(e -> log.error("异常", e))
    .doOnSuccess(val -> log.info("成功: {}", val))
    .doOnSubscribe(sub -> log.info("开始订阅"))
    .doOnTerminate(() -> log.info("终止"));

Flux

创建

// 直接创建
Flux<Integer> f1 = Flux.just(1, 2, 3, 4, 5);
 
// 从集合
Flux<String> f2 = Flux.fromIterable(List.of("a", "b", "c"));
 
// 范围
Flux<Integer> range = Flux.range(1, 10);  // 1~10
 
// 定时发射(每秒一个)
Flux<Long> interval = Flux.interval(Duration.ofSeconds(1));
 
// 空
Flux<String> empty = Flux.empty();
 
// 错误
Flux<String> error = Flux.error(new RuntimeException());
 
// 编程方式创建
Flux<Integer> flux = Flux.create(sink -> {
    for (int i = 0; i < 10; i++) {
        sink.next(i);
    }
    sink.complete();
});

转换操作

Flux<Integer> flux = Flux.range(1, 5);
 
// map:逐个同步转换
Flux<String> mapped = flux.map(i -> "item-" + i);
 
// flatMap:逐个异步转换,结果无序
Flux<User> users = flux.flatMap(id -> userService.findById(id));
 
// concatMap:逐个异步转换,保持顺序(串行)
Flux<User> ordered = flux.concatMap(id -> userService.findById(id));
 
// filter:过滤
Flux<Integer> evens = flux.filter(i -> i % 2 == 0);
 
// take / skip
flux.take(3);          // 取前 3 个
flux.skip(2);          // 跳过前 2 个
flux.takeLast(2);      // 取最后 2 个
 
// distinct / distinctUntilChanged
flux.distinct();                    // 全局去重
flux.distinctUntilChanged();        // 相邻去重
 
// sort
flux.sort();
flux.sort(Comparator.reverseOrder());

聚合操作

Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5);
 
// 收集为 List
Mono<List<Integer>> list = flux.collectList();
 
// 收集为 Map
Mono<Map<Integer, String>> map = flux.collectMap(i -> i, i -> "v" + i);
 
// 计数
Mono<Long> count = flux.count();
 
// 归约
Mono<Integer> sum = flux.reduce(0, Integer::sum);
 
// 判断
Mono<Boolean> hasAny = flux.any(i -> i > 3);
Mono<Boolean> hasAll = flux.all(i -> i > 0);
 
// 取第一个
Mono<Integer> first = flux.next();

合并多个 Flux

Flux<String> f1 = Flux.just("a", "b");
Flux<String> f2 = Flux.just("c", "d");
 
// merge:并发订阅,按到达顺序交错(无序)
Flux<String> merged = Flux.merge(f1, f2);
 
// concat:顺序订阅,f1 完成后再订阅 f2(有序)
Flux<String> concat = Flux.concat(f1, f2);
 
// zip:逐对合并,最短优先
Flux<String> zipped = Flux.zip(f1, f2, (a, b) -> a + b);
 
// combineLatest:任一发出新值时组合最新值
Flux<String> combined = Flux.combineLatest(f1, f2, (a, b) -> a + b);

背压(Backpressure)

// 限制请求速率
flux.limitRate(10);        // 每次最多请求 10 个
 
// 背压策略(处理不过来时)
flux.onBackpressureBuffer(1000);        // 缓冲最多 1000 个
flux.onBackpressureBuffer(1000,
    dropped -> log.warn("丢弃: {}", dropped));
flux.onBackpressureDrop();              // 直接丢弃
flux.onBackpressureLatest();            // 只保留最新值

订阅

// 基本订阅
flux.subscribe(
    value -> System.out.println("值: " + value),  // onNext
    error -> log.error("异常", error),             // onError
    () -> System.out.println("完成")               // onComplete
);
 
// 阻塞等待结果(测试 / 非响应式上下文)
String result = mono.block();
List<Integer> list = flux.collectList().block();
String first = flux.blockFirst();
String last = flux.blockLast();

生产代码中避免使用 block(),会阻塞线程,破坏响应式效果。


调度器(切换线程)

// subscribeOn:影响整个链路从哪个线程开始
Mono.fromCallable(() -> blockingDbCall())
    .subscribeOn(Schedulers.boundedElastic()); // 适合阻塞 I/O
 
// publishOn:影响其后的操作在哪个线程执行
flux.publishOn(Schedulers.parallel())
    .map(this::cpuIntensiveWork);
调度器说明
Schedulers.immediate()当前线程
Schedulers.single()单一复用线程
Schedulers.parallel()CPU 核心数固定线程池,适合计算
Schedulers.boundedElastic()弹性线程池,适合阻塞 I/O(如 JDBC)
Schedulers.fromExecutor(exec)自定义线程池

WebFlux Controller

@RestController
@RequestMapping("/users")
public class UserController {
 
    private final UserRepository repo;
 
    // 返回单个
    @GetMapping("/{id}")
    public Mono<User> getUser(@PathVariable Long id) {
        return repo.findById(id)
            .switchIfEmpty(Mono.error(new NotFoundException("用户不存在")));
    }
 
    // 返回列表
    @GetMapping
    public Flux<User> listUsers() {
        return repo.findAll();
    }
 
    // 创建
    @PostMapping
    @ResponseStatus(HttpStatus.CREATED)
    public Mono<User> create(@RequestBody Mono<User> userMono) {
        return userMono.flatMap(repo::save);
    }
 
    // SSE 流式推送
    @GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<User> stream() {
        return repo.findAll().delayElements(Duration.ofSeconds(1));
    }
}

WebFlux R2DBC(响应式数据库)

// ReactiveCrudRepository
public interface UserRepository extends ReactiveCrudRepository<User, Long> {
    Flux<User> findByName(String name);
    Mono<User> findByEmail(String email);
}
 
// 使用
repo.findById(1L)
    .flatMap(user -> {
        user.setName("新名字");
        return repo.save(user);
    })
    .subscribe();

常用操作速查

// 超时
mono.timeout(Duration.ofSeconds(5));
 
// 重试
mono.retry(3);
mono.retryWhen(Retry.backoff(3, Duration.ofSeconds(1)));
 
// 缓存结果(避免重复订阅)
Mono<User> cached = userMono.cache();
 
// 转换为 Optional
Mono<Optional<String>> opt = mono.map(Optional::of).defaultIfEmpty(Optional.empty());
 
// Mono ↔ Flux 互转
Mono<List<Integer>> m = flux.collectList();
Flux<Integer> f = mono.flatMapMany(Flux::fromIterable);