Java Multithreading and Thread Pools

ASN__
2026-04-09



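Contents

  1. Thread Pool Fundamentals Revisited
  2. ThreadPoolExecutor Source Code Analysis
  3. Blocking Queues: In-Depth Comparison and Selection
  4. Rejection Policies Explained, with Custom Implementations
  5. Custom Thread Factories and Thread Naming Conventions
  6. Dynamic Tuning and Runtime Management
  7. Advanced Task Submission and Result Handling
  8. Hook Methods and Extension Points
  9. ScheduledThreadPoolExecutor in Depth
  10. Building Thread Pool Monitoring and Alerting
  11. Common Pitfalls and Best Practices
  12. Production-Grade Thread Pool Configuration Templates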

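1. Thread Pool Fundamentals Revisited

1.1 The design idea behind thread pools: pooling

Pooling trades space for time. Resources are created in advance, borrowed when needed, and returned after use, so the cost of repeatedly creating and destroying them is avoided. Besides thread pools, common examples of pooling include:

  • Database connection pools (HikariCP, Druid)
  • Object pools (Apache Commons Pool)
  • Memory pools (Netty's pooled ByteBuf allocator)

1.2 What problems does a thread pool solve? (In more detail)

| Problem | How the thread pool addresses it |
| --- | --- |
| Creating threads is expensive | Thread objects are reused, avoiding the OS-level thread creation (system calls, stack allocation) behind every new Thread().start() |
| The number of threads gets out of control | maximumPoolSize and the queue capacity together cap concurrency |
| A task backlog causes OOM | A bounded queue plus a rejection policy stops the pool from accepting an unlimited number of tasks |
| Managing thread lifecycles is complex | A uniform shutdown() / shutdownNow() interface |
| Task execution is hard to track | beforeExecute / afterExecute hooks and built-in statistics (getCompletedTaskCount(), getLargestPoolSize(), and so on) |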

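1.3 Alternatives compared

| Approach | Pros | Cons |
| --- | --- | --- |
| new Thread(runnable).start() | Simple and direct | No reuse, wastes resources, hard to manage |
| Timer | Supports scheduling | A single thread; an uncaught exception terminates it and no further tasks run; not recommended |
| ThreadPoolExecutor | The production standard | Many parameters; you need to understand how it works |
| ForkJoinPool | Work stealing; suited to recursive divide-and-conquer tasks | Poorly suited to ordinary (especially blocking) tasks |
| CompletableFuture + a custom pool | More elegant asynchronous composition | A somewhat steeper learning curve |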

2. ThreadPoolExecutor Source Code Analysis

2.1 The core data structure: the ctl atomic variable

ThreadPoolExecutor packs two pieces of information into a single AtomicInteger named ctl. Both can therefore be read and updated together with one CAS:

  • High 3 bits: the run state (RUNNING, SHUTDOWN, STOP, TIDYING, TERMINATED)
  • Low 29 bits: the current number of worker threads (workerCount)

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;  // 29
private static final int CAPACITY   = (1 << COUNT_BITS) - 1; // 536,870,911 (about 536 million); named COUNT_MASK in newer JDKs

// Run states (high 3 bits)
private static final int RUNNING    = -1 << COUNT_BITS; // 111...
private static final int SHUTDOWN   =  0 << COUNT_BITS; // 000...
private static final int STOP       =  1 << COUNT_BITS; // 001...
private static final int TIDYING    =  2 << COUNT_BITS; // 010...
private static final int TERMINATED =  3 << COUNT_BITS; // 011...

// Packing and unpacking helpers
private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
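To make the bit layout concrete, the sketch below copies the constants and helpers above into a standalone class and prints a few packed values. It reproduces the JDK 8 arithmetic rather than calling ThreadPoolExecutor, whose helpers are private. The class name CtlDemo and the describe() helper are illustrative.

```java
/** Standalone copy of ThreadPoolExecutor's ctl packing (JDK 8 names), for experimentation. */
final class CtlDemo {
    static final int COUNT_BITS = Integer.SIZE - 3;        // 29
    static final int CAPACITY   = (1 << COUNT_BITS) - 1;   // 536,870,911

    static final int RUNNING    = -1 << COUNT_BITS;
    static final int SHUTDOWN   =  0 << COUNT_BITS;
    static final int STOP       =  1 << COUNT_BITS;
    static final int TIDYING    =  2 << COUNT_BITS;
    static final int TERMINATED =  3 << COUNT_BITS;

    static int runStateOf(int c)     { return c & ~CAPACITY; }
    static int workerCountOf(int c)  { return c & CAPACITY; }
    static int ctlOf(int rs, int wc) { return rs | wc; }

    /** Renders the 3 state bits and the decoded worker count of a packed ctl value. */
    static String describe(int c) {
        String bits = String.format("%32s", Integer.toBinaryString(c)).replace(' ', '0');
        return bits.substring(0, 3) + " | workers=" + workerCountOf(c);
    }

    public static void main(String[] args) {
        System.out.println(describe(ctlOf(RUNNING, 5)));   // 111 | workers=5
        System.out.println(describe(ctlOf(STOP, 0)));      // 001 | workers=0
        // States are ordered numerically; checks such as runStateAtLeast(c, STOP) rely on this
        System.out.println(RUNNING < SHUTDOWN && SHUTDOWN < STOP
                && STOP < TIDYING && TIDYING < TERMINATED); // true
    }
}
```

The state occupies the sign bit and the two bits after it. As a result, RUNNING is the only negative state, which is why the JDK can test "is running" simply as c < SHUTDOWN.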

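State transitions:

    RUNNING
       │ shutdown()
       ▼
   SHUTDOWN ──queue and pool empty──▶ TIDYING ── terminated() ──▶ TERMINATED
       │
       │ shutdownNow()
       ▼
     STOP ────────pool empty────────▶ TIDYING ── terminated() ──▶ TERMINATED

Calling shutdownNow() on a RUNNING pool also moves it directly to STOP. TIDYING means every task has ended and workerCount is zero. The terminated() hook runs in this state, and the pool becomes TERMINATED once the hook returns.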

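2.2 The inner class Worker: wrapping a worker thread

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
    final Thread thread;       // the thread that actually runs tasks
    Runnable firstTask;        // optional first task handed over at construction (may be null)
    volatile long completedTasks; // number of tasks this worker has completed

    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker() starts
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    public void run() {
        runWorker(this); // the main loop
    }

    // (lock(), tryLock(), unlock() and the AQS overrides are omitted here)
}

Worker extends AQS to implement a simple non-reentrant mutex, for three reasons:

  • A worker holds its own lock while running a task. interruptIdleWorkers() interrupts a worker only if that worker's tryLock() succeeds, so it reaches idle workers and never one in the middle of a task.
  • The lock state doubles as a busy flag. A state of 1 means the worker is running a task, 0 means it is idle, and the initial -1 means the thread has not started yet.
  • The lock is not reentrant. A task that calls a pool-control method such as setCorePoolSize() therefore cannot re-acquire its own worker's lock and end up interrupting itself.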
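The same idea fits in a few lines. The sketch below is a non-reentrant mutex built on AbstractQueuedSynchronizer, using Worker's convention of 0 for idle and 1 for busy. The class and method names are illustrative, and the sketch leaves out Worker's initial -1 "not started" state.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

/** Non-reentrant mutex in the style of ThreadPoolExecutor.Worker: state 0 = idle, 1 = busy. */
final class NonReentrantMutex extends AbstractQueuedSynchronizer {
    @Override
    protected boolean tryAcquire(int unused) {
        // CAS 0 -> 1; this fails even when the current thread already holds the lock
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    @Override
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    @Override
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    void lock()        { acquire(1); }
    boolean tryLock()  { return tryAcquire(1); }
    void unlock()      { release(1); }
    boolean isLocked() { return isHeldExclusively(); }
}
```

A routine in the style of interruptIdleWorkers would call tryLock() on each worker and interrupt only those it managed to lock. Because the lock is not reentrant, tryLock() fails while a task is running, even when the caller is the owning thread.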

2.3 runWorker: the worker's main loop

The listing follows the JDK 8 source. Newer JDKs restructure the exception handling around afterExecute but keep the same flow.

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // state -1 -> 0: interrupts are now allowed
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // 1. If the pool is stopping, make sure this thread is interrupted;
            //    otherwise clear any stale interrupt flag left over from a previous task
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);  // hook: before the task runs
                Throwable thrown = null;
                try {
                    task.run();           // run the task
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown); // hook: after the task (thrown is null on success)
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly); // remove the worker; replace it if needed
    }
}

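2.4 getTask: taking tasks from the queue (including worker retirement)

private Runnable getTask() {
    boolean timedOut = false; // did the previous poll() time out?
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Exit if the pool is STOP or beyond, or SHUTDOWN with an empty queue
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);
        // Use a timed wait if core threads may time out or the pool is above its core size
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // Retire this worker if there are too many workers or its last poll timed out,
        // but never leave a non-empty queue without at least one worker
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue; // lost the CAS to another thread: re-read ctl and re-check
        }

        try {
            // timed: poll with a timeout (keepAliveTime is stored in nanoseconds); otherwise block in take()
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true; // poll() timed out; the next iteration decides whether to retire
        } catch (InterruptedException retry) {
            timedOut = false; // interrupted (e.g., by shutdown()): loop and re-check the state
        }
    }
}

Key points:

  • Threads carry no "core" or "non-core" label. While workerCount exceeds corePoolSize, idle workers wait with poll(keepAliveTime). Any worker whose poll times out gets null from getTask() and exits, until the count falls back to corePoolSize.
  • Once workerCount is at or below corePoolSize, workers block in take() indefinitely. The exception is allowCoreThreadTimeOut(true): with it, every idle worker uses the timed poll and the pool can shrink to zero.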
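The retirement rule in getTask() can be observed from outside the pool. In the sketch below, a pool with a core size of 1 and a maximum of 4 is briefly driven to 4 threads and then left idle. The class and method names are illustrative, and the 100 ms keepAlive and 1 s wait are arbitrary values chosen so the test runs quickly.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class KeepAliveDemo {
    /** Returns {peak pool size, pool size after idling well past keepAlive}. */
    static int[] peakAndIdleSize(boolean allowCoreTimeout) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 4, 100, TimeUnit.MILLISECONDS, new SynchronousQueue<>());
        pool.allowCoreThreadTimeOut(allowCoreTimeout);
        CountDownLatch release = new CountDownLatch(1);
        try {
            // SynchronousQueue holds nothing, so each task that finds every worker busy gets a new worker
            for (int i = 0; i < 4; i++) {
                pool.execute(() -> awaitQuietly(release));
            }
            int peak = pool.getPoolSize();   // 4: the core worker plus three extra workers
            release.countDown();             // let all tasks finish; workers return to getTask()
            Thread.sleep(1_000);             // far longer than keepAlive, so timed poll() calls expire
            return new int[] {peak, pool.getPoolSize()};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

With allowCoreThreadTimeOut(false), the pool settles back at corePoolSize (1). With true, it drains to 0. Both results match the two points above.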

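3. Blocking Queues: In-Depth Comparison and Selection

3.1 Underlying structure and characteristics of common blocking queues

| Queue | Underlying structure | Locking | Capacity | Typical use |
| --- | --- | --- | --- | --- |
| ArrayBlockingQueue | Array | Single lock (ReentrantLock) | Fixed; must be specified | Predictable memory footprint |
| LinkedBlockingQueue | Singly linked list | Two locks (putLock / takeLock) | Optional bound; defaults to Integer.MAX_VALUE | High throughput when capacity is hard to predict (still set a bound) |
| SynchronousQueue | No storage | Lock-free (CAS); fair mode uses a queue, non-fair mode a stack | 0 | Direct handoff; used by newCachedThreadPool |
| PriorityBlockingQueue | Binary heap on an array | Single lock | Unbounded | Tasks ordered by priority |
| DelayQueue | Backed by a PriorityQueue | Single lock | Unbounded | Elements that become available after a delay (ScheduledThreadPoolExecutor uses its own DelayedWorkQueue instead) |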

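3.2 ArrayBlockingQueue vs LinkedBlockingQueue performance

// Rough conclusions from load tests (for reference only; measure your own workload)
// ArrayBlockingQueue: enqueue and dequeue share one lock, so throughput drops under heavy contention
// LinkedBlockingQueue: separate locks for put and take give higher throughput,
//   but every element allocates a node, so memory use (and GC pressure) is higher

// Selection guidance:
// - Memory-sensitive, capacity can be estimated → ArrayBlockingQueue
// - High throughput needed, memory to spare → LinkedBlockingQueue (always specify a capacity)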

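3.3 The two modes of SynchronousQueue

// Fair mode: TransferQueue (FIFO), a JDK 8 internal class
new SynchronousQueue<>(true);

// Non-fair mode: TransferStack (LIFO), the default
new SynchronousQueue<>();

SynchronousQueue has no internal capacity. Each insert must wait for a matching remove by another thread, and vice versa. newCachedThreadPool uses it so that each task goes straight to a thread and nothing accumulates in a queue. If no idle thread is waiting to take a task, the hand-off fails and the pool starts a new thread, up to maximumPoolSize (Integer.MAX_VALUE for a cached pool).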
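A quick way to see the "no capacity" behavior is to call offer() with and without a consumer blocked in take(). The class below is an illustrative sketch. The thread pool relies on the same effect: execute() uses a non-blocking offer() and falls back to creating a worker when the offer fails.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

final class HandoffDemo {
    /** offer() with no waiting consumer: there is nobody to hand the element to, so it fails at once. */
    static boolean offerWithoutConsumer() {
        return new SynchronousQueue<String>().offer("task");
    }

    /** offer() while a consumer waits in take(): the element is handed over directly. */
    static String handOff() {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        String[] received = new String[1];
        Thread consumer = new Thread(() -> {
            try {
                received[0] = queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            // The timed offer waits up to 1 s for the consumer to arrive at take()
            boolean accepted = queue.offer("task", 1, TimeUnit.SECONDS);
            if (!accepted) {
                consumer.interrupt(); // do not leave the consumer blocked forever
            }
            consumer.join();          // join() also makes received[0] visible to this thread
            return accepted ? received[0] : null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```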


4. Rejection Policies Explained, with Custom Implementations

4.1 Source analysis of the four built-in JDK policies

// AbortPolicy (the default): throw
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    throw new RejectedExecutionException("Task " + r.toString() +
        " rejected from " + e.toString());
}

// CallerRunsPolicy
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    if (!e.isShutdown()) {
        r.run(); // run directly in the submitting thread (silently dropped if the pool is shut down)
    }
}

// DiscardPolicy: silently drop the task
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    // do nothing
}

// DiscardOldestPolicy
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    if (!e.isShutdown()) {
        e.getQueue().poll(); // drop the head of the queue (the oldest waiting task)
        e.execute(r);        // then retry the submission
    }
}
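The built-in policies can be compared with a deliberately tiny pool. It has one thread, a one-slot queue, a blocked first task, and a third task that cannot fit. The sketch below (illustrative class name) reports whether that third task was rejected, silently dropped, or run, and if run, on which thread. DiscardOldestPolicy is left out because its retried task runs later on the worker, which would make the result depend on timing.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class RejectionDemo {
    /**
     * Saturates a 1-thread pool with a 1-slot queue, submits one more task, and returns
     * "rejected", "not run", or the name of the thread that ran the overflow task.
     */
    static String overflow(RejectedExecutionHandler handler) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1), handler);
        CountDownLatch release = new CountDownLatch(1);
        String[] ranOn = new String[1];
        try {
            pool.execute(() -> awaitQuietly(release)); // occupies the only worker
            pool.execute(() -> { });                    // takes the only queue slot
            try {
                // No free worker and no free queue slot: the handler decides what happens
                pool.execute(() -> ranOn[0] = Thread.currentThread().getName());
            } catch (RejectedExecutionException e) {
                return "rejected";
            }
            return ranOn[0] == null ? "not run" : ranOn[0];
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

With CallerRunsPolicy, the returned name is the submitting thread's own name, which is exactly the back-pressure effect that policy relies on.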

4.2 A custom rejection policy in practice (with logging and metrics)

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class MonitoredCallerRunsPolicy implements RejectedExecutionHandler {
    private static final Logger log = LoggerFactory.getLogger(MonitoredCallerRunsPolicy.class);
    private final Counter rejectCounter;

    public MonitoredCallerRunsPolicy(MeterRegistry meterRegistry) {
        this.rejectCounter = Counter.builder("thread.pool.reject")
            .description("Number of tasks rejected by the thread pool")
            .register(meterRegistry);
    }

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        rejectCounter.increment();
        log.warn("Pool {} rejected a task; threads: {}/{}, queue: {}/{}; falling back to the caller thread",
            e.toString(),
            e.getPoolSize(), e.getMaximumPoolSize(),
            e.getQueue().size(), e.getQueue().remainingCapacity() + e.getQueue().size());

        if (!e.isShutdown()) {
            // Run in the caller thread and log how long it took (optional)
            long start = System.nanoTime();
            try {
                r.run();
            } finally {
                log.debug("Fallback task took {} ms",
                    TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
            }
        }
    }
}

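4.3 Other common custom policies

// 1. Blocking-wait policy: when the queue is full, block the submitting thread for a bounded time and retry
public class BlockingWaitPolicy implements RejectedExecutionHandler {
    private final long maxWaitMs;
    public BlockingWaitPolicy(long maxWaitMs) { this.maxWaitMs = maxWaitMs; }

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (e.isShutdown()) {
            throw new RejectedExecutionException("Pool is shut down"); // don't drop the task silently
        }
        try {
            // Enqueues directly (bypassing execute()); rejection only happens at maximumPoolSize,
            // so workers exist to drain the queue
            if (!e.getQueue().offer(r, maxWaitMs, TimeUnit.MILLISECONDS)) {
                throw new RejectedExecutionException("Still could not enqueue after waiting " + maxWaitMs + " ms");
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new RejectedExecutionException("Interrupted while waiting to enqueue", ex);
        }
    }
}

// 2. Persist the task to a database or message queue for later compensation
public class PersistencePolicy implements RejectedExecutionHandler {
    private static final Logger log = LoggerFactory.getLogger(PersistencePolicy.class);

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        // TaskStorage stands for your own DB/MQ persistence layer. Tasks from submit() arrive
        // wrapped in a FutureTask and lambdas are usually not Serializable, so check first
        if (!(r instanceof Serializable)) {
            throw new RejectedExecutionException("Task cannot be persisted: " + r);
        }
        TaskStorage.save((Serializable) r);
        log.info("Task rejected; persisted for later compensation");
    }
}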

5. Custom Thread Factories and Thread Naming Conventions

5.1 A custom ThreadFactory template

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedThreadFactory implements ThreadFactory {
    private static final Logger log = LoggerFactory.getLogger(NamedThreadFactory.class);

    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;
    private final boolean daemon;
    private final int priority;
    private final Thread.UncaughtExceptionHandler exceptionHandler;

    public NamedThreadFactory(String poolName) {
        this(poolName, false, Thread.NORM_PRIORITY);
    }

    public NamedThreadFactory(String poolName, boolean daemon, int priority) {
        this.namePrefix = poolName + "-thread-";
        this.daemon = daemon;
        this.priority = priority;
        this.exceptionHandler = (t, e) ->
            log.error("Uncaught exception in thread {}", t.getName(), e);
    }

    @Override
    public Thread newThread(Runnable r) {
        // Names look like "order-processor-thread-1", "order-processor-thread-2", ...
        Thread t = new Thread(r, namePrefix + threadNumber.getAndIncrement());
        t.setDaemon(daemon);
        t.setPriority(priority);
        t.setUncaughtExceptionHandler(exceptionHandler);
        return t;
    }
}

5.2 Usage example

ThreadPoolExecutor executor = new ThreadPoolExecutor(
    10, 20, 60, TimeUnit.SECONDS,
    new ArrayBlockingQueue<>(1000),
    new NamedThreadFactory("order-processor", false, Thread.NORM_PRIORITY),
    new ThreadPoolExecutor.CallerRunsPolicy()
);

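5.3 Daemon threads vs user threads

| Type | Behavior | Typical use |
| --- | --- | --- |
| User thread (default) | The JVM waits for all user threads to finish before exiting | Pools that run business logic |
| Daemon thread | Killed abruptly when the JVM exits, whether or not its work is done (finally blocks are not guaranteed to run) | Best-effort background helpers, e.g., log flushing |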

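6. Dynamic Tuning and Runtime Management

6.1 Parameters that can be changed at runtime

// All of these can be called while the pool is running
executor.setCorePoolSize(newCoreSize);       // JDK 9+: IllegalArgumentException if > maximumPoolSize
executor.setMaximumPoolSize(newMaxSize);     // IllegalArgumentException if < corePoolSize
executor.setKeepAliveTime(newTime, TimeUnit.SECONDS);
executor.setRejectedExecutionHandler(newHandler);
executor.setThreadFactory(newFactory);       // affects only threads created afterwards
executor.allowCoreThreadTimeOut(true);       // let idle core threads time out too (requires keepAliveTime > 0)
// The work queue cannot be replaced, and the JDK queues' capacities are fixed at construction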
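Because corePoolSize must never exceed maximumPoolSize, the order of the two setter calls matters when both change. A small helper can choose a safe order: raise the maximum first when growing, and lower the core first when shrinking. The name PoolResizer is hypothetical, not a JDK API.

```java
import java.util.concurrent.ThreadPoolExecutor;

final class PoolResizer {
    /**
     * Applies new core/max sizes in an order that never violates core <= max at any step.
     * (JDK 9+ setCorePoolSize throws if core > max; setMaximumPoolSize throws if max < core.)
     */
    static void resize(ThreadPoolExecutor pool, int newCore, int newMax) {
        if (newCore < 0 || newMax <= 0 || newCore > newMax) {
            throw new IllegalArgumentException("need 0 <= core <= max and max > 0");
        }
        if (newMax >= pool.getMaximumPoolSize()) {
            pool.setMaximumPoolSize(newMax);   // growing: raise the ceiling first
            pool.setCorePoolSize(newCore);
        } else {
            pool.setCorePoolSize(newCore);     // shrinking: lower the floor first
            pool.setMaximumPoolSize(newMax);
        }
    }
}
```

For example, calling pool.setCorePoolSize(8) first on a 2/4 pool would throw IllegalArgumentException on JDK 9 and later. PoolResizer.resize(pool, 8, 16) avoids this by raising the maximum before the core size.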

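6.2 Dynamic tuning in practice: load-based automatic scaling

@Component
public class AdaptiveThreadPoolManager {
    private static final Logger log = LoggerFactory.getLogger(AdaptiveThreadPoolManager.class);

    private final ThreadPoolExecutor executor;
    private final int minCoreSize;
    private final int maxCoreSize;

    // How the executor and the size bounds are wired in is left to your configuration
    public AdaptiveThreadPoolManager(ThreadPoolExecutor executor, int minCoreSize, int maxCoreSize) {
        this.executor = executor;
        this.minCoreSize = minCoreSize;
        this.maxCoreSize = maxCoreSize;
    }

    // Requires @EnableScheduling; the thresholds (500 queued, 90% / 30% busy) are illustrative
    @Scheduled(fixedDelay = 10000)
    public void adjustPoolSize() {
        int queueSize = executor.getQueue().size();
        int activeCount = executor.getActiveCount(); // approximate by contract
        int currentCore = executor.getCorePoolSize();
        // Never go above maximumPoolSize: on JDK 9+ setCorePoolSize would throw
        int upperBound = Math.min(maxCoreSize, executor.getMaximumPoolSize());

        if (queueSize > 500 && activeCount >= currentCore * 0.9 && currentCore < upperBound) {
            // Heavy backlog and nearly all core threads busy → add core threads
            int newCore = Math.min(currentCore + 2, upperBound);
            executor.setCorePoolSize(newCore); // new threads start to drain queued tasks
            log.info("Queue backlog; core threads increased to {}", newCore);
        } else if (queueSize == 0 && activeCount < currentCore * 0.3 && currentCore > minCoreSize) {
            // Queue empty and most core threads idle → shrink (surplus threads exit when next idle)
            int newCore = Math.max(currentCore - 1, minCoreSize);
            executor.setCorePoolSize(newCore);
            log.info("Pool mostly idle; core threads reduced to {}", newCore);
        }
    }
}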

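6.3 Prestarting core threads

// Create all core threads up front so early requests don't pay for lazy thread creation
executor.prestartAllCoreThreads();   // returns the number of threads started
// Or start a single core thread (returns false if all core threads already exist)
executor.prestartCoreThread();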

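7. Advanced Task Submission and Result Handling

7.1 execute vs submit

| Method | Return type | Exception handling |
| --- | --- | --- |
| execute(Runnable) | void | The exception propagates out of the worker thread to its UncaughtExceptionHandler; that worker dies and the pool replaces it |
| submit(Runnable) | Future<?> | Captured in the Future; rethrown, wrapped in ExecutionException, by Future.get() |
| submit(Callable<T>) | Future<T> | Same as submit(Runnable) |

// execute: the exception is not swallowed; it reaches the thread's UncaughtExceptionHandler
executor.execute(() -> { throw new RuntimeException("execute exception"); });

// submit: the exception is stored in the Future and surfaces only when get() is called
Future<?> future = executor.submit(() -> { throw new RuntimeException("submit exception"); });
try {
    future.get(); // throws ExecutionException here
} catch (ExecutionException e) {
    Throwable cause = e.getCause(); // the original exception
} catch (InterruptedException e) {
    Thread.currentThread().interrupt(); // restore the interrupt flag
}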
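The difference in the table can be checked end to end. The sketch below (illustrative names) installs an UncaughtExceptionHandler through a ThreadFactory. It then fails one task via execute() and one via submit(), and reports where each exception ended up.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class ExceptionRoutingDemo {
    /** Returns {message seen by the UncaughtExceptionHandler, message unwrapped from Future.get()}. */
    static String[] run() {
        AtomicReference<String> uncaught = new AtomicReference<>();
        CountDownLatch handled = new CountDownLatch(1);
        ThreadFactory factory = r -> {
            Thread t = new Thread(r);
            t.setUncaughtExceptionHandler((thread, ex) -> {
                uncaught.set(ex.getMessage());
                handled.countDown();
            });
            return t;
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), factory);
        Runnable failingRunnable = () -> { throw new IllegalStateException("from execute"); };
        Callable<Object> failingCallable = () -> { throw new IllegalStateException("from submit"); };
        try {
            // execute(): the exception escapes runWorker, kills this worker and reaches the handler;
            // the pool then starts a replacement worker for the next task
            pool.execute(failingRunnable);
            // submit(): FutureTask catches the exception and stores it in the Future
            Future<Object> future = pool.submit(failingCallable);
            String captured = null;
            try {
                future.get();
            } catch (ExecutionException e) {
                captured = e.getCause().getMessage();
            }
            handled.await(5, TimeUnit.SECONDS); // the handler runs on the dying thread, asynchronously
            return new String[] {uncaught.get(), captured};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```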

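7.2 CompletionService: first finished, first retrieved

CompletionService fits batches of tasks whose results you want in completion order. Iterating over a list of Futures in submission order makes you block on a slow early task even when later results are already available; CompletionService hands you whichever result finishes next.

// (Assumes the enclosing method declares throws InterruptedException, ExecutionException)
ExecutorService executor = Executors.newFixedThreadPool(10); // keeps the demo short; see 11.3 for production use
CompletionService<String> completionService = new ExecutorCompletionService<>(executor);

// Submit the tasks (keep the Futures if you may need to cancel outstanding tasks)
List<Future<String>> futures = new ArrayList<>();
for (int i = 0; i < 100; i++) {
    final int taskId = i;
    futures.add(completionService.submit(() -> {
        Thread.sleep(ThreadLocalRandom.current().nextInt(1000));
        return "Task-" + taskId + " result";
    }));
}

// Consume results in completion order
for (int i = 0; i < 100; i++) {
    Future<String> completed = completionService.take(); // blocks until the next task completes
    String result = completed.get();                     // already done, so this does not block
    System.out.println(result);
}
executor.shutdown();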

7.3 invokeAll and invokeAny

List<Callable<String>> tasks = Arrays.asList(
    () -> { Thread.sleep(1000); return "A"; },
    () -> { Thread.sleep(500); return "B"; }
);

// invokeAll: waits for every task to finish; returns the Futures in the same order as the tasks
List<Future<String>> allFutures = executor.invokeAll(tasks);

// invokeAny: returns the result of one task that completed successfully and cancels the rest
String firstResult = executor.invokeAny(tasks); // usually "B" when both run in parallel, but not guaranteed
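One detail of invokeAny is easy to miss: a task that throws does not end the call. invokeAny fails, with an ExecutionException, only if every task fails. The sketch below (illustrative names) pairs a failing task with a succeeding one.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class InvokeAnyDemo {
    static String firstSuccess() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        Callable<String> failing = () -> { throw new IllegalStateException("boom"); };
        Callable<String> succeeding = () -> "ok";
        List<Callable<String>> tasks = Arrays.asList(failing, succeeding);
        try {
            // The failure is ignored because another task completes normally
            return pool.invokeAny(tasks);
        } catch (ExecutionException e) {
            // Reached only if every task threw
            throw new IllegalStateException("all tasks failed", e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```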

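8. Hook Methods and Extension Points

8.1 The three overridable hook methods

public class MonitoredThreadPool extends ThreadPoolExecutor {
    private static final Logger log = LoggerFactory.getLogger(MonitoredThreadPool.class);
    // metrics: your own metrics component (not shown); recordError(), recordTaskDuration()
    // and unregister() are placeholders, not a library API

    public MonitoredThreadPool(...) { super(...); } // arguments elided: pass them to a ThreadPoolExecutor constructor

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        // Runs on the worker thread: record the start time in a ThreadLocal
        TaskContext.start(t.getName(), r);
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        try {
            // t is non-null only for tasks passed to execute(); submit() wraps the task in a
            // FutureTask that catches the exception itself, so t is null in that case
            if (t != null) {
                log.error("Task failed", t);
                metrics.recordError();
            }
            // Record how long the task took
            long duration = TaskContext.getDuration();
            metrics.recordTaskDuration(duration);
        } finally {
            TaskContext.clear();
        }
    }

    @Override
    protected void terminated() {
        super.terminated();
        log.info("Thread pool fully terminated");
        metrics.unregister();
    }
}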

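8.2 Passing context through ThreadLocal (cleanup required)

import java.util.concurrent.TimeUnit;

public class TaskContext {
    private static final ThreadLocal<Long> START_TIME = new ThreadLocal<>();
    private static final ThreadLocal<String> TASK_NAME = new ThreadLocal<>();

    public static void start(String threadName, Runnable task) {
        START_TIME.set(System.nanoTime());
        TASK_NAME.set(task.getClass().getSimpleName());
    }

    /** Elapsed milliseconds since start(), or -1 if start() was not called on this thread. */
    public static long getDuration() {
        Long start = START_TIME.get();
        return start == null ? -1 : TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void clear() {
        START_TIME.remove();  // ⚠️ must be cleared, otherwise values leak into later tasks
        TASK_NAME.remove();
    }
}

Why must ThreadLocal values be cleared?
Pool threads are reused. A value that is not removed in afterExecute survives into the next task that runs on the same thread, so that task can read the wrong data. The thread's ThreadLocalMap also holds each value by a strong reference, so the value cannot be garbage-collected while the thread lives, which for a pool thread is often the lifetime of the application.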
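The data mix-up is easy to reproduce with a single-thread pool, where consecutive tasks are guaranteed to share one worker. In the sketch below (illustrative names), the first task sets a ThreadLocal and removes it only if asked to, and the second task simply reads it.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class StaleThreadLocalDemo {
    private static final ThreadLocal<String> CURRENT_USER = new ThreadLocal<>();

    /** Returns what a second task sees in CURRENT_USER after a first task ran on the same worker. */
    static String valueSeenByNextTask(boolean cleanUp) {
        // One worker thread, so both tasks run on the same reused thread
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        Runnable firstTask = () -> {
            CURRENT_USER.set("alice");
            try {
                // ... business logic that reads CURRENT_USER ...
            } finally {
                if (cleanUp) {
                    CURRENT_USER.remove();
                }
            }
        };
        Callable<String> secondTask = CURRENT_USER::get;
        try {
            pool.submit(firstTask).get();
            return pool.submit(secondTask).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```

Without the remove() call, the second task sees "alice", a value that belongs to a different piece of work.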


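9. ScheduledThreadPoolExecutor in Depth

9.1 Architecture and inheritance

ThreadPoolExecutor
        ▲
        │ extends (and implements ScheduledExecutorService)
ScheduledThreadPoolExecutor
        │
        ├── inner class ScheduledFutureTask (implements RunnableScheduledFuture)
        └── inner class DelayedWorkQueue (heap-based delay queue)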

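9.2 How DelayedWorkQueue works

  • Data structure: a binary min-heap ordered by each task's next execution time (ties broken by sequenceNumber).
  • Waiting strategy: the Leader-Follower pattern. Only one thread, the leader, does a timed wait for the head task's delay. All other threads (followers) wait indefinitely. This avoids waking every thread when only one task is due.

// take() with the Leader-Follower logic (close to the JDK source)
public RunnableScheduledFuture<?> take() throws InterruptedException {
    lock.lockInterruptibly();
    try {
        for (;;) {
            RunnableScheduledFuture<?> first = queue[0];
            if (first == null) {
                available.await(); // queue empty: wait
            } else {
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0) {
                    return finishPoll(first); // head task is due: remove and return it
                }
                first = null; // don't hold a reference while waiting (the task may be cancelled)
                if (leader != null) {
                    available.await(); // a leader exists: wait as a follower
                } else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        available.awaitNanos(delay); // become the leader: wait until the head task is due
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && queue[0] != null)
            available.signal(); // no leader and tasks remain: wake one follower to take over
        lock.unlock();
    }
}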
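DelayedWorkQueue is not public API, but java.util.concurrent.DelayQueue works on the same principle. Elements sit in a priority heap ordered by deadline, take() releases the head only once its delay has expired, and the OpenJDK implementation uses the same leader/follower wait. The sketch below (DelayedTask and DelayQueueDemo are illustrative names) inserts tasks out of order and shows that they come out by deadline.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/** A task that becomes available delayMillis after construction. */
final class DelayedTask implements Delayed {
    final String name;
    private final long triggerNanos; // absolute System.nanoTime() deadline, like ScheduledFutureTask.time

    DelayedTask(String name, long delayMillis) {
        this.name = name;
        this.triggerNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(triggerNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        // Heap order: earliest deadline first
        return Long.compare(triggerNanos, ((DelayedTask) other).triggerNanos);
    }
}

final class DelayQueueDemo {
    /** Inserts tasks out of order and returns their names in the order take() releases them. */
    static List<String> takeOrder() {
        DelayQueue<DelayedTask> queue = new DelayQueue<>();
        queue.put(new DelayedTask("slow", 300));
        queue.put(new DelayedTask("fast", 100));
        queue.put(new DelayedTask("medium", 200));
        List<String> order = new ArrayList<>();
        try {
            while (!queue.isEmpty()) {
                order.add(queue.take().name); // blocks until the head's delay has expired
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return order;
    }
}
```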

9.3 Key fields of ScheduledFutureTask

private class ScheduledFutureTask<V>
        extends FutureTask<V> implements RunnableScheduledFuture<V> {
    private long time;        // next execution time (System.nanoTime()-based, in nanoseconds)
    private final long period; // positive: fixed rate; negative: fixed delay; 0: one-shot
    private final long sequenceNumber; // global sequence number; breaks ties between equal times (FIFO)
}

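9.4 scheduleAtFixedRate vs scheduleWithFixedDelay at the source level

// run() resets the next execution time after each periodic run
public void run() {
    boolean periodic = isPeriodic();
    if (!canRunInCurrentRunState(periodic))
        cancel(false);
    else if (!periodic)
        super.run();
    else if (super.runAndReset()) { // if the task throws, runAndReset() returns false and it is never rescheduled
        setNextRunTime();   // the key difference
        reExecutePeriodic(outerTask); // put the task back into the queue
    }
}

private void setNextRunTime() {
    long p = period;
    if (p > 0)  // scheduleAtFixedRate
        time += p;  // based on the previous planned time, keeping a fixed rate
    else        // scheduleWithFixedDelay
        time = triggerTime(-p); // based on the time this run finished, plus the delay
}

The consequences compared:

// Suppose each run takes 4 s and the period/delay is 3 s
// scheduleAtFixedRate (period = 3 > 0)
// Run 1: starts at 0 s, ends at 4 s → next planned time = 0 + 3 = 3 s (already past), so run 2 starts immediately
// Run 2: planned for 3 s, actually starts at 4 s (runs never overlap; they just start late)

// scheduleWithFixedDelay (period = -3)
// Run 1: starts at 0 s, ends at 4 s → next time = finish time + 3 s = 7 s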
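The two update rules can be compared without waiting on a real clock. The sketch below is a toy model, not ScheduledThreadPoolExecutor itself. It assumes every run takes the same fixed duration and computes when each run starts under time += p (fixed rate) versus end + delay (fixed delay). As in the real executor, runs of one periodic task never overlap, so a late fixed-rate run starts as soon as the previous one ends. The class name ScheduleModel is illustrative.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of start times for periodic tasks with a constant run duration. */
final class ScheduleModel {
    static List<Long> fixedRateStarts(long period, long duration, int runs) {
        List<Long> starts = new ArrayList<>();
        long planned = 0;
        long lastEnd = 0;
        for (int i = 0; i < runs; i++) {
            long start = Math.max(planned, lastEnd); // a late run starts when the previous one ends
            starts.add(start);
            lastEnd = start + duration;
            planned += period;                       // time += p: based on the previous planned time
        }
        return starts;
    }

    static List<Long> fixedDelayStarts(long delay, long duration, int runs) {
        List<Long> starts = new ArrayList<>();
        long start = 0;
        for (int i = 0; i < runs; i++) {
            starts.add(start);
            start = start + duration + delay;        // next start = this run's end + delay
        }
        return starts;
    }

    public static void main(String[] args) {
        System.out.println(fixedRateStarts(3, 4, 3));  // [0, 4, 8]
        System.out.println(fixedDelayStarts(3, 4, 3)); // [0, 7, 14]
    }
}
```

With the 4 s runs and 3 s period from the example, fixed rate starts runs back to back (0, 4, 8), while fixed delay leaves a 3 s gap after each run (0, 7, 14).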

10. Building Thread Pool Monitoring and Alerting

10.1 A complete Micrometer-based monitoring implementation

import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;

@Component
public class ThreadPoolMetricsBinder {
    private static final Logger log = LoggerFactory.getLogger(ThreadPoolMetricsBinder.class);

    private final MeterRegistry meterRegistry;
    private final Map<String, ThreadPoolExecutor> pools = new ConcurrentHashMap<>();

    public ThreadPoolMetricsBinder(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
    }

    public void register(String poolName, ThreadPoolExecutor executor) {
        // The map keeps the executor strongly reachable; Gauge holds only a weak reference by default
        pools.put(poolName, executor);

        // Core gauges
        Gauge.builder("thread.pool.core.size", executor, ThreadPoolExecutor::getCorePoolSize)
            .tag("pool", poolName).register(meterRegistry);
        Gauge.builder("thread.pool.max.size", executor, ThreadPoolExecutor::getMaximumPoolSize)
            .tag("pool", poolName).register(meterRegistry);
        Gauge.builder("thread.pool.active", executor, ThreadPoolExecutor::getActiveCount)
            .tag("pool", poolName).register(meterRegistry);
        Gauge.builder("thread.pool.pool.size", executor, ThreadPoolExecutor::getPoolSize)
            .tag("pool", poolName).register(meterRegistry);
        Gauge.builder("thread.pool.largest.size", executor, ThreadPoolExecutor::getLargestPoolSize)
            .tag("pool", poolName).register(meterRegistry);
        Gauge.builder("thread.pool.queue.size", executor, e -> e.getQueue().size())
            .tag("pool", poolName).register(meterRegistry);
        Gauge.builder("thread.pool.queue.remaining", executor,
                e -> e.getQueue().remainingCapacity())
            .tag("pool", poolName).register(meterRegistry);
        Gauge.builder("thread.pool.completed", executor,
                e -> e.getCompletedTaskCount())
            .tag("pool", poolName).register(meterRegistry);

        // Derived metric: queue usage ratio
        Gauge.builder("thread.pool.queue.usage", executor, e -> {
            int size = e.getQueue().size();
            int capacity = size + e.getQueue().remainingCapacity();
            return capacity > 0 ? (double) size / capacity : 0;
        }).tag("pool", poolName).register(meterRegistry);
    }

    // Periodically log pool statistics (handy during development)
    @Scheduled(fixedDelay = 30000)
    public void logPoolStats() {
        pools.forEach((name, pool) -> {
            log.info("Pool [{}] core: {}, current: {}, active: {}, queue: {}/{}, completed: {}, rejected: {}",
                name,
                pool.getCorePoolSize(),
                pool.getPoolSize(),
                pool.getActiveCount(),
                pool.getQueue().size(),
                pool.getQueue().size() + pool.getQueue().remainingCapacity(),
                pool.getCompletedTaskCount(),
                getRejectedCount(pool) // your own helper: rejections must be tracked separately
            );
        });
    }
}
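Where Micrometer is not available, or for quick checks in tests, the same figures can be read directly from the JDK API. The class below is an illustrative sketch (PoolSnapshot is not a library type). It computes queue usage the same way as the thread.pool.queue.usage gauge. Its halfFullQueue() helper builds a pool with one busy worker and five of ten queue slots in use.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Point-in-time view of a pool, computed with JDK calls only. */
final class PoolSnapshot {
    final int poolSize;
    final int active;        // getActiveCount() is documented as approximate
    final int queueSize;
    final int queueCapacity; // size + remainingCapacity, as in the queue.usage gauge
    final long completed;

    PoolSnapshot(ThreadPoolExecutor e) {
        BlockingQueue<Runnable> q = e.getQueue();
        poolSize = e.getPoolSize();
        active = e.getActiveCount();
        queueSize = q.size();
        queueCapacity = queueSize + q.remainingCapacity();
        completed = e.getCompletedTaskCount();
    }

    double queueUsage() {
        return queueCapacity > 0 ? (double) queueSize / queueCapacity : 0.0;
    }

    /** A pool whose single worker is busy and whose 10-slot queue holds 5 tasks. */
    static PoolSnapshot halfFullQueue() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(10));
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        try {
            pool.execute(() -> {
                started.countDown();
                awaitQuietly(release);   // keep the only worker busy
            });
            started.await();             // make sure the worker is really running the task
            for (int i = 0; i < 5; i++) {
                pool.execute(() -> { }); // these wait in the queue
            }
            return new PoolSnapshot(pool);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }
}
```

For an unbounded LinkedBlockingQueue, size() + remainingCapacity() equals Integer.MAX_VALUE, so the usage ratio stays near zero. Alert on queue size for such pools instead.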

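10.2 Counting rejections and exceptions

public class InstrumentedThreadPool extends ThreadPoolExecutor {
    private final Counter rejectedCounter;
    private final Counter exceptionCounter;

    // Parameter list matches the call in section 12.4
    public InstrumentedThreadPool(int core, int max, long keepAlive, TimeUnit unit,
                                  BlockingQueue<Runnable> queue, ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler,
                                  MeterRegistry registry, String poolName) {
        super(core, max, keepAlive, unit, queue, threadFactory, handler);
        this.rejectedCounter = Counter.builder("thread.pool.rejected")
            .tag("pool", poolName).register(registry);
        this.exceptionCounter = Counter.builder("thread.pool.exception")
            .tag("pool", poolName).register(registry);
        // Wrap the handler so every rejection is counted before the original policy runs
        setRejectedExecutionHandler((r, e) -> {
            rejectedCounter.increment();
            handler.rejectedExecution(r, e);
        });
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        // Counts failures of execute() tasks; submit() failures arrive with t == null
        if (t != null) {
            exceptionCounter.increment();
        }
    }
}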

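10.3 Alert rule configuration (Prometheus + Alertmanager example)

The rules assume Micrometer's Prometheus naming: dots become underscores and counters get a _total suffix, so thread.pool.rejected is exported as thread_pool_rejected_total. Prometheus evaluates the rules and forwards firing alerts to Alertmanager.

groups:
  - name: thread_pool_alerts
    interval: 30s
    rules:
      - alert: ThreadPoolQueueHighUsage
        expr: thread_pool_queue_usage > 0.8
        for: 5m
        annotations:
          summary: "Queue usage of pool {{ $labels.pool }} is above 80%"

      - alert: ThreadPoolRejectOccurred
        expr: increase(thread_pool_rejected_total[1m]) > 0
        annotations:
          summary: "Pool {{ $labels.pool }} is rejecting tasks"

      - alert: ThreadPoolActiveThreadHigh
        expr: thread_pool_active / thread_pool_max_size > 0.9
        for: 10m
        annotations:
          summary: "Active threads of pool {{ $labels.pool }} have stayed close to the maximum"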

11. Common Pitfalls and Best Practices

11.1 Pitfall 1: ThreadLocal memory leaks

// HeavyObject stands for any large per-task object
private static final ThreadLocal<HeavyObject> HOLDER = new ThreadLocal<>();

// ❌ Wrong: set a ThreadLocal in a task and never clear it
executor.submit(() -> {
    HOLDER.set(new HeavyObject());  // stays in the worker thread's ThreadLocalMap after the task ends
    // remove() is never called
});

// ✅ Right: clear it in try-finally
executor.submit(() -> {
    try {
        HOLDER.set(new HeavyObject());
        // business logic
    } finally {
        HOLDER.remove();
    }
});

11.2 Pitfall 2: concurrency bugs when tasks share state

// The pool itself is thread-safe, but
// if tasks share mutable state, access to it must be synchronized
class SharedCounter {
    private int count = 0;
    public void increment() { count++; } // read-modify-write, not atomic
}

// ❌ Many tasks modify the same object concurrently
SharedCounter counter = new SharedCounter();
for (int i = 0; i < 1000; i++) {
    executor.submit(counter::increment);
}
// The final count may be less than 1000

// ✅ Use an atomic class or synchronization
class SafeCounter {
    private final AtomicInteger count = new AtomicInteger(0);
    public void increment() { count.incrementAndGet(); }
}

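11.3 Pitfall 3: unbounded queues causing OOM

// ❌ A serious mistake
ExecutorService pool = Executors.newFixedThreadPool(10);
// It uses a LinkedBlockingQueue with the default capacity of Integer.MAX_VALUE
// If tasks arrive much faster than they are processed, the queue grows until the heap runs out (OOM)

// ✅ Use a bounded queue
new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS,
    new ArrayBlockingQueue<>(1000), // explicit capacity
    ...);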

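11.4 Pitfall 4: swallowed exceptions

// If get() is never called, an exception thrown by a submit() task is silently swallowed
Future<?> future = executor.submit(() -> { throw new RuntimeException(); });
// Nothing is logged here

// Solutions:
// 1. Always call get() and handle ExecutionException
// 2. Use execute instead of submit when no result is needed (the exception reaches the UncaughtExceptionHandler)
// 3. Log in afterExecute (as shown earlier); for submit() tasks t is null there, so unwrap the Future first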
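Point 3 needs care. For submit(), the task that reaches afterExecute is a FutureTask that has already caught the exception, so t is null. The ThreadPoolExecutor.afterExecute Javadoc shows how to unwrap the Future, and the sketch below adapts that pattern. ErrorLoggingPool and its lastError field are illustrative names, and System.err stands in for a real logger.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/** Logs failures from both execute() and submit() tasks. */
class ErrorLoggingPool extends ThreadPoolExecutor {
    final AtomicReference<Throwable> lastError = new AtomicReference<>();

    ErrorLoggingPool(int threads) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        // submit() tasks are FutureTasks that already caught their exception, so t is null here
        if (t == null && r instanceof Future<?> && ((Future<?>) r).isDone()) {
            try {
                ((Future<?>) r).get();
            } catch (CancellationException ce) {
                t = ce;
            } catch (ExecutionException ee) {
                t = ee.getCause();
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        }
        if (t != null) {
            lastError.set(t);
            System.err.println("Task failed: " + t); // replace with your logger
        }
    }

    /** Submits a failing task, never calls get(), and returns the message afterExecute saw. */
    static String demo() {
        ErrorLoggingPool pool = new ErrorLoggingPool(1);
        Callable<Object> failing = () -> { throw new IllegalStateException("lost"); };
        pool.submit(failing);  // the Future is deliberately ignored
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS); // afterExecute has run once the pool terminates
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        Throwable t = pool.lastError.get();
        return t == null ? null : t.getMessage();
    }
}
```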

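11.5 Pitfall 5: submitting tasks after shutdown

executor.shutdown();
executor.execute(task); // rejected: with the default AbortPolicy this throws RejectedExecutionException

With CallerRunsPolicy, DiscardPolicy, or DiscardOldestPolicy, the task is instead dropped silently, which is much harder to notice.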

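11.6 Best-practice checklist

| Practice | How |
| --- | --- |
| Avoid the Executors shortcut factories | Call the ThreadPoolExecutor constructor explicitly |
| Bounded queue plus a suitable rejection policy | CallerRunsPolicy or a custom policy, depending on the business |
| Meaningful thread names | Use NamedThreadFactory |
| Full metric coverage | Integrate Micrometer and export the key metrics |
| Graceful shutdown | Use the two-phase pattern: shutdown(), awaitTermination(), then shutdownNow() if needed |
| Exception handling | Log uncaught exceptions centrally in afterExecute |
| Context propagation | Use TransmittableThreadLocal, or clean up ThreadLocals manually |
| Dynamic parameters | Let a configuration center adjust the core parameters |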
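The two-phase shutdown in the table follows the pattern shown in the ExecutorService Javadoc. First call shutdown() so queued work can finish, then shutdownNow() if the pool does not terminate in time. The sketch below uses an illustrative class name and a caller-supplied timeout.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class GracefulShutdown {
    /**
     * Two-phase shutdown adapted from the ExecutorService Javadoc.
     * Returns true if the pool terminated within at most two timeout periods.
     */
    static boolean shutdownAndAwait(ExecutorService pool, long timeout, TimeUnit unit) {
        pool.shutdown();                                  // phase 1: reject new tasks, finish queued ones
        try {
            if (pool.awaitTermination(timeout, unit)) {
                return true;
            }
            pool.shutdownNow();                           // phase 2: interrupt running tasks, drop queued ones
            return pool.awaitTermination(timeout, unit);  // give tasks time to react to the interrupt
        } catch (InterruptedException e) {
            pool.shutdownNow();                           // re-cancel if interrupted while waiting
            Thread.currentThread().interrupt();           // preserve the interrupt status
            return false;
        }
    }

    /** A task that would sleep for 10 s is cut short by phase 2. */
    static boolean demo() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        pool.execute(() -> {
            try {
                Thread.sleep(10_000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // respond to shutdownNow() by exiting promptly
            }
        });
        return shutdownAndAwait(pool, 200, TimeUnit.MILLISECONDS) && pool.isTerminated();
    }
}
```

Phase 2 only helps if tasks respond to interruption. A task that swallows the interrupt and keeps working will still outlive the second timeout.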

12. Production-Grade Thread Pool Configuration Templates

12.1 General-purpose business pool

@Configuration
public class ThreadPoolConfig {

    @Bean("commonExecutor")
    public ThreadPoolExecutor commonExecutor() {
        int cpuCores = Runtime.getRuntime().availableProcessors();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            cpuCores * 2,                       // corePoolSize
            cpuCores * 4,                       // maximumPoolSize
            60, TimeUnit.SECONDS,               // keepAliveTime
            new ArrayBlockingQueue<>(2000),     // bounded queue
            new NamedThreadFactory("common-pool", false, Thread.NORM_PRIORITY),
            new ThreadPoolExecutor.CallerRunsPolicy()
        );
        executor.allowCoreThreadTimeOut(true);  // reclaim idle core threads as well
        return executor;
    }
}

12.2 High-throughput I/O-bound pool

@Bean("ioIntensiveExecutor")
public ThreadPoolExecutor ioIntensiveExecutor() {
    // Tune the multipliers to the measured ratio of I/O wait time to compute time
    int coreSize = Runtime.getRuntime().availableProcessors() * 2;
    int maxSize = coreSize * 4;
    return new ThreadPoolExecutor(
        coreSize, maxSize,
        120, TimeUnit.SECONDS,
        // Threads beyond coreSize are created only after this queue is full
        new LinkedBlockingQueue<>(5000), // relatively large capacity
        new NamedThreadFactory("io-pool", false, Thread.NORM_PRIORITY),
        new BlockingWaitPolicy(100)      // custom blocking-wait rejection policy (section 4.3)
    );
}

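12.3 Dedicated pool for scheduled tasks

@Bean("scheduledExecutor")
public ScheduledExecutorService scheduledExecutor() {
    // The internal DelayedWorkQueue is unbounded: the pool stays at corePoolSize threads,
    // and the rejection handler only runs for tasks submitted after shutdown
    return new ScheduledThreadPoolExecutor(
        Runtime.getRuntime().availableProcessors(),
        new NamedThreadFactory("scheduled-pool", false, Thread.NORM_PRIORITY),
        new ThreadPoolExecutor.CallerRunsPolicy()
    );
}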

12.4 A pool builder with full monitoring

public class MonitoredThreadPoolBuilder {
    public static ThreadPoolExecutor build(String poolName,
                                           int core, int max,
                                           int queueCapacity,
                                           MeterRegistry registry) {
        ThreadPoolExecutor executor = new InstrumentedThreadPool(
            core, max, 60L, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(queueCapacity),
            new NamedThreadFactory(poolName),
            new MonitoredCallerRunsPolicy(registry),
            registry,
            poolName
        );
        // To also export the gauges from section 10.1, register the pool with a
        // ThreadPoolMetricsBinder here (the binder would need to be passed in)
        return executor;
    }
}

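Appendix: Quick Reference Card

Thread pool parameter cheat sheet

| Parameter | Suggested range | Notes |
| --- | --- | --- |
| corePoolSize | CPU cores to CPU cores × 2 | Adjust to the task type |
| maximumPoolSize | corePoolSize to corePoolSize × 4 | Keep it moderate to limit context-switching overhead |
| keepAliveTime | 30 s to 120 s | How long surplus idle threads live before being reclaimed |
| Queue capacity | 500 to 10,000 | Derive from the task arrival rate and the processing rate |

Rejection policy selection guide

| Scenario | Recommended policy |
| --- | --- |
| Critical business; tasks must not be lost | CallerRunsPolicy or a persistence policy |
| Non-critical tasks that may be dropped | DiscardPolicy |
| Newer tasks matter more than older ones | DiscardOldestPolicy |
| Fail fast; development and debugging | AbortPolicy (the default) |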

Thread count formula

N = number of CPU cores
U = target CPU utilization (0 < U <= 1)
W/C = ratio of wait time to compute time

Optimal thread count = N * U * (1 + W/C)
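A worked example of the formula: on 8 cores at full target utilization, a task that waits 90 ms on I/O for every 10 ms of computation has W/C = 9. That gives 8 × 1 × (1 + 9) = 80 threads, while a purely CPU-bound task (W/C = 0) gives 8. The helper below (hypothetical name PoolSizing) encodes the formula, rounds to the nearest integer, and never returns less than 1.

```java
final class PoolSizing {
    /**
     * N_threads = N * U * (1 + W/C), rounded to the nearest integer, minimum 1.
     *
     * @param cpus              number of CPU cores (N)
     * @param targetUtilization target CPU utilization U, with 0 < U <= 1
     * @param waitToCompute     ratio of wait time to compute time (W/C), >= 0
     */
    static int optimalThreads(int cpus, double targetUtilization, double waitToCompute) {
        if (cpus <= 0 || targetUtilization <= 0 || targetUtilization > 1 || waitToCompute < 0) {
            throw new IllegalArgumentException("invalid sizing input");
        }
        return Math.max(1, (int) Math.round(cpus * targetUtilization * (1 + waitToCompute)));
    }

    public static void main(String[] args) {
        System.out.println(optimalThreads(8, 1.0, 0));  // CPU-bound: 8
        System.out.println(optimalThreads(8, 1.0, 9));  // 90 ms wait per 10 ms compute: 80
        System.out.println(optimalThreads(Runtime.getRuntime().availableProcessors(), 0.8, 4));
    }
}
```

Treat the result as a starting point. Measure W/C under real load and adjust from there.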

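These sections cover the full picture, from basic concepts and source-level internals to monitoring, alerting, and production practice. They can serve as a comprehensive reference for study and work.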
