线程池
为什么用线程池
- 降低线程创建/销毁开销
- 控制并发数量,防止资源耗尽
- 提供任务队列、拒绝策略、线程复用等管理能力
ThreadPoolExecutor
所有线程池的核心实现类:
ThreadPoolExecutor executor = new ThreadPoolExecutor(
4, // corePoolSize:核心线程数
8, // maximumPoolSize:最大线程数
60L, TimeUnit.SECONDS, // keepAliveTime:非核心线程空闲存活时间
new LinkedBlockingQueue<>(1000), // workQueue:任务队列
new ThreadFactoryBuilder() // threadFactory:线程工厂
.setNameFormat("biz-pool-%d")
.build(),
new ThreadPoolExecutor.CallerRunsPolicy() // handler:拒绝策略
);执行流程
提交任务
│
├─ 核心线程数未满 → 创建新核心线程执行
│
├─ 核心线程已满,队列未满 → 放入队列等待
│
├─ 队列已满,未达最大线程数 → 创建非核心线程执行
│
└─ 达到最大线程数且队列已满 → 执行拒绝策略
拒绝策略
| 策略 | 行为 |
|---|---|
AbortPolicy(默认) | 抛出 RejectedExecutionException |
CallerRunsPolicy | 由提交任务的线程自己执行,起到降速作用 |
DiscardPolicy | 静默丢弃任务 |
DiscardOldestPolicy | 丢弃队列中最老的任务,重新提交 |
任务队列
| 队列 | 特点 |
|---|---|
LinkedBlockingQueue | 无界(默认 Integer.MAX_VALUE),可能堆积大量任务 |
ArrayBlockingQueue | 有界,推荐使用 |
SynchronousQueue | 不存储任务,直接交给线程(CachedThreadPool 用) |
PriorityBlockingQueue | 按优先级出队 |
DelayQueue | 延迟任务 |
Executors 工厂方法
生产环境不推荐,存在资源耗尽风险,仅了解:
// 固定线程数,无界队列(队列堆积风险)
ExecutorService fixed = Executors.newFixedThreadPool(4);
// 动态扩容,SynchronousQueue,线程数无上限(OOM 风险)
ExecutorService cached = Executors.newCachedThreadPool();
// 单线程,保证顺序执行,无界队列
ExecutorService single = Executors.newSingleThreadExecutor();
// 支持定时和周期任务
ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(2);提交任务
// execute:无返回值,异常会传递给 UncaughtExceptionHandler
executor.execute(() -> doWork());
// submit:返回 Future,异常被封装在 Future 中,get() 时抛出
Future<String> future = executor.submit(() -> {
return fetchData();
});
try {
String result = future.get(5, TimeUnit.SECONDS); // 超时等待
} catch (TimeoutException e) {
future.cancel(true);
} catch (ExecutionException e) {
Throwable cause = e.getCause(); // 任务中抛出的异常
}
// invokeAll:等待所有任务完成
List<Future<String>> futures = executor.invokeAll(tasks, 10, TimeUnit.SECONDS);
// invokeAny:任一完成即返回
String first = executor.invokeAny(tasks);CompletableFuture(Java 8+)
更强大的异步编排工具:
// 异步执行
CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
return fetchUser(id);
}, executor);
// 链式处理
cf.thenApply(user -> user.getName()) // 转换结果
.thenAccept(name -> System.out.println(name)) // 消费结果
.exceptionally(e -> { log.error(e); return null; }); // 异常处理
// 并行执行,等待全部完成
CompletableFuture.allOf(cf1, cf2, cf3)
.thenRun(() -> System.out.println("全部完成"));
// 并行执行,任一完成
CompletableFuture.anyOf(cf1, cf2)
.thenAccept(result -> System.out.println("最快: " + result));
// 组合两个任务
cf1.thenCombine(cf2, (r1, r2) -> r1 + r2);
// 异步转换(避免阻塞线程池)
cf.thenComposeAsync(user -> fetchOrders(user.getId()), executor);线程池关闭
executor.shutdown(); // 不接受新任务,等待已有任务完成
executor.shutdownNow(); // 中断所有线程,返回未执行的任务列表
// 等待终止
boolean done = executor.awaitTermination(30, TimeUnit.SECONDS);
// 优雅关闭模板
executor.shutdown();
try {
if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
executor.shutdownNow();
if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
log.error("线程池未能正常关闭");
}
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}监控
ThreadPoolExecutor tpe = (ThreadPoolExecutor) executor;
tpe.getPoolSize(); // 当前线程数
tpe.getActiveCount(); // 活跃线程数
tpe.getCompletedTaskCount();// 已完成任务数
tpe.getQueue().size(); // 队列中等待任务数
tpe.getTaskCount(); // 总任务数(完成 + 队列 + 执行中)线程池大小设置
CPU 密集型(计算为主,无阻塞):
线程数 = CPU 核心数 + 1
IO 密集型(数据库、网络、文件):
线程数 = CPU 核心数 × (1 + 平均等待时间 / 平均计算时间)
实际建议:通过压测找到最优值,设置有界队列,配合监控告警。
Spring 异步线程池
@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {
@Bean("taskExecutor")
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(4);
executor.setMaxPoolSize(8);
executor.setQueueCapacity(500);
executor.setThreadNamePrefix("async-");
executor.setRejectedExecutionHandler(
new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
@Override
public Executor getAsyncExecutor() {
return taskExecutor();
}
}
// 使用
@Async("taskExecutor")
public CompletableFuture<String> asyncMethod() {
return CompletableFuture.completedFuture(doWork());
}