异步与线程池

Spring 通过 @Async 注解将方法调用提交到线程池异步执行,调用方立即返回,无需等待结果。底层基于 AOP 代理,配置正确的线程池是稳定运行的关键。


快速开始

// 1. 启动类或配置类开启异步
@SpringBootApplication
@EnableAsync
public class Application {}
 
// 2. 在方法上加 @Async
@Service
public class EmailService {
 
    @Async
    public void sendEmail(String to) {
        // 在线程池中异步执行,调用方立即返回
        mailSender.send(buildMessage(to));
    }
}

@Async 基于 AOP 代理,同类内部调用不生效(与 @Transactional 同理)。
详见 AOP


线程池配置

Spring Boot 默认使用 SimpleAsyncTaskExecutor(每次任务新建线程,无复用),生产环境必须自定义。

方式一:配置文件(快速,适合统一配置)

spring:
  task:
    execution:
      pool:
        core-size: 8          # 核心线程数
        max-size: 32          # 最大线程数
        queue-capacity: 500   # 队列容量(满后才扩容到 max-size)
        keep-alive: 60s       # 空闲线程存活时间
      thread-name-prefix: "async-task-"
      shutdown:
        await-termination: true         # 关闭时等待任务完成
        await-termination-period: 30s   # 最长等待 30 秒

方式二:代码配置(推荐,支持多线程池)

@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {
 
    // 默认线程池(无 @Async 指定 executor 时使用)
    @Override
    @Bean("taskExecutor")
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(8);
        executor.setMaxPoolSize(32);
        executor.setQueueCapacity(500);
        executor.setKeepAliveSeconds(60);
        executor.setThreadNamePrefix("async-");
        // 拒绝策略:调用者线程直接执行(防止任务丢失)
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // 等待任务完成后再关闭(优雅停机)
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setAwaitTerminationSeconds(30);
        executor.initialize();
        return executor;
    }
 
    // 全局异步异常处理
    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return (ex, method, params) ->
            log.error("异步方法异常: method={}, params={}", method.getName(), params, ex);
    }
}

多线程池隔离

不同业务使用独立线程池,防止相互干扰(舱壁模式):

@Configuration
public class ThreadPoolConfig {
 
    // 邮件发送线程池
    @Bean("mailExecutor")
    public Executor mailExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(2);
        executor.setMaxPoolSize(5);
        executor.setQueueCapacity(100);
        executor.setThreadNamePrefix("mail-");
        executor.initialize();
        return executor;
    }
 
    // 报表生成线程池
    @Bean("reportExecutor")
    public Executor reportExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(4);
        executor.setMaxPoolSize(8);
        executor.setQueueCapacity(50);
        executor.setThreadNamePrefix("report-");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
        executor.initialize();
        return executor;
    }
}
 
// 指定使用哪个线程池
@Service
public class ReportService {
 
    @Async("reportExecutor")
    public CompletableFuture<Report> generate(Long id) {
        Report report = buildHeavyReport(id);
        return CompletableFuture.completedFuture(report);
    }
}

返回值处理

无返回值(fire-and-forget)

@Async
public void sendNotification(Long userId) {
    // 不返回值,调用方不等待
    notificationClient.push(userId);
}

CompletableFuture(获取结果)

@Async
public CompletableFuture<User> fetchUser(Long id) {
    User user = userRepository.findById(id).orElseThrow();
    return CompletableFuture.completedFuture(user);
}
 
// 调用方
@Service
public class OrderService {
 
    public OrderDetail buildDetail(Long orderId) throws Exception {
        // 并行发起两个异步查询
        CompletableFuture<User> userFuture = userService.fetchUser(userId);
        CompletableFuture<Inventory> invFuture = inventoryService.fetch(productId);
 
        // 等待全部完成(阻塞当前线程)
        CompletableFuture.allOf(userFuture, invFuture).join();
 
        return OrderDetail.of(userFuture.get(), invFuture.get());
    }
}

CompletableFuture 常用操作

CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> "hello", executor);
 
// 转换结果(同步)
CompletableFuture<Integer> length = cf.thenApply(String::length);
 
// 异步转换(返回新的 Future)
CompletableFuture<User> user = cf.thenCompose(name -> findUserAsync(name));
 
// 结果消费
cf.thenAccept(result -> log.info("结果: {}", result));
 
// 两个 Future 都完成后合并
CompletableFuture<String> combined = cf.thenCombine(otherFuture,
    (r1, r2) -> r1 + " " + r2);
 
// 全部完成
CompletableFuture<Void> all = CompletableFuture.allOf(f1, f2, f3);
 
// 任一完成(取最快的)
CompletableFuture<Object> any = CompletableFuture.anyOf(f1, f2, f3);
 
// 异常处理
cf.exceptionally(ex -> {
    log.error("异步异常", ex);
    return "默认值";
});
 
