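# Java Multithreading and Thread Pools

Contents
- Thread pool fundamentals, revisited
- ThreadPoolExecutor: a source-level analysis
- Blocking queues: in-depth comparison and selection
- Rejection policies in detail, with custom implementations
- Custom thread factories and thread naming conventions
- Dynamic tuning and runtime management of thread pools
- Advanced task submission and result handling
- Hook methods and extension points
- ScheduledThreadPoolExecutor in depth
- Building a monitoring and alerting system for thread pools
- Common pitfalls and best practices
- Production-ready thread pool configuration templates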
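## 1. Thread Pool Fundamentals, Revisited

### 1.1 The design idea behind thread pools: pooling

Pooling trades space for time: resources are created ahead of time, borrowed when needed and returned after use, which avoids the cost of repeatedly creating and destroying them. Besides thread pools, common pooling techniques include:
- Database connection pools (HikariCP, Druid)
- Object pools (Apache Commons Pool)
- Memory pools (Netty's pooled `ByteBuf` allocator)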
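### 1.2 What problems does a thread pool solve? (detailed)

| Problem | How the thread pool solves it |
|---|---|
| Creating threads is expensive | Worker threads are reused, avoiding repeated `new Thread().start()` and the system calls behind it |
| Uncontrolled thread count | Concurrency is capped by both `maximumPoolSize` and the queue capacity |
| Task backlog causing OOM | A bounded queue plus a rejection policy keeps the pool from accepting unlimited tasks |
| Complex thread lifecycle management | Unified `shutdown()` / `shutdownNow()` interfaces |
| Task execution is hard to track | `beforeExecute` / `afterExecute` hooks and built-in statistics |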
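The two limits in the table interact through a fixed order inside `execute()`: start a core thread if fewer than `corePoolSize` are running, otherwise try to enqueue, otherwise start an extra thread up to `maximumPoolSize`, otherwise reject. The following sketch makes that order visible with deliberately tiny, illustrative sizes (core 1, max 2, queue capacity 1). `SubmissionFlowDemo` is a made-up name, and the latch only exists to keep every task busy while the next one is submitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class SubmissionFlowDemo {
    // Submits four long-running tasks to a pool with core=1, max=2, queue capacity=1
    // and records the pool's state right after each submission.
    static List<String> run() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.AbortPolicy());
        // Each task blocks until released, so no worker goes back to the queue meanwhile
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        List<String> steps = new ArrayList<>();
        try {
            for (int i = 1; i <= 4; i++) {
                try {
                    pool.execute(blocker);
                    steps.add("task" + i + ": threads=" + pool.getPoolSize()
                            + ", queued=" + pool.getQueue().size());
                } catch (RejectedExecutionException e) {
                    steps.add("task" + i + ": rejected"); // AbortPolicy throws
                }
            }
        } finally {
            release.countDown(); // let all accepted tasks finish
            pool.shutdown();
        }
        return steps;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Under these assumptions the four lines read `task1: threads=1, queued=0`, `task2: threads=1, queued=1`, `task3: threads=2, queued=1` and `task4: rejected`: the second thread is only created once the queue is full.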
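### 1.3 Alternatives to a thread pool

| Approach | Pros | Cons |
|---|---|---|
| `new Thread(runnable).start()` | Simple and direct | No reuse, wastes resources, hard to manage |
| `Timer` | Supports scheduling | Single thread; one uncaught exception stops it for good; not recommended |
| `ThreadPoolExecutor` | Production-grade standard | Many parameters; requires understanding its internals |
| `ForkJoinPool` | Work stealing, suited to recursive divide-and-conquer tasks | Less suited to ordinary, especially blocking, tasks |
| `CompletableFuture` + custom thread pool | More elegant asynchronous programming | Slightly steeper learning curve |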
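The `Timer` row's claim that one exception stops it can be checked directly. In this sketch (the class name `TimerFailureDemo` is illustrative, and the 300 ms pause is an assumption that the timer thread has died by then), a task throws, after which the same `Timer` refuses new work with `IllegalStateException`. The uncaught exception's stack trace is printed to stderr as a side effect.

```java
import java.util.Timer;
import java.util.TimerTask;

class TimerFailureDemo {
    // Returns true if the Timer refuses new tasks after one of its tasks has thrown.
    static boolean timerDiesAfterException() {
        Timer timer = new Timer("demo-timer", true); // daemon, so it cannot keep the JVM alive
        try {
            timer.schedule(new TimerTask() {
                @Override
                public void run() {
                    throw new RuntimeException("boom"); // terminates the Timer's only thread
                }
            }, 0);
            Thread.sleep(300); // assumed long enough for the task to run and the thread to die
            timer.schedule(new TimerTask() {
                @Override
                public void run() { }
            }, 0);
            return false; // the Timer still accepted work
        } catch (IllegalStateException expected) {
            return true;  // "Timer already cancelled."
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            timer.cancel();
        }
    }
}
```

A `ScheduledThreadPoolExecutor` does not have this failure mode: the exception is captured in the failing task's `Future`, and the worker thread goes on running other tasks.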
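## 2. ThreadPoolExecutor: A Source-Level Analysis

### 2.1 Core data structure: the `ctl` atomic variable

`ThreadPoolExecutor` uses a single `AtomicInteger` field, `ctl`, to store two pieces of information at once:
- High 3 bits: the run state (`RUNNING`, `SHUTDOWN`, `STOP`, `TIDYING`, `TERMINATED`)
- Low 29 bits: the current number of worker threads (`workerCount`)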
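The packing can be reproduced in a few lines. `CtlDemo` (a name made up for this example) copies the constant definitions from the JDK 8 excerpt that follows and decodes a packed value; it is a standalone illustration, not the JDK class.

```java
class CtlDemo {
    // Same layout as JDK 8 ThreadPoolExecutor: 3 state bits + 29 worker-count bits
    static final int COUNT_BITS = Integer.SIZE - 3;     // 29
    static final int CAPACITY = (1 << COUNT_BITS) - 1;  // 536,870,911
    static final int RUNNING = -1 << COUNT_BITS;        // 111 in the top bits
    static final int SHUTDOWN = 0 << COUNT_BITS;        // 000
    static final int STOP = 1 << COUNT_BITS;            // 001

    static int ctlOf(int rs, int wc) { return rs | wc; }
    static int runStateOf(int c) { return c & ~CAPACITY; }
    static int workerCountOf(int c) { return c & CAPACITY; }

    // Top 3 bits of the run state as a string, e.g. "111" for RUNNING
    static String stateBits(int c) {
        String bits = String.format("%32s", Integer.toBinaryString(runStateOf(c))).replace(' ', '0');
        return bits.substring(0, 3);
    }

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);
        System.out.println(stateBits(c) + " / workers=" + workerCountOf(c)); // 111 / workers=5
        // RUNNING is the only negative state, which is why isRunning(c) is just c < SHUTDOWN
        System.out.println(runStateOf(c) < SHUTDOWN);                      // true
    }
}
```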
```java
// Excerpt from JDK 8 ThreadPoolExecutor
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;       // 29
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;  // 536,870,911 (about 537 million)

// Run states, stored in the high 3 bits
private static final int RUNNING    = -1 << COUNT_BITS;  // 111...
private static final int SHUTDOWN   =  0 << COUNT_BITS;  // 000...
private static final int STOP       =  1 << COUNT_BITS;  // 001...
private static final int TIDYING    =  2 << COUNT_BITS;  // 010...
private static final int TERMINATED =  3 << COUNT_BITS;  // 011...

// Packing and unpacking ctl
private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
```

State transitions:

```
RUNNING ──shutdown()──▶ SHUTDOWN ──queue and pool empty──▶ TIDYING ──terminated()──▶ TERMINATED
   │                       │                                  ▲
   │ shutdownNow()         │ shutdownNow()                    │
   ▼                       │                                  │
  STOP ◀───────────────────┘                                  │
   │                                                          │
   └───────────────────────── pool empty ─────────────────────┘
```

### 2.2 Inner class Worker: the wrapper around a worker thread
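```java
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
    final Thread thread;           // the thread that actually runs tasks
    Runnable firstTask;            // the initial task passed at construction, possibly null
    volatile long completedTasks;  // per-thread completed-task counter

    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until the thread starts running
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    public void run() {
        runWorker(this); // the main loop
    }
    // lock()/unlock()/tryLock() and the AQS overrides are omitted here
}
```

`Worker` extends AQS to implement a simple non-reentrant mutex, for two reasons:
- While a task is running, the lock protects the worker from being interrupted by pool-control operations: `interruptIdleWorkers` only interrupts workers whose lock it can acquire with `tryLock()`, i.e. idle ones. Non-reentrancy matters here: if a task itself calls a control method such as `setCorePoolSize`, its own worker's `tryLock()` must still fail.
- The lock state tells whether the worker is busy: `state == 1` means it is running a task, `0` means idle, and `-1` means the thread has not started yet.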
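The key property, that `tryLock()` fails even for the thread that already holds the lock, can be shown with a stripped-down analogue written directly against `AbstractQueuedSynchronizer`. `NonReentrantMutex` is a hypothetical name; the real `Worker` additionally starts at state `-1` and is itself a `Runnable`.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Same idea as Worker's lock: state 0 = unlocked, 1 = locked, no reentrancy.
class NonReentrantMutex extends AbstractQueuedSynchronizer {
    @Override
    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false; // no owner check: even the holder fails a second acquire
    }

    @Override
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    void lock() { acquire(1); }
    boolean tryLock() { return tryAcquire(1); }
    void unlock() { release(1); }
    boolean isLocked() { return getState() != 0; }
}
```

A `ReentrantLock` would let the busy worker's own thread succeed in `tryLock()`, so a task calling a pool-control method could end up interrupting itself; the non-reentrant design rules that out.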
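### 2.3 runWorker: the core loop of a worker thread

```java
// JDK 8 source
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // state -1 -> 0: allow interrupts from now on
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // 1. If the pool is stopping, make sure this thread is interrupted;
            //    otherwise clear any stale interrupt (re-checking the state after
            //    clearing, in case shutdownNow() raced with us)
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task); // hook: before the task runs
                Throwable thrown = null;
                try {
                    task.run(); // run the task
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown); // hook: after the task runs
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false; // getTask() returned null: normal exit
    } finally {
        // If a task threw, completedAbruptly is still true: the worker is removed
        // and, unless the pool is stopping, replaced by a new one
        processWorkerExit(w, completedAbruptly);
    }
}
```

### 2.4 getTask: fetching tasks from the queue (including the thread-reclamation logic)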
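```java
// JDK 8 source
private Runnable getTask() {
    boolean timedOut = false; // did the last poll() time out?
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // Pool is stopping, or shut down with an empty queue: this worker should exit
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        int wc = workerCountOf(c);
        // May this worker time out? Yes if core threads may time out or the pool is above corePoolSize
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null; // runWorker exits its loop and the thread is reclaimed
            continue;
        }
        try {
            // timed: poll with keepAliveTime; otherwise take() blocks indefinitely
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
```

Key points:
- While the pool has more than `corePoolSize` workers, a worker whose `poll` times out receives `null` and is reclaimed. The JDK does not tag individual threads as "core" or "non-core": whichever workers happen to time out first are removed, until the count is back down to `corePoolSize`.
- Once the count is at or below `corePoolSize`, the remaining workers block indefinitely in `take()`, unless `allowCoreThreadTimeOut(true)` has been set.
- The `wc > 1 || workQueue.isEmpty()` guard keeps the last worker alive while tasks are still queued.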
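These reclamation rules can be observed with a short keep-alive time. The sketch below assumes illustrative values (core 1, max 3, `keepAliveTime` 50 ms) and a `SynchronousQueue`, so that every extra task forces a new thread; `KeepAliveDemo` is a made-up name, and the 500 ms pause is assumed to be comfortably longer than the keep-alive time.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class KeepAliveDemo {
    // Returns {peak pool size, pool size after an idle period}.
    static int[] run(boolean allowCoreTimeout) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 3, 50, TimeUnit.MILLISECONDS, new SynchronousQueue<>());
        pool.allowCoreThreadTimeOut(allowCoreTimeout);
        CountDownLatch started = new CountDownLatch(3);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < 3; i++) {
                pool.execute(() -> {
                    started.countDown();
                    try {
                        release.await(); // stay busy so the next task needs a new thread
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            started.await();
            int peak = pool.getPoolSize(); // 3 threads: 1 up to corePoolSize + 2 extra
            release.countDown();
            Thread.sleep(500);             // idle well past keepAliveTime
            return new int[] {peak, pool.getPoolSize()};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

With `allowCoreThreadTimeOut(false)` the pool is expected to shrink from 3 back to 1 thread (the last one waits in `take()`); with `true` it shrinks to 0.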
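## 3. Blocking Queues: In-Depth Comparison and Selection

### 3.1 Underlying structures and characteristics of common blocking queues

| Queue | Underlying structure | Locking | Capacity | Typical use |
|---|---|---|---|---|
| `ArrayBlockingQueue` | Array | Single lock (`ReentrantLock`) | Must be specified | Fixed capacity, predictable memory footprint |
| `LinkedBlockingQueue` | Singly linked list | Two locks (`putLock` / `takeLock`) | Defaults to `Integer.MAX_VALUE` | High throughput; capacity not known in advance |
| `SynchronousQueue` | No storage | Fair mode: queue; non-fair mode: stack | 0 | Direct hand-off; used by `CachedThreadPool` |
| `PriorityBlockingQueue` | Array-based binary heap | Single lock | Unbounded | Tasks ordered by priority |
| `DelayQueue` | Priority queue (binary heap) | Single lock | Unbounded | Storing delayed tasks |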
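The capacity column can be checked through `remainingCapacity()` on freshly created, empty queues. This small sketch (the class name `QueueCapacityDemo` is illustrative) also shows `PriorityBlockingQueue` handing out the smallest element first. "Must be specified" for `ArrayBlockingQueue` is enforced at compile time: it has no no-argument constructor.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.SynchronousQueue;

class QueueCapacityDemo {
    // remainingCapacity() of empty queues, in table order
    static int[] capacities() {
        return new int[] {
                new ArrayBlockingQueue<Runnable>(100).remainingCapacity(), // the capacity passed in
                new LinkedBlockingQueue<Runnable>().remainingCapacity(),   // Integer.MAX_VALUE by default
                new SynchronousQueue<Runnable>().remainingCapacity(),      // always 0
                new PriorityBlockingQueue<Integer>().remainingCapacity()   // unbounded: Integer.MAX_VALUE
        };
    }

    // The heap releases the smallest element first, whatever the insertion order
    static int firstOutOfPriorityQueue() {
        PriorityBlockingQueue<Integer> q = new PriorityBlockingQueue<>();
        q.offer(5);
        q.offer(1);
        q.offer(3);
        return q.poll();
    }
}
```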
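### 3.2 Performance differences: ArrayBlockingQueue vs LinkedBlockingQueue

```java
// Benchmark conclusions (for reference only)
// ArrayBlockingQueue: enqueue and dequeue share one lock, so performance drops under heavy contention
// LinkedBlockingQueue: enqueue and dequeue use separate locks, giving higher throughput,
//                      but a larger memory footprint (one node object per element)
// Selection advice:
// - Memory-sensitive, capacity can be estimated -> ArrayBlockingQueue
// - Throughput matters, memory is plentiful     -> LinkedBlockingQueue (with an explicit capacity)
```

### 3.3 The two modes of SynchronousQueue

```java
// Fair mode: TransferQueue (FIFO)
new SynchronousQueue<>(true);
// Non-fair mode: TransferStack (LIFO), the default
new SynchronousQueue<>();
```

`SynchronousQueue` has no internal capacity: every insert must wait for a corresponding remove by another thread. `CachedThreadPool` uses it so that each task is handed straight to a thread (an idle one, or a newly created one) instead of piling up in a queue.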
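This zero-capacity behavior is what lets the pool choose between handing a task to an idle worker and creating a new thread: `execute()` calls `offer()` on the work queue, and with a `SynchronousQueue` that only succeeds if a worker is already waiting in `poll()`/`take()`. The sketch below (the name `HandOffDemo` is illustrative) shows `offer()` failing with nobody waiting and succeeding once a consumer is blocked in `take()`.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

class HandOffDemo {
    // Returns {offer with no consumer, timed offer with a consumer, item the consumer got}.
    static String[] run() {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        boolean withoutConsumer = queue.offer("a"); // nobody waiting: fails immediately
        String[] received = new String[1];
        Thread consumer = new Thread(() -> {
            try {
                received[0] = queue.take(); // blocks until a producer hands an item over
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            // Waits up to 1 s for the consumer to arrive, then hands "b" over directly
            boolean withConsumer = queue.offer("b", 1, TimeUnit.SECONDS);
            consumer.join(); // join() makes received[0] visible to this thread
            return new String[] {String.valueOf(withoutConsumer), String.valueOf(withConsumer), received[0]};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

Expected result: `{"false", "true", "b"}`; the queue's `size()` stays 0 throughout.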
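## 4. Rejection Policies in Detail, with Custom Implementations

### 4.1 Source analysis of the four built-in JDK policies

```java
// AbortPolicy (the default): throw
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    throw new RejectedExecutionException("Task " + r.toString() +
                                         " rejected from " + e.toString());
}

// CallerRunsPolicy
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    if (!e.isShutdown()) {
        r.run(); // run directly in the submitting (caller) thread
    }
    // if the pool is shut down, the task is silently dropped
}

// DiscardPolicy
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    // do nothing: the task is silently dropped
}

// DiscardOldestPolicy
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    if (!e.isShutdown()) {
        e.getQueue().poll(); // discard the task at the head of the queue
        e.execute(r);        // retry the submission
    }
}
```

### 4.2 A custom rejection policy in practice (with logging and metrics)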
```java
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class MonitoredCallerRunsPolicy implements RejectedExecutionHandler {
    private static final Logger log = LoggerFactory.getLogger(MonitoredCallerRunsPolicy.class);
    private final MeterRegistry meterRegistry;
    private final Counter rejectCounter;

    public MonitoredCallerRunsPolicy(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
        this.rejectCounter = Counter.builder("thread.pool.reject")
                .description("Number of tasks rejected by the thread pool")
                .register(meterRegistry);
    }

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        rejectCounter.increment();
        log.warn("Pool {} rejected a task; threads: {}/{}, queue: {}/{}; falling back to caller-runs",
                e.toString(),
                e.getPoolSize(), e.getMaximumPoolSize(),
                e.getQueue().size(), e.getQueue().remainingCapacity() + e.getQueue().size());
        if (!e.isShutdown()) {
            // Run in the caller's thread and (optionally) record how long it took
            long start = System.nanoTime();
            try {
                r.run();
            } finally {
                log.debug("Fallback task took {} ms",
                        TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
            }
        }
    }
}
```

### 4.3 Other common custom policies
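```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// 1. Blocking-wait policy: when the queue is full, block the submitting thread
//    for a bounded time and try to enqueue again
public class BlockingWaitPolicy implements RejectedExecutionHandler {
    private final long maxWaitMs;

    public BlockingWaitPolicy(long maxWaitMs) { this.maxWaitMs = maxWaitMs; }

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (e.isShutdown()) {
            throw new RejectedExecutionException("Executor has been shut down"); // don't drop silently
        }
        try {
            // Note: this bypasses execute() and puts the task straight into the queue
            if (!e.getQueue().offer(r, maxWaitMs, TimeUnit.MILLISECONDS)) {
                throw new RejectedExecutionException("Still unable to enqueue after waiting " + maxWaitMs + " ms");
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new RejectedExecutionException("Interrupted while waiting", ex);
        }
    }
}

// 2. Policy that persists rejected tasks to a database or message queue
public class PersistencePolicy implements RejectedExecutionHandler {
    private static final Logger log = LoggerFactory.getLogger(PersistencePolicy.class);

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        // Serialize the task into a DB or MQ for later compensation.
        // TaskStorage is a placeholder for your own storage component.
        // Most Runnables (lambdas in particular) are not Serializable, so check first.
        if (!(r instanceof Serializable)) {
            throw new RejectedExecutionException("Task is not serializable and cannot be persisted: " + r);
        }
        TaskStorage.save((Serializable) r);
        log.info("Task rejected and persisted for later compensation");
    }
}
```

## 5. Custom Thread Factories and Thread Naming Conventions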
### 5.1 A custom ThreadFactory template

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedThreadFactory implements ThreadFactory {
    private static final Logger log = LoggerFactory.getLogger(NamedThreadFactory.class);
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;
    private final boolean daemon;
    private final int priority;
    private final Thread.UncaughtExceptionHandler exceptionHandler;

    public NamedThreadFactory(String poolName) {
        this(poolName, false, Thread.NORM_PRIORITY);
    }

    public NamedThreadFactory(String poolName, boolean daemon, int priority) {
        this.namePrefix = poolName + "-thread-";
        this.daemon = daemon;
        this.priority = priority;
        // Log exceptions that escape a task (e.g. from execute()) instead of only printing to stderr
        this.exceptionHandler = (t, e) ->
                log.error("Uncaught exception in thread {}", t.getName(), e);
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r, namePrefix + threadNumber.getAndIncrement());
        t.setDaemon(daemon);
        t.setPriority(priority);
        t.setUncaughtExceptionHandler(exceptionHandler);
        return t;
    }
}
```

### 5.2 Usage example
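```java
ThreadPoolExecutor executor = new ThreadPoolExecutor(
        10, 20, 60, TimeUnit.SECONDS,
        new ArrayBlockingQueue<>(1000),
        new NamedThreadFactory("order-processor", false, Thread.NORM_PRIORITY),
        new ThreadPoolExecutor.CallerRunsPolicy()
);
// Threads are named order-processor-thread-1, order-processor-thread-2, ...
```

### 5.3 Daemon threads vs user threads

| Type | Behavior | Typical use |
|---|---|---|
| User thread (default) | The JVM waits for all user threads to finish before exiting | Pools that run business logic |
| Daemon thread | Once only daemon threads remain, the JVM exits and abandons them mid-work (their `finally` blocks do not run) | Auxiliary background tasks whose loss on exit is acceptable (e.g. log flushing) |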
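A quick way to confirm the naming and daemon settings is to ask a task which thread it ran on. To keep the sketch free of the SLF4J dependency, it uses a trimmed-down stand-in, `SimpleNamedThreadFactory` (a hypothetical name), which follows the same `<poolName>-thread-<n>` pattern as `NamedThreadFactory` above; `ThreadNamingDemo` is likewise illustrative.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Minimal stand-in for NamedThreadFactory, without the logger
class SimpleNamedThreadFactory implements ThreadFactory {
    private final AtomicInteger counter = new AtomicInteger(1);
    private final String prefix;
    private final boolean daemon;

    SimpleNamedThreadFactory(String poolName, boolean daemon) {
        this.prefix = poolName + "-thread-";
        this.daemon = daemon;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r, prefix + counter.getAndIncrement());
        t.setDaemon(daemon);
        return t;
    }
}

class ThreadNamingDemo {
    // Runs one task and reports the executing thread's name and daemon flag
    static String describe(boolean daemon) {
        ExecutorService pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(10), new SimpleNamedThreadFactory("order-processor", daemon));
        Callable<String> whoAmI = () -> Thread.currentThread().getName()
                + " daemon=" + Thread.currentThread().isDaemon();
        try {
            return pool.submit(whoAmI).get();
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

`ThreadNamingDemo.describe(false)` is expected to return `order-processor-thread-1 daemon=false`, a name that is immediately recognizable in a thread dump.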
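## 6. Dynamic Tuning and Runtime Management of Thread Pools

### 6.1 Parameters that can be adjusted at runtime

```java
// All of the following can be called while the pool is running
executor.setCorePoolSize(newCoreSize);
executor.setMaximumPoolSize(newMaxSize);
executor.setKeepAliveTime(newTime, TimeUnit.SECONDS);
executor.setRejectedExecutionHandler(newHandler);
executor.setThreadFactory(newFactory); // affects only threads created afterwards
executor.allowCoreThreadTimeOut(true); // allow idle core threads to be reclaimed
// Order matters: setMaximumPoolSize throws IllegalArgumentException if the new max is below
// the core size, and since JDK 9 setCorePoolSize throws if the new core exceeds the max.
// So when growing, raise the max first; when shrinking, lower the core first.
```

### 6.2 Dynamic tuning in practice: automatic scaling based on load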
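```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.concurrent.ThreadPoolExecutor;

@Component
public class AdaptiveThreadPoolManager {
    private static final Logger log = LoggerFactory.getLogger(AdaptiveThreadPoolManager.class);
    private final ThreadPoolExecutor executor;
    private final int minCoreSize;
    private final int maxCoreSize; // keep this <= executor.getMaximumPoolSize()

    // The property names below are illustrative
    public AdaptiveThreadPoolManager(@Qualifier("commonExecutor") ThreadPoolExecutor executor,
                                     @Value("${adaptive-pool.min-core}") int minCoreSize,
                                     @Value("${adaptive-pool.max-core}") int maxCoreSize) {
        this.executor = executor;
        this.minCoreSize = minCoreSize;
        this.maxCoreSize = maxCoreSize;
    }

    @Scheduled(fixedDelay = 10000)
    public void adjustPoolSize() {
        int queueSize = executor.getQueue().size();
        int activeCount = executor.getActiveCount(); // an approximate value
        int currentCore = executor.getCorePoolSize();
        if (queueSize > 500 && activeCount >= currentCore * 0.9) {
            // Heavy backlog and nearly all core threads busy -> add core threads (never above the max)
            int newCore = Math.min(currentCore + 2,
                    Math.min(maxCoreSize, executor.getMaximumPoolSize()));
            executor.setCorePoolSize(newCore);
            log.info("Queue backlog, core threads increased to {}", newCore);
        } else if (queueSize == 0 && activeCount < currentCore * 0.3 && currentCore > minCoreSize) {
            // Empty queue and mostly idle core threads -> remove a core thread
            int newCore = Math.max(currentCore - 1, minCoreSize);
            executor.setCorePoolSize(newCore);
            log.info("Queue idle, core threads reduced to {}", newCore);
        }
    }
}
```

### 6.3 Prestarting core threads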
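```java
// Create all core threads up front, so early requests don't pay for lazy thread creation
executor.prestartAllCoreThreads();
// Or start a single core thread (returns false if all core threads are already started)
executor.prestartCoreThread();
```

## 7. Advanced Task Submission and Result Handling

### 7.1 execute vs submit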
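| Method | Return value | Exception handling |
|---|---|---|
| `execute(Runnable)` | `void` | The exception propagates to the thread's `UncaughtExceptionHandler` (and the worker thread is replaced) |
| `submit(Runnable)` | `Future<?>` | The exception is captured in the `Future` and rethrown by `Future.get()` |
| `submit(Callable<T>)` | `Future<T>` | The exception is captured in the `Future` and rethrown by `Future.get()` |

```java
// execute: the exception is not swallowed; it reaches the thread's UncaughtExceptionHandler
executor.execute(() -> { throw new RuntimeException("exception from execute"); });

// submit: the exception is wrapped in the Future and only surfaces when get() is called
Future<?> future = executor.submit(() -> { throw new RuntimeException("exception from submit"); });
try {
    future.get(); // throws ExecutionException here
} catch (ExecutionException e) {
    Throwable cause = e.getCause(); // the original RuntimeException
    log.error("Task failed", cause);
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
}
```

### 7.2 CompletionService: results in completion order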
When a batch of tasks is submitted and the results should be processed in the order the tasks finish, `CompletionService` is a better fit than iterating over a list of `Future`s: iterating in submission order blocks on a slow task even when faster ones have already finished.

```java
ExecutorService executor = Executors.newFixedThreadPool(10); // for brevity; prefer an explicit ThreadPoolExecutor in production
CompletionService<String> completionService = new ExecutorCompletionService<>(executor);

// Submit many tasks
for (int i = 0; i < 100; i++) {
    final int taskId = i;
    completionService.submit(() -> {
        Thread.sleep(ThreadLocalRandom.current().nextInt(1000));
        return "Task-" + taskId + " result";
    });
}

// Retrieve results in completion order
for (int i = 0; i < 100; i++) {
    Future<String> completed = completionService.take(); // blocks until the next task completes
    String result = completed.get();
    System.out.println(result);
}
executor.shutdown();
```

### 7.3 invokeAll and invokeAny
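```java
List<Callable<String>> tasks = Arrays.asList(
    () -> { Thread.sleep(1000); return "A"; },
    () -> { Thread.sleep(500);  return "B"; }
);
// invokeAll: waits for all tasks to finish and returns all Futures, in task-list order
List<Future<String>> allFutures = executor.invokeAll(tasks);
// invokeAny: returns the result of one task that completed successfully; the others are cancelled
String firstResult = executor.invokeAny(tasks); // typically "B", the faster task (not guaranteed)
```

## 8. Hook Methods and Extension Points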
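### 8.1 The three overridable hook methods

```java
import java.util.concurrent.*;

// log and metrics are placeholders for your own logger and metrics component
public class MonitoredThreadPool extends ThreadPoolExecutor {
    public MonitoredThreadPool(int core, int max, long keepAlive, TimeUnit unit,
                               BlockingQueue<Runnable> queue) {
        super(core, max, keepAlive, unit, queue);
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        // Record the task start time here, e.g. in a ThreadLocal
        TaskContext.start(t.getName(), r);
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        try {
            // For tasks passed to submit(), t is null: the FutureTask has already caught
            // the exception, so unwrap it from the Future (pattern from the JDK Javadoc)
            if (t == null && r instanceof Future<?> && ((Future<?>) r).isDone()) {
                try {
                    ((Future<?>) r).get();
                } catch (CancellationException ce) {
                    t = ce;
                } catch (ExecutionException ee) {
                    t = ee.getCause();
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                }
            }
            if (t != null) {
                log.error("Task failed", t);
                metrics.recordError();
            }
            // Record the task's execution time
            long duration = TaskContext.getDuration();
            metrics.recordTaskDuration(duration);
        } finally {
            TaskContext.clear();
        }
    }

    @Override
    protected void terminated() {
        super.terminated();
        log.info("Thread pool fully terminated");
        metrics.unregister();
    }
}
```

### 8.2 Passing context through ThreadLocal (remember to clean up)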
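```java
public class TaskContext {
    private static final ThreadLocal<Long> START_TIME = new ThreadLocal<>();
    private static final ThreadLocal<String> TASK_NAME = new ThreadLocal<>();

    public static void start(String threadName, Runnable task) {
        START_TIME.set(System.nanoTime());
        TASK_NAME.set(task.getClass().getSimpleName());
    }

    public static long getDuration() {
        Long start = START_TIME.get();
        return start == null ? -1 : TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void clear() {
        START_TIME.remove(); // ⚠️ must be cleaned up, or values leak into later tasks
        TASK_NAME.remove();
    }
}
```

Why must ThreadLocal be cleaned up?

Threads in a pool are reused. If the values are not removed (for example in `afterExecute`), whatever a task stored in a `ThreadLocal` is still there when the next task runs on the same thread, which causes data mix-ups; and because the thread's `ThreadLocalMap` holds the value through a strong reference, the value also stays in memory for as long as the worker thread lives.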
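The data mix-up is easy to reproduce with a one-thread pool, because the second task is guaranteed to run on the same reused worker. In this sketch (the names `ThreadLocalLeakDemo` and `USER` are illustrative) the first task stores a value and only removes it when `cleanUp` is true.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ThreadLocalLeakDemo {
    private static final ThreadLocal<String> USER = new ThreadLocal<>();

    // Task 1 stores a value; task 2 runs on the same (only) worker thread and reads it back.
    static String valueSeenBySecondTask(boolean cleanUp) {
        ExecutorService single = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(10));
        try {
            single.submit(() -> {
                USER.set("alice");
                try {
                    // ... work on behalf of user "alice" ...
                } finally {
                    if (cleanUp) {
                        USER.remove(); // the fix: clear before the thread is reused
                    }
                }
            }).get();
            Callable<String> readUser = USER::get;
            return single.submit(readUser).get(); // a different task, same thread
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            single.shutdown();
        }
    }
}
```

Without cleanup the second task sees `"alice"`, a value it never set; with cleanup it sees `null`.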
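## 9. ScheduledThreadPoolExecutor in Depth

### 9.1 Architecture and inheritance

```
ThreadPoolExecutor
        ▲
        │ extends
ScheduledThreadPoolExecutor
        │
        ├── inner class ScheduledFutureTask (implements RunnableScheduledFuture)
        └── inner class DelayedWorkQueue    (heap-based delay queue)
```

### 9.2 How DelayedWorkQueue works
- Data structure: a binary min-heap ordered by each task's next execution time
- Waiting strategy: the Leader-Follower pattern. Only one thread, the leader, waits with a timeout until the task at the top of the heap is due; the other threads (followers) wait indefinitely, which avoids unnecessary wake-ups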
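Because of the heap, execution order follows trigger time rather than submission order. This sketch (the class name `DelayOrderingDemo` is illustrative; the 50 ms and 300 ms delays are arbitrary values assumed to be far enough apart to be deterministic in practice) schedules the later task first on a one-thread `ScheduledThreadPoolExecutor`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class DelayOrderingDemo {
    // Submits the 300 ms task before the 50 ms task and returns the order they actually ran in.
    static List<String> run() {
        ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1);
        List<String> order = Collections.synchronizedList(new ArrayList<>());
        try {
            ScheduledFuture<?> late = scheduler.schedule(
                    () -> { order.add("late (300 ms)"); }, 300, TimeUnit.MILLISECONDS);
            ScheduledFuture<?> early = scheduler.schedule(
                    () -> { order.add("early (50 ms)"); }, 50, TimeUnit.MILLISECONDS);
            early.get(); // wait for both runs to complete
            late.get();
            return new ArrayList<>(order);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdown();
        }
    }
}
```

The expected result is `[early (50 ms), late (300 ms)]`: the 50 ms task sits at the top of the heap even though it was scheduled second.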
```java
// Simplified Leader-Follower logic (based on JDK 8 DelayedWorkQueue.take)
public RunnableScheduledFuture<?> take() throws InterruptedException {
    lock.lockInterruptibly();
    try {
        for (;;) {
            RunnableScheduledFuture<?> first = queue[0];
            if (first == null) {
                available.await(); // queue empty: wait
            } else {
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0) {
                    return finishPoll(first); // task is due: remove and return it
                }
                first = null; // don't hold a reference while waiting
                if (leader != null) {
                    available.await(); // a leader already exists: wait as a follower
                } else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        available.awaitNanos(delay); // wait as leader until the head task is due
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && queue[0] != null)
            available.signal(); // wake a follower so it can become the next leader
        lock.unlock();
    }
}
```

### 9.3 Key fields of ScheduledFutureTask
```java
private class ScheduledFutureTask<V>
        extends FutureTask<V> implements RunnableScheduledFuture<V> {
    private long time;                  // next execution time (System.nanoTime-based)
    private final long period;          // > 0: fixed rate; < 0: fixed delay; 0: one-shot
    private final long sequenceNumber;  // global sequence number, breaks ties between equal times (FIFO)
}
```

### 9.4 scheduleAtFixedRate vs scheduleWithFixedDelay: where the source differs
```java
// run() computes the next execution time after each periodic run
public void run() {
    boolean periodic = isPeriodic();
    if (!canRunInCurrentRunState(periodic))
        cancel(false);
    else if (!periodic)
        super.run();
    else if (super.runAndReset()) { // false if the run threw: all later runs are suppressed
        setNextRunTime();           // the key difference lives here
        reExecutePeriodic(outerTask);
    }
}

private void setNextRunTime() {
    long p = period;
    if (p > 0)                  // scheduleAtFixedRate
        time += p;              // previous *scheduled* time + period: keeps a fixed rate
    else                        // scheduleWithFixedDelay
        time = triggerTime(-p); // *now* (when this run finished) + delay
}
```

Consequences, compared:
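```java
// Suppose each run takes 4 s and the period/delay is 3 s
// scheduleAtFixedRate (period = 3 > 0)
//   Run 1: starts at 0 s, ends at 4 s -> next scheduled time = 0 + 3 = 3 s (already past), so run 2 starts immediately
//   Run 2: scheduled for 3 s, actually starts at 4 s, ends at 8 s -> next scheduled time 6 s, starts at 8 s, ...
//   Runs never overlap; the effective period stretches to the 4 s execution time
// scheduleWithFixedDelay (period = -3)
//   Run 1: starts at 0 s, ends at 4 s -> next execution = completion time + 3 s = 7 s
//   Run 2: 7 s to 11 s -> run 3 starts at 14 s, and so on
```

## 10. Building a Monitoring and Alerting System for Thread Pools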
### 10.1 A complete monitoring implementation with Micrometer

```java
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;

@Component
public class ThreadPoolMetricsBinder {
    private static final Logger log = LoggerFactory.getLogger(ThreadPoolMetricsBinder.class);
    private final MeterRegistry meterRegistry;
    private final Map<String, ThreadPoolExecutor> pools = new ConcurrentHashMap<>();

    public ThreadPoolMetricsBinder(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
    }

    public void register(String poolName, ThreadPoolExecutor executor) {
        pools.put(poolName, executor); // also keeps a strong reference: gauges hold the executor weakly
        // Core gauges
        Gauge.builder("thread.pool.core.size", executor, ThreadPoolExecutor::getCorePoolSize)
                .tag("pool", poolName).register(meterRegistry);
        Gauge.builder("thread.pool.max.size", executor, ThreadPoolExecutor::getMaximumPoolSize)
                .tag("pool", poolName).register(meterRegistry);
        Gauge.builder("thread.pool.active", executor, ThreadPoolExecutor::getActiveCount)
                .tag("pool", poolName).register(meterRegistry);
        Gauge.builder("thread.pool.pool.size", executor, ThreadPoolExecutor::getPoolSize)
                .tag("pool", poolName).register(meterRegistry);
        Gauge.builder("thread.pool.largest.size", executor, ThreadPoolExecutor::getLargestPoolSize)
                .tag("pool", poolName).register(meterRegistry);
        Gauge.builder("thread.pool.queue.size", executor, e -> e.getQueue().size())
                .tag("pool", poolName).register(meterRegistry);
        Gauge.builder("thread.pool.queue.remaining", executor,
                        e -> e.getQueue().remainingCapacity())
                .tag("pool", poolName).register(meterRegistry);
        Gauge.builder("thread.pool.completed", executor,
                        e -> e.getCompletedTaskCount())
                .tag("pool", poolName).register(meterRegistry);
        // Derived metric: queue usage ratio
        Gauge.builder("thread.pool.queue.usage", executor, e -> {
            int size = e.getQueue().size();
            int capacity = size + e.getQueue().remainingCapacity();
            return capacity > 0 ? (double) size / capacity : 0;
        }).tag("pool", poolName).register(meterRegistry);
    }

    // Periodically log pool statistics (handy during development)
    @Scheduled(fixedDelay = 30000)
    public void logPoolStats() {
        pools.forEach((name, pool) -> {
            log.info("Pool [{}] status - core: {}, current: {}, active: {}, queue: {}/{}, completed: {}",
                    name,
                    pool.getCorePoolSize(),
                    pool.getPoolSize(),
                    pool.getActiveCount(),
                    pool.getQueue().size(),
                    pool.getQueue().size() + pool.getQueue().remainingCapacity(),
                    pool.getCompletedTaskCount()
            );
            // Rejections are not tracked by ThreadPoolExecutor itself; count them in the
            // rejection handler (see 10.2)
        });
    }
}
```

### 10.2 Recording rejection and exception counts
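```java
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;

import java.util.concurrent.*;

public class InstrumentedThreadPool extends ThreadPoolExecutor {
    private final Counter rejectedCounter;
    private final Counter exceptionCounter;

    public InstrumentedThreadPool(int corePoolSize, int maximumPoolSize,
                                  long keepAliveTime, TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler,
                                  MeterRegistry registry, String poolName) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        this.rejectedCounter = Counter.builder("thread.pool.rejected")
                .tag("pool", poolName).register(registry);
        this.exceptionCounter = Counter.builder("thread.pool.exception")
                .tag("pool", poolName).register(registry);
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        // t is only non-null for execute(); for submit() the exception stays inside the Future
        if (t != null) {
            exceptionCounter.increment();
        }
    }

    // Call this from a custom RejectedExecutionHandler,
    // e.g. ((InstrumentedThreadPool) executor).recordRejection()
    public void recordRejection() {
        rejectedCounter.increment();
    }
}
```

### 10.3 Alerting rules (Prometheus + Alertmanager example)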
```yaml
groups:
  - name: thread_pool_alerts
    interval: 30s
    rules:
      - alert: ThreadPoolQueueHighUsage
        expr: thread_pool_queue_usage > 0.8
        for: 5m
        annotations:
          summary: "Queue usage of thread pool {{ $labels.pool }} is above 80%"
      - alert: ThreadPoolRejectOccurred
        expr: increase(thread_pool_rejected_total[1m]) > 0
        annotations:
          summary: "Thread pool {{ $labels.pool }} rejected tasks"
      - alert: ThreadPoolActiveThreadHigh
        expr: thread_pool_active / thread_pool_max_size > 0.9
        for: 10m
        annotations:
          summary: "Active threads of thread pool {{ $labels.pool }} have stayed close to the maximum"
```

## 11. Common Pitfalls and Best Practices
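### 11.1 Pitfall 1: ThreadLocal memory leaks

```java
// ❌ Wrong: a ThreadLocal is set inside a task but never cleaned up
executor.submit(() -> {
    ThreadLocal<HeavyObject> tl = new ThreadLocal<>();
    tl.set(new HeavyObject()); // the value stays in the worker thread's ThreadLocalMap after the task ends
    // remove() is never called
});

// ✅ Right: use try-finally to guarantee cleanup
executor.submit(() -> {
    ThreadLocal<HeavyObject> tl = new ThreadLocal<>();
    try {
        tl.set(new HeavyObject());
        // business logic
    } finally {
        tl.remove();
    }
});
```

### 11.2 Pitfall 2: concurrency bugs when tasks in a pool share state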
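```java
// The thread pool itself is thread-safe, but note:
// if tasks share mutable state, that state must be synchronized
class SharedCounter {
    private int count = 0;
    public void increment() { count++; } // not atomic: read-modify-write
}

// ❌ Many tasks modify the shared object concurrently
SharedCounter counter = new SharedCounter();
for (int i = 0; i < 1000; i++) {
    executor.submit(counter::increment);
}
// The final count may be less than 1000

// ✅ Use an atomic class or synchronization
class SafeCounter {
    private final AtomicInteger count = new AtomicInteger(0);
    public void increment() { count.incrementAndGet(); }
}
```

### 11.3 Pitfall 3: unbounded queues causing OOM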
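```java
// ❌ A serious mistake
ExecutorService pool = Executors.newFixedThreadPool(10);
// Its queue is a LinkedBlockingQueue with the default capacity Integer.MAX_VALUE;
// if tasks arrive much faster than they are processed, the queue grows until OOM

// ✅ Use a bounded queue
new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS,
        new ArrayBlockingQueue<>(1000), // explicit capacity
        ...);
```

### 11.4 Pitfall 4: swallowed exceptions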
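```java
// An exception thrown by a submit() task is silently swallowed unless get() is called
Future<?> future = executor.submit(() -> { throw new RuntimeException(); });
// Nothing is logged here
// Solutions:
// 1. Always call get() and handle the exception
// 2. Use execute instead of submit (the exception reaches the UncaughtExceptionHandler)
// 3. Record exceptions in afterExecute; for submit() tasks t is null there,
//    so the exception has to be unwrapped from the Future first (see 8.1)
```

### 11.5 Pitfall 5: submitting tasks after shutdown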
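```java
executor.shutdown();
executor.execute(task); // rejected: with the default AbortPolicy this throws RejectedExecutionException
                        // (with CallerRunsPolicy the task is silently dropped instead)
```

### 11.6 Best-practice checklist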
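| Practice | What to do |
|---|---|
| Avoid the `Executors` factory methods | Use the `ThreadPoolExecutor` constructor explicitly |
| Bounded queue + a sensible rejection policy | Choose `CallerRunsPolicy` or a custom policy depending on the business |
| Custom thread names | Use `NamedThreadFactory` |
| Full metrics coverage | Integrate Micrometer and expose the key metrics |
| Graceful shutdown | Use a two-phase shutdown template |
| Exception handling | Log uncaught exceptions centrally in `afterExecute` |
| Context propagation | Use `TransmittableThreadLocal` or clean up manually |
| Dynamic parameters | Allow core parameters to be adjusted from a configuration center |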
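The checklist mentions a two-phase shutdown template. A common form, adapted from the usage example in the `ExecutorService` Javadoc, is sketched below: phase one calls `shutdown()` and waits for queued and running tasks to drain; if that times out, phase two calls `shutdownNow()` to interrupt running tasks and waits again. The class name `GracefulShutdown` and the 200 ms grace period used in `demo()` are illustrative assumptions; production code would normally wait much longer (the Javadoc example uses 60 seconds).

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class GracefulShutdown {
    // Returns true if the pool terminated within the two waiting phases.
    static boolean shutdownAndAwait(ExecutorService pool, long timeout, TimeUnit unit) {
        pool.shutdown(); // phase 1: reject new tasks, let queued and running ones finish
        try {
            if (!pool.awaitTermination(timeout, unit)) {
                pool.shutdownNow(); // phase 2: interrupt running tasks, drop queued ones
                if (!pool.awaitTermination(timeout, unit)) {
                    System.err.println("Pool did not terminate");
                    return false;
                }
            }
            return true;
        } catch (InterruptedException ie) {
            pool.shutdownNow();                 // re-cancel if the caller was interrupted
            Thread.currentThread().interrupt(); // preserve interrupt status
            return pool.isTerminated();
        }
    }

    // A task sleeping 10 s outlasts the 200 ms grace period, so phase 2 has to kick in.
    static boolean demo() {
        ExecutorService pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(10));
        CountDownLatch started = new CountDownLatch(1);
        pool.execute(() -> {
            started.countDown();
            try {
                Thread.sleep(10_000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // responds to shutdownNow() by exiting
            }
        });
        try {
            started.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return shutdownAndAwait(pool, 200, TimeUnit.MILLISECONDS);
    }
}
```

Phase two only helps if tasks react to interruption, as the sleeping task here does; a task that ignores interrupts would make the second `awaitTermination` time out as well.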
## 12. Production-Ready Thread Pool Configuration Templates

### 12.1 General-purpose business thread pool

```java
@Configuration
public class ThreadPoolConfig {
    @Bean("commonExecutor")
    public ThreadPoolExecutor commonExecutor() {
        int cpuCores = Runtime.getRuntime().availableProcessors();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                cpuCores * 2,                    // corePoolSize
                cpuCores * 4,                    // maximumPoolSize
                60, TimeUnit.SECONDS,            // keepAliveTime
                new ArrayBlockingQueue<>(2000),  // bounded queue
                new NamedThreadFactory("common-pool", false, Thread.NORM_PRIORITY),
                new ThreadPoolExecutor.CallerRunsPolicy()
        );
        executor.allowCoreThreadTimeOut(true); // reclaim idle core threads too
        return executor;
    }
}
```

### 12.2 High-throughput I/O-bound thread pool
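```java
@Bean("ioIntensiveExecutor")
public ThreadPoolExecutor ioIntensiveExecutor() {
    // Tune according to how long tasks spend waiting on I/O
    int coreSize = Runtime.getRuntime().availableProcessors() * 2;
    int maxSize = coreSize * 4;
    return new ThreadPoolExecutor(
            coreSize, maxSize,
            120, TimeUnit.SECONDS,
            // Larger queue. Note: threads beyond coreSize are only created once
            // this queue is full, i.e. after 5000 tasks are already waiting
            new LinkedBlockingQueue<>(5000),
            new NamedThreadFactory("io-pool", false, Thread.NORM_PRIORITY),
            new BlockingWaitPolicy(100) // the custom blocking-wait policy from 4.3
    );
}
```

### 12.3 Dedicated pool for scheduled tasks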
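```java
@Bean("scheduledExecutor")
public ScheduledExecutorService scheduledExecutor() {
    // ScheduledThreadPoolExecutor always uses an unbounded DelayedWorkQueue: the pool stays at
    // corePoolSize threads, and the rejection handler is only invoked after shutdown
    return new ScheduledThreadPoolExecutor(
            Runtime.getRuntime().availableProcessors(),
            new NamedThreadFactory("scheduled-pool", false, Thread.NORM_PRIORITY),
            new ThreadPoolExecutor.CallerRunsPolicy()
    );
}
```

### 12.4 A thread pool builder with full monitoring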
```java
public class MonitoredThreadPoolBuilder {
    public static ThreadPoolExecutor build(String poolName,
                                           int core, int max,
                                           int queueCapacity,
                                           MeterRegistry registry) {
        ThreadPoolExecutor executor = new InstrumentedThreadPool(
                core, max, 60L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(queueCapacity),
                new NamedThreadFactory(poolName),
                new MonitoredCallerRunsPolicy(registry),
                registry,
                poolName
        );
        // Register with the metrics binder here, e.g. threadPoolMetricsBinder.register(poolName, executor)
        return executor;
    }
}
```

## Appendix: Quick Reference Card
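### Thread pool parameter cheat sheet

| Parameter | Recommended range | Notes |
|---|---|---|
| `corePoolSize` | CPU cores to CPU cores × 2 | Adjust to the task type |
| `maximumPoolSize` | `corePoolSize` to `corePoolSize` × 4 | Don't make it too large, to avoid context-switch overhead |
| `keepAliveTime` | 30 s to 120 s | Idle time before threads above the core size are reclaimed |
| Queue capacity | 500 to 10,000 | Derive it from the task submission rate and the processing rate |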
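### Choosing a rejection policy

| Scenario | Recommended policy |
|---|---|
| Critical business, tasks must not be lost | `CallerRunsPolicy` or a persistence policy |
| Non-critical tasks that may be dropped | `DiscardPolicy` |
| New tasks matter more than old ones | `DiscardOldestPolicy` |
| Fail fast, development and debugging | `AbortPolicy` (the default) |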
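The "must not be lost" row relies on `CallerRunsPolicy` running the overflow task in the submitting thread, which also slows the submitter down and therefore acts as back-pressure. The sketch below (the class name `CallerRunsDemo` is illustrative) saturates a one-thread pool that has a one-slot queue and records which thread ran the extra task.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CallerRunsDemo {
    // Returns the name of the thread that ran the task submitted to a saturated pool.
    static String overflowRunsOn() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1), new ThreadPoolExecutor.CallerRunsPolicy());
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            pool.execute(blocker); // occupies the only worker
            pool.execute(blocker); // fills the only queue slot
            String[] ranOn = new String[1];
            // Rejected -> CallerRunsPolicy runs it synchronously, before execute() returns
            pool.execute(() -> ranOn[0] = Thread.currentThread().getName());
            return ranOn[0];
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }
}
```

The returned name is that of the calling thread (for example `main` when invoked from a `main` method), not a pool thread.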
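### Thread count formula

$$N_{\text{threads}} = N \times U \times \left(1 + \frac{W}{C}\right)$$

where
- N = number of CPU cores
- U = target CPU utilization (0 < U ≤ 1)
- W/C = ratio of wait time to compute time

For example, with 8 cores, U = 1 and W/C = 1 (tasks spend as long waiting as computing), the formula gives 8 × 1 × 2 = 16 threads.

The material above covers the full picture, from basic concepts and source-level internals to monitoring, alerting and production practice, and can serve as a comprehensive reference for study and work.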