响应式编程
响应式编程基于异步数据流,通过非阻塞的方式处理数据,适合高并发 I/O 密集型场景。Spring 生态的响应式实现基于 Project Reactor,核心类型是 Mono 和 Flux。
依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>Mono 与 Flux
Mono<T> | Flux<T> | |
|---|---|---|
| 元素数量 | 0 或 1 个 | 0 到 N 个 |
| 类比 | Optional / 单个异步结果 | Stream / 异步序列 |
| 典型场景 | 查询单条记录、POST 返回 | 查询列表、事件流 |
Mono
创建
// 有值
Mono<String> m1 = Mono.just("hello");
// 空
Mono<String> empty = Mono.empty();
// 错误
Mono<String> error = Mono.error(new RuntimeException("出错了"));
// 延迟执行(订阅时才调用)
Mono<String> deferred = Mono.fromSupplier(() -> fetchFromDb());
// 包装 Future
Mono<String> fromFuture = Mono.fromFuture(completableFuture);
// 包装 Callable
Mono<String> fromCallable = Mono.fromCallable(() -> blockingCall());转换操作
Mono<String> mono = Mono.just("hello");
// map:同步转换值
Mono<Integer> length = mono.map(String::length);
// flatMap:异步转换(返回新的 Mono)
Mono<User> user = mono.flatMap(name -> userService.findByName(name));
// flatMapMany:Mono → Flux
Flux<Order> orders = mono.flatMapMany(name -> orderService.findByUser(name));
// zipWith:合并两个 Mono
Mono<String> zipped = mono.zipWith(Mono.just(" world"),
(a, b) -> a + b); // "hello world"
// defaultIfEmpty:为空时提供默认值
Mono<String> withDefault = mono.defaultIfEmpty("default");
// switchIfEmpty:为空时切换到另一个 Mono
Mono<User> result = findInCache().switchIfEmpty(findInDb());
// filter:不满足条件则变为 empty
Mono<String> filtered = mono.filter(s -> s.length() > 3);错误处理
Mono<String> mono = Mono.error(new RuntimeException("失败"));
// 捕获异常,返回默认值
mono.onErrorReturn("默认值");
// 捕获异常,切换到另一个 Mono
mono.onErrorResume(e -> Mono.just("备用数据"));
// 捕获并包装异常
mono.onErrorMap(e -> new BusinessException("业务错误", e));
// 无论成功失败都执行(类似 finally)
mono.doFinally(signalType -> log.info("结束: {}", signalType));副作用(do 系列,不改变流)
mono.doOnNext(val -> log.info("值: {}", val))
.doOnError(e -> log.error("异常", e))
.doOnSuccess(val -> log.info("成功: {}", val))
.doOnSubscribe(sub -> log.info("开始订阅"))
.doOnTerminate(() -> log.info("终止"));Flux
创建
// 直接创建
Flux<Integer> f1 = Flux.just(1, 2, 3, 4, 5);
// 从集合
Flux<String> f2 = Flux.fromIterable(List.of("a", "b", "c"));
// 范围
Flux<Integer> range = Flux.range(1, 10); // 1~10
// 定时发射(每秒一个)
Flux<Long> interval = Flux.interval(Duration.ofSeconds(1));
// 空
Flux<String> empty = Flux.empty();
// 错误
Flux<String> error = Flux.error(new RuntimeException());
// 编程方式创建
Flux<Integer> flux = Flux.create(sink -> {
for (int i = 0; i < 10; i++) {
sink.next(i);
}
sink.complete();
});转换操作
Flux<Integer> flux = Flux.range(1, 5);
// map:逐个同步转换
Flux<String> mapped = flux.map(i -> "item-" + i);
// flatMap:逐个异步转换,结果无序
Flux<User> users = flux.flatMap(id -> userService.findById(id));
// concatMap:逐个异步转换,保持顺序(串行)
Flux<User> ordered = flux.concatMap(id -> userService.findById(id));
// filter:过滤
Flux<Integer> evens = flux.filter(i -> i % 2 == 0);
// take / skip
flux.take(3); // 取前 3 个
flux.skip(2); // 跳过前 2 个
flux.takeLast(2); // 取最后 2 个
// distinct / distinctUntilChanged
flux.distinct(); // 全局去重
flux.distinctUntilChanged(); // 相邻去重
// sort
flux.sort();
flux.sort(Comparator.reverseOrder());聚合操作
Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5);
// 收集为 List
Mono<List<Integer>> list = flux.collectList();
// 收集为 Map
Mono<Map<Integer, String>> map = flux.collectMap(i -> i, i -> "v" + i);
// 计数
Mono<Long> count = flux.count();
// 归约
Mono<Integer> sum = flux.reduce(0, Integer::sum);
// 判断
Mono<Boolean> hasAny = flux.any(i -> i > 3);
Mono<Boolean> hasAll = flux.all(i -> i > 0);
// 取第一个
Mono<Integer> first = flux.next();合并多个 Flux
Flux<String> f1 = Flux.just("a", "b");
Flux<String> f2 = Flux.just("c", "d");
// merge:并发订阅,按到达顺序交错(无序)
Flux<String> merged = Flux.merge(f1, f2);
// concat:顺序订阅,f1 完成后再订阅 f2(有序)
Flux<String> concat = Flux.concat(f1, f2);
// zip:逐对合并,最短优先
Flux<String> zipped = Flux.zip(f1, f2, (a, b) -> a + b);
// combineLatest:任一发出新值时组合最新值
Flux<String> combined = Flux.combineLatest(f1, f2, (a, b) -> a + b);背压(Backpressure)
// 限制请求速率
flux.limitRate(10); // 每次最多请求 10 个
// 背压策略(处理不过来时)
flux.onBackpressureBuffer(1000); // 缓冲最多 1000 个
flux.onBackpressureBuffer(1000,
dropped -> log.warn("丢弃: {}", dropped));
flux.onBackpressureDrop(); // 直接丢弃
flux.onBackpressureLatest(); // 只保留最新值订阅
// 基本订阅
flux.subscribe(
value -> System.out.println("值: " + value), // onNext
error -> log.error("异常", error), // onError
() -> System.out.println("完成") // onComplete
);
// 阻塞等待结果(测试 / 非响应式上下文)
String result = mono.block();
List<Integer> list = flux.collectList().block();
String first = flux.blockFirst();
String last = flux.blockLast();生产代码中避免使用 block(),会阻塞线程,破坏响应式效果。
调度器(切换线程)
// subscribeOn:影响整个链路从哪个线程开始
Mono.fromCallable(() -> blockingDbCall())
.subscribeOn(Schedulers.boundedElastic()); // 适合阻塞 I/O
// publishOn:影响其后的操作在哪个线程执行
flux.publishOn(Schedulers.parallel())
.map(this::cpuIntensiveWork);| 调度器 | 说明 |
|---|---|
Schedulers.immediate() | 当前线程 |
Schedulers.single() | 单一复用线程 |
Schedulers.parallel() | CPU 核心数固定线程池,适合计算 |
Schedulers.boundedElastic() | 弹性线程池,适合阻塞 I/O(如 JDBC) |
Schedulers.fromExecutor(exec) | 自定义线程池 |
WebFlux Controller
@RestController
@RequestMapping("/users")
public class UserController {
private final UserRepository repo;
// 返回单个
@GetMapping("/{id}")
public Mono<User> getUser(@PathVariable Long id) {
return repo.findById(id)
.switchIfEmpty(Mono.error(new NotFoundException("用户不存在")));
}
// 返回列表
@GetMapping
public Flux<User> listUsers() {
return repo.findAll();
}
// 创建
@PostMapping
@ResponseStatus(HttpStatus.CREATED)
public Mono<User> create(@RequestBody Mono<User> userMono) {
return userMono.flatMap(repo::save);
}
// SSE 流式推送
@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<User> stream() {
return repo.findAll().delayElements(Duration.ofSeconds(1));
}
}WebFlux R2DBC(响应式数据库)
// ReactiveCrudRepository
public interface UserRepository extends ReactiveCrudRepository<User, Long> {
Flux<User> findByName(String name);
Mono<User> findByEmail(String email);
}
// 使用
repo.findById(1L)
.flatMap(user -> {
user.setName("新名字");
return repo.save(user);
})
.subscribe();常用操作速查
// 超时
mono.timeout(Duration.ofSeconds(5));
// 重试
mono.retry(3);
mono.retryWhen(Retry.backoff(3, Duration.ofSeconds(1)));
// 缓存结果(避免重复订阅)
Mono<User> cached = userMono.cache();
// 转换为 Optional
Mono<Optional<String>> opt = mono.map(Optional::of).defaultIfEmpty(Optional.empty());
// Mono ↔ Flux 互转
Mono<List<Integer>> m = flux.collectList();
Flux<Integer> f = mono.flatMapMany(Flux::fromIterable);