线程池

为什么用线程池

  • 降低线程创建/销毁开销
  • 控制并发数量,防止资源耗尽
  • 提供任务队列、拒绝策略、线程复用等管理能力

ThreadPoolExecutor

所有线程池的核心实现类:

ThreadPoolExecutor executor = new ThreadPoolExecutor(
    4,                                  // corePoolSize:核心线程数
    8,                                  // maximumPoolSize:最大线程数
    60L, TimeUnit.SECONDS,             // keepAliveTime:非核心线程空闲存活时间
    new LinkedBlockingQueue<>(1000),    // workQueue:任务队列
    new ThreadFactoryBuilder()          // threadFactory:线程工厂
        .setNameFormat("biz-pool-%d")
        .build(),
    new ThreadPoolExecutor.CallerRunsPolicy() // handler:拒绝策略
);

执行流程

提交任务
  │
  ├─ 核心线程数未满 → 创建新核心线程执行
  │
  ├─ 核心线程已满,队列未满 → 放入队列等待
  │
  ├─ 队列已满,未达最大线程数 → 创建非核心线程执行
  │
  └─ 达到最大线程数且队列已满 → 执行拒绝策略

拒绝策略

策略行为
AbortPolicy(默认)抛出 RejectedExecutionException
CallerRunsPolicy由提交任务的线程自己执行,起到降速作用
DiscardPolicy静默丢弃任务
DiscardOldestPolicy丢弃队列中最老的任务,重新提交

任务队列

队列特点
LinkedBlockingQueue无界(默认 Integer.MAX_VALUE),可能堆积大量任务
ArrayBlockingQueue有界,推荐使用
SynchronousQueue不存储任务,直接交给线程(CachedThreadPool 用)
PriorityBlockingQueue按优先级出队
DelayQueue延迟任务

Executors 工厂方法

生产环境不推荐,存在资源耗尽风险,仅了解:

// 固定线程数,无界队列(队列堆积风险)
ExecutorService fixed = Executors.newFixedThreadPool(4);
 
// 动态扩容,SynchronousQueue,线程数无上限(OOM 风险)
ExecutorService cached = Executors.newCachedThreadPool();
 
// 单线程,保证顺序执行,无界队列
ExecutorService single = Executors.newSingleThreadExecutor();
 
// 支持定时和周期任务
ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(2);

提交任务

// execute:无返回值,异常会传递给 UncaughtExceptionHandler
executor.execute(() -> doWork());
 
// submit:返回 Future,异常被封装在 Future 中,get() 时抛出
Future<String> future = executor.submit(() -> {
    return fetchData();
});
 
try {
    String result = future.get(5, TimeUnit.SECONDS); // 超时等待
} catch (TimeoutException e) {
    future.cancel(true);
} catch (ExecutionException e) {
    Throwable cause = e.getCause(); // 任务中抛出的异常
}
 
// invokeAll:等待所有任务完成
List<Future<String>> futures = executor.invokeAll(tasks, 10, TimeUnit.SECONDS);
 
// invokeAny:任一完成即返回
String first = executor.invokeAny(tasks);

CompletableFuture(Java 8+)

更强大的异步编排工具:

// 异步执行
CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
    return fetchUser(id);
}, executor);
 
// 链式处理
cf.thenApply(user -> user.getName())           // 转换结果
  .thenAccept(name -> System.out.println(name)) // 消费结果
  .exceptionally(e -> { log.error(e); return null; }); // 异常处理
 
// 并行执行,等待全部完成
CompletableFuture.allOf(cf1, cf2, cf3)
    .thenRun(() -> System.out.println("全部完成"));
 
// 并行执行,任一完成
CompletableFuture.anyOf(cf1, cf2)
    .thenAccept(result -> System.out.println("最快: " + result));
 
// 组合两个任务
cf1.thenCombine(cf2, (r1, r2) -> r1 + r2);
 
// 异步转换(避免阻塞线程池)
cf.thenComposeAsync(user -> fetchOrders(user.getId()), executor);

线程池关闭

executor.shutdown();       // 不接受新任务,等待已有任务完成
executor.shutdownNow();    // 中断所有线程,返回未执行的任务列表
 
// 等待终止
boolean done = executor.awaitTermination(30, TimeUnit.SECONDS);
 
// 优雅关闭模板
executor.shutdown();
try {
    if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
        executor.shutdownNow();
        if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
            log.error("线程池未能正常关闭");
        }
    }
} catch (InterruptedException e) {
    executor.shutdownNow();
    Thread.currentThread().interrupt();
}

监控

ThreadPoolExecutor tpe = (ThreadPoolExecutor) executor;
tpe.getPoolSize();          // 当前线程数
tpe.getActiveCount();       // 活跃线程数
tpe.getCompletedTaskCount();// 已完成任务数
tpe.getQueue().size();      // 队列中等待任务数
tpe.getTaskCount();         // 总任务数(完成 + 队列 + 执行中)

线程池大小设置

CPU 密集型(计算为主,无阻塞):

线程数 = CPU 核心数 + 1

IO 密集型(数据库、网络、文件):

线程数 = CPU 核心数 × (1 + 平均等待时间 / 平均计算时间)

实际建议:通过压测找到最优值,设置有界队列,配合监控告警。

Spring 异步线程池

@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {
 
    @Bean("taskExecutor")
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(4);
        executor.setMaxPoolSize(8);
        executor.setQueueCapacity(500);
        executor.setThreadNamePrefix("async-");
        executor.setRejectedExecutionHandler(
            new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }
 
    @Override
    public Executor getAsyncExecutor() {
        return taskExecutor();
    }
}
 
// 使用
@Async("taskExecutor")
public CompletableFuture<String> asyncMethod() {
    return CompletableFuture.completedFuture(doWork());
}

相关链接