异步与线程池
Spring 通过 @Async 注解将方法调用提交到线程池异步执行,调用方立即返回,无需等待结果。底层基于 AOP 代理,配置正确的线程池是稳定运行的关键。
快速开始
// 1. 启动类或配置类开启异步
@SpringBootApplication
@EnableAsync
public class Application {}
// 2. 在方法上加 @Async
@Service
public class EmailService {
@Async
public void sendEmail(String to) {
// 在线程池中异步执行,调用方立即返回
mailSender.send(buildMessage(to));
}
}
@Async基于 AOP 代理,同类内部调用不生效(与@Transactional同理)。
详见 AOP。
线程池配置
Spring Boot 默认使用 SimpleAsyncTaskExecutor(每次任务新建线程,无复用),生产环境必须自定义。
方式一:配置文件(快速,适合统一配置)
spring:
task:
execution:
pool:
core-size: 8 # 核心线程数
max-size: 32 # 最大线程数
queue-capacity: 500 # 队列容量(满后才扩容到 max-size)
keep-alive: 60s # 空闲线程存活时间
thread-name-prefix: "async-task-"
shutdown:
await-termination: true # 关闭时等待任务完成
await-termination-period: 30s # 最长等待 30 秒方式二:代码配置(推荐,支持多线程池)
@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {
// 默认线程池(无 @Async 指定 executor 时使用)
@Override
@Bean("taskExecutor")
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(8);
executor.setMaxPoolSize(32);
executor.setQueueCapacity(500);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("async-");
// 拒绝策略:调用者线程直接执行(防止任务丢失)
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 等待任务完成后再关闭(优雅停机)
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(30);
executor.initialize();
return executor;
}
// 全局异步异常处理
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return (ex, method, params) ->
log.error("异步方法异常: method={}, params={}", method.getName(), params, ex);
}
}多线程池隔离
不同业务使用独立线程池,防止相互干扰(舱壁模式):
@Configuration
public class ThreadPoolConfig {
// 邮件发送线程池
@Bean("mailExecutor")
public Executor mailExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(2);
executor.setMaxPoolSize(5);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("mail-");
executor.initialize();
return executor;
}
// 报表生成线程池
@Bean("reportExecutor")
public Executor reportExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(4);
executor.setMaxPoolSize(8);
executor.setQueueCapacity(50);
executor.setThreadNamePrefix("report-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
executor.initialize();
return executor;
}
}
// 指定使用哪个线程池
@Service
public class ReportService {
@Async("reportExecutor")
public CompletableFuture<Report> generate(Long id) {
Report report = buildHeavyReport(id);
return CompletableFuture.completedFuture(report);
}
}返回值处理
无返回值(fire-and-forget)
@Async
public void sendNotification(Long userId) {
// 不返回值,调用方不等待
notificationClient.push(userId);
}CompletableFuture(获取结果)
@Async
public CompletableFuture<User> fetchUser(Long id) {
User user = userRepository.findById(id).orElseThrow();
return CompletableFuture.completedFuture(user);
}
// 调用方
@Service
public class OrderService {
public OrderDetail buildDetail(Long orderId) throws Exception {
// 并行发起两个异步查询
CompletableFuture<User> userFuture = userService.fetchUser(userId);
CompletableFuture<Inventory> invFuture = inventoryService.fetch(productId);
// 等待全部完成(阻塞当前线程)
CompletableFuture.allOf(userFuture, invFuture).join();
return OrderDetail.of(userFuture.get(), invFuture.get());
}
}CompletableFuture 常用操作
CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> "hello", executor);
// 转换结果(同步)
CompletableFuture<Integer> length = cf.thenApply(String::length);
// 异步转换(返回新的 Future)
CompletableFuture<User> user = cf.thenCompose(name -> findUserAsync(name));
// 结果消费
cf.thenAccept(result -> log.info("结果: {}", result));
// 两个 Future 都完成后合并
CompletableFuture<String> combined = cf.thenCombine(otherFuture,
(r1, r2) -> r1 + " " + r2);
// 全部完成
CompletableFuture<Void> all = CompletableFuture.allOf(f1, f2, f3);
// 任一完成(取最快的)
CompletableFuture<Object> any = CompletableFuture.anyOf(f1, f2, f3);
// 异常处理
cf.exceptionally(ex -> {
log.error("异步异常", ex);
return "默认值";
});
// 无论成功失败都执行(类似 finally)
cf.whenComplete((result, ex) -> {
if (ex != null) log.error("失败", ex);
else log.info("成功: {}", result);
});异步异常处理
无返回值方法的异常
@Async 无返回值方法抛出异常时,调用方感知不到,需配置全局异常处理器:
@Configuration
public class AsyncConfig implements AsyncConfigurer {
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new CustomAsyncExceptionHandler();
}
}
public class CustomAsyncExceptionHandler implements AsyncUncaughtExceptionHandler {
@Override
public void handleUncaughtException(Throwable ex,
Method method,
Object... params) {
log.error("异步异常 | 方法: {}.{} | 参数: {} | 异常: {}",
method.getDeclaringClass().getSimpleName(),
method.getName(),
Arrays.toString(params),
ex.getMessage(), ex);
// 告警通知
alertService.sendAlert("异步任务失败: " + method.getName());
}
}有返回值方法的异常
@Async
public CompletableFuture<Report> generateReport(Long id) {
try {
return CompletableFuture.completedFuture(buildReport(id));
} catch (Exception e) {
// 方式一:显式包装异常
return CompletableFuture.failedFuture(e);
}
}
// 调用方捕获
reportService.generateReport(id)
.exceptionally(ex -> {
log.error("报表生成失败", ex);
return Report.empty();
})
.thenAccept(report -> saveToStorage(report));传递上下文
异步方法运行在新线程中,SecurityContext、RequestAttributes 等上下文需手动传递:
传递 Spring Security 上下文
@Configuration
public class AsyncSecurityConfig {
@Bean
public Executor securityAwareExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(8);
executor.initialize();
// 包装:自动将当前 SecurityContext 传递给子线程
return new DelegatingSecurityContextAsyncTaskExecutor(executor);
}
}传递 MDC 日志上下文(TraceId)
@Bean
public Executor mdcAwareExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(8);
executor.setTaskDecorator(runnable -> {
// 在提交任务时捕获 MDC(调用方线程)
Map<String, String> mdcContext = MDC.getCopyOfContextMap();
return () -> {
try {
// 在执行任务时恢复 MDC(子线程)
if (mdcContext != null) MDC.setContextMap(mdcContext);
runnable.run();
} finally {
MDC.clear();
}
};
});
executor.initialize();
return executor;
}链路追踪与 MDC 参见 链路追踪。
线程池监控
@Component
@RequiredArgsConstructor
@Slf4j
public class ThreadPoolMonitor {
@Qualifier("taskExecutor")
private final ThreadPoolTaskExecutor executor;
// 定期打印线程池状态(配合 [[定时任务]])
@Scheduled(fixedDelay = 30_000)
public void logStats() {
ThreadPoolExecutor pool = executor.getThreadPoolExecutor();
log.info("线程池 | 活跃: {}/{} | 队列: {}/{} | 完成: {} | 拒绝累计: {}",
pool.getActiveCount(), pool.getMaximumPoolSize(),
pool.getQueue().size(), executor.getQueueCapacity(),
pool.getCompletedTaskCount(),
pool.getTaskCount() - pool.getCompletedTaskCount() - pool.getQueue().size());
}
}Actuator 暴露线程池指标(配合 Prometheus / Grafana):
management:
endpoints:
web:
exposure:
include: metrics, health监控接入详见 指标采集。
拒绝策略
| 策略 | 行为 | 适用场景 |
|---|---|---|
AbortPolicy(默认) | 抛出 RejectedExecutionException | 快速失败,调用方感知 |
CallerRunsPolicy | 由调用方线程直接执行 | 不丢任务,自动降速 |
DiscardPolicy | 静默丢弃 | 允许丢失(日志、埋点) |
DiscardOldestPolicy | 丢弃队列最旧的任务 | 实时性优先 |
| 自定义 | 持久化到 DB/MQ 重试 | 不允许丢失的关键任务 |
// 自定义拒绝策略:写入数据库等待重试
executor.setRejectedExecutionHandler((runnable, pool) -> {
if (runnable instanceof RetryableTask task) {
failedTaskRepository.save(task.toEntity());
log.warn("任务被拒绝,已写入重试队列: {}", task.getId());
} else {
throw new RejectedExecutionException("线程池已满,任务被拒绝");
}
});虚拟线程替代方案
Java 21 + Spring Boot 3.2 可用虚拟线程替代传统线程池:
spring:
threads:
virtual:
enabled: true # @Async 自动使用虚拟线程,无需手动配置线程池I/O 密集型任务(数据库查询、HTTP 调用)适合虚拟线程,无需再精心调整线程池参数。
虚拟线程详见 虚拟线程。