// 无论成功失败都执行(类似 finally)
cf.whenComplete((result, ex) -> {
    if (ex != null) log.error("失败", ex);
    else log.info("成功: {}", result);
});

异步异常处理

无返回值方法的异常

@Async 无返回值方法抛出异常时,调用方感知不到,需配置全局异常处理器:

@Configuration
public class AsyncConfig implements AsyncConfigurer {
 
    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new CustomAsyncExceptionHandler();
    }
}
 
public class CustomAsyncExceptionHandler implements AsyncUncaughtExceptionHandler {
 
    @Override
    public void handleUncaughtException(Throwable ex,
                                        Method method,
                                        Object... params) {
        log.error("异步异常 | 方法: {}.{} | 参数: {} | 异常: {}",
            method.getDeclaringClass().getSimpleName(),
            method.getName(),
            Arrays.toString(params),
            ex.getMessage(), ex);
 
        // 告警通知
        alertService.sendAlert("异步任务失败: " + method.getName());
    }
}

有返回值方法的异常

@Async
public CompletableFuture<Report> generateReport(Long id) {
    try {
        return CompletableFuture.completedFuture(buildReport(id));
    } catch (Exception e) {
        // 方式一:显式包装异常
        return CompletableFuture.failedFuture(e);
    }
}
 
// 调用方捕获
reportService.generateReport(id)
    .exceptionally(ex -> {
        log.error("报表生成失败", ex);
        return Report.empty();
    })
    .thenAccept(report -> saveToStorage(report));

传递上下文

异步方法运行在新线程中,SecurityContextRequestAttributes 等上下文需手动传递:

传递 Spring Security 上下文

@Configuration
public class AsyncSecurityConfig {
 
    @Bean
    public Executor securityAwareExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(8);
        executor.initialize();
        // 包装:自动将当前 SecurityContext 传递给子线程
        return new DelegatingSecurityContextAsyncTaskExecutor(executor);
    }
}

传递 MDC 日志上下文(TraceId)

@Bean
public Executor mdcAwareExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(8);
    executor.setTaskDecorator(runnable -> {
        // 在提交任务时捕获 MDC(调用方线程)
        Map<String, String> mdcContext = MDC.getCopyOfContextMap();
        return () -> {
            try {
                // 在执行任务时恢复 MDC(子线程)
                if (mdcContext != null) MDC.setContextMap(mdcContext);
                runnable.run();
            } finally {
                MDC.clear();
            }
        };
    });
    executor.initialize();
    return executor;
}

链路追踪与 MDC 参见 链路追踪


线程池监控

@Component
@RequiredArgsConstructor
@Slf4j
public class ThreadPoolMonitor {
 
    @Qualifier("taskExecutor")
    private final ThreadPoolTaskExecutor executor;
 
    // 定期打印线程池状态(配合 [[定时任务]])
    @Scheduled(fixedDelay = 30_000)
    public void logStats() {
        ThreadPoolExecutor pool = executor.getThreadPoolExecutor();
        log.info("线程池 | 活跃: {}/{} | 队列: {}/{} | 完成: {} | 拒绝累计: {}",
            pool.getActiveCount(), pool.getMaximumPoolSize(),
            pool.getQueue().size(), executor.getQueueCapacity(),
            pool.getCompletedTaskCount(),
            pool.getTaskCount() - pool.getCompletedTaskCount() - pool.getQueue().size());
    }
}

Actuator 暴露线程池指标(配合 Prometheus / Grafana):

management:
  endpoints:
    web:
      exposure:
        include: metrics, health

监控接入详见 指标采集


拒绝策略

策略行为适用场景
AbortPolicy(默认)抛出 RejectedExecutionException快速失败,调用方感知
CallerRunsPolicy由调用方线程直接执行不丢任务,自动降速
DiscardPolicy静默丢弃允许丢失(日志、埋点)
DiscardOldestPolicy丢弃队列最旧的任务实时性优先
自定义持久化到 DB/MQ 重试不允许丢失的关键任务
// 自定义拒绝策略:写入数据库等待重试
executor.setRejectedExecutionHandler((runnable, pool) -> {
    if (runnable instanceof RetryableTask task) {
        failedTaskRepository.save(task.toEntity());
        log.warn("任务被拒绝,已写入重试队列: {}", task.getId());
    } else {
        throw new RejectedExecutionException("线程池已满,任务被拒绝");
    }
});

虚拟线程替代方案

Java 21 + Spring Boot 3.2 可用虚拟线程替代传统线程池:

spring:
  threads:
    virtual:
      enabled: true  # @Async 自动使用虚拟线程,无需手动配置线程池

I/O 密集型任务(数据库查询、HTTP 调用)适合虚拟线程,无需再精心调整线程池参数。
虚拟线程详见 虚拟线程


相关链接