【并发专题】线程池ThreadPoolExecutorl底层原理源码分析

不管现实多么惨不忍睹,都要持之以恒地相信,这只是黎明前短暂的黑暗而已。不要惶恐眼前的难关迈不过去,不要担心此刻的付出没有回报,别再花时间等待天降好运。真诚做人,努力做事!你想要的,岁月都会给你。【并发专题】线程池ThreadPoolExecutorl底层原理源码分析,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com,来源:原文

前置知识

Q1:终止一个线程的方法有哪些?
答:通常有4个方法。其中前2个是使用stop()和destroy()方法终止,但是这2个都不推荐使用,因为他们是暴力终止线程,所以不保证资源被正确释放,甚至导致数据不一致。后2个则是通过设置一个线程中断标记了,自定义的中断标记(保证内存可见性),或者使用线程自有的interupte()方法。

Q2:为什么需要线程池?
答:首先线程池严格来说是一种池化技术,重在资源的重复利用。另外我们也知道,JAVA的线程实现技术是【内核线程1:1】的实现方案,这势必造成,JAVA在创建线程的时候,涉及到【用户态】【内核态】的切换,所以算是比较重型的操作。
试想一下,在web开发中,服务器需要接受并处理请求,如果并发的请求数量非常多,但每个线程执行的时间很短,这样就会频繁的创建和销毁线程,如此一来会大大降低系统的效率。可能出现服务器在为每个请求创建新线程和销毁线程上花费的时间和消耗的系统资源要比处理实际的用户请求的时间和资源更多。所以,我们需要一个可重复使用的线程池。线程池的优势在于:

  • 重用存在的线程,减少线程创建,消亡的开销,提高性能
  • 提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行
  • 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控

课程内容

一、线程池

1.基本介绍

线程池严格来说是一种池化技术,重在资源重复利用,它就是一种多线程存在形式。在多线编程中,创建和销毁线程一项开销较的操作,所以线程池通过预先创建一组线程,并将任务分配给这些线程来提供高效的线程管理。
线程池通常包括一个线程队列和一个任务队列。线程队列中保存着可供复用的线程,任务队列中保存着需要执行的任务。
从上面我们可以看到,线程池有着显著的优点:降低资源消耗、提高响应速度、方便管理;可以复用线程、控制最搭并发数、管理线程等。

2.Executor接口

Executor接口是线程池框架中最基础的部分,定义了一个用于执行Runnable的execute方法。它的继承图如下:
在这里插入图片描述

从图中我们可以看到一个很重要的接口叫做Executors,这也是我们后面学的线程池的一个核心接口。在其中定义了线程池的具体行为。接口定义如下:
在这里插入图片描述

execute(Runnable command); // 执行Ruannable类型的任务
submit(task); // 可用来提交Callable或Runnable任务,并返回代表此任务的Future对象
shutdown(); // 表示要关闭线程池,不再接受新的任务,但是会把已经提交的任务先完成
shutdownNow(); // 停止所有正在执行的任务,不再接受新的任务,并且也不执行等待中的任务,只是将等待任务列表返回
isTerminated(); // 测试是否所有任务都执行完了(只有调用了shutdown或者shutdownNow这里才可能返回真,不然永远是false)
isShutdown(); // 测试是否该ExecutorService已被关闭

*3.线程池的重点属性

线程池中的重点属性如下:(贯穿全文!想要读透源码,一定要记住每个状态的特征。因为你在后面的源码阅读中会发现大量【重复获取状态,对状态做判断】的逻辑)

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// 线程池的状态
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;
ctl字段
  • AtomicInteger ctl:ctl字段,是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段,它包含两部分信息:【线程池的运行状态,runState】和【线程池有效线程数量,workerCount】。这里可以看到,使用了Integer类型来保存。其中,高3位保存runState低29位保存了workerCount。上面的COUNT_BITS 值就是29,所以CAPACITY的值是1左移29,很大很大,大概5亿多。

ctl相关方法:

  • runStateOf:获取运行状态
  • workerCountOf:获取活动线程数;
  • ctlOf:获取运行状态和活动线程数的值。

下面5个字段表示线程池的状态:

RUNNING字段

(1)状态说明:表示线程池出于RUNNING状态。在当前状态,可以接收新任务,以及对线程已添加的任务进行处理
(2)状态切换:线程池的初始状态就是RUNNING状态。换句话说,线程池一旦被创建就出于这个状态了,并且线程池的任务数量为0
(3)值:高3位111,值为:-3

*SHUTDOWN字段

(1)状态说明:表示线程池处于SHUTDOWN状态。在当前状态,不接收新任务,但能处理已经添加的任务(PS:非常重要的一个状态,后续源码会出现多次)
(2)状态切换:调用线程池的shutdown()接口时,线程池由RUNNING -> SHUTDOWN
(3)值:高3位000,值为:0

STOP字段

(1) 状态说明:表示线程池处在STOP状态。在当前状态,不接收新任务,不处理已添加的任务,并且会中断正在处理的任务
(2)状态切换:调用线程池的shutdownNow()接口时,线程池由(RUNNING or SHUTDOWN ) -> STOP
(3)值:高3位001,值为:1

TIDYING字段

(1) 状态说明:当所有的任务已终止,ctl记录的”任务数量”为0,线程池会变为TIDYING状态。当线程池变为TIDYING状态时,会执行钩子函数terminated()。terminated()在ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时,进行相应的处理;可以通过重载terminated()函数来实现。
(2)状态切换:当线程池在SHUTDOWN状态下,阻塞队列为空并且线程池中执行的任务也为空时,就会由 SHUTDOWN -> TIDYING。 当线程池在STOP状态下,线程池中执行的任务为空时,就会由STOP -> TIDYING
(3)值:高3位010,值为:2

TERMINATED字段

(1)状态说明:线程池彻底终止,就变成TERMINATED状态。
(2)状态切换:线程池处在TIDYING状态时,执行完terminated()之后,就会由 TIDYING -> TERMINATED。
进入TERMINATED的条件如下:

  • 线程池不是RUNNING状态;
  • 线程池状态不是TIDYING状态或TERMINATED状态;
  • 如果线程池状态是SHUTDOWN并且workerQueue为空;
  • workerCount为0;
  • 设置TIDYING状态成功。
    在这里插入图片描述
    (3)值:高3位011,值为:3
*状态总结

这5个状态,特别是SHUTDOWN状态特别重要,一定要知道这个状态的特征。这里做一个特征总结:

  1. 只有RUNNING状态,才会接受新的任务
  2. SHUTDOWN状态不接收新任务,但是【会】处理剩余的,在阻塞队列中的【已提交】任务
  3. STOP状态不接收新任务,【也不会】处理剩余的,在阻塞队列中的【已提交】任务

另外,关于这个【已提交】任务的定义,可以认为是:完整执行了execute()方法,并且进入了阻塞等待队列中的任务。意思是,只要还在execute()方法里面,就算是已经执行了入队方法入队成功了,都不算是【已提交】,而是【新任务】。

二、线程池的创建及参数详解

ThreadPoolExecutor的构造方法有4个,但是总的来说算是1个,其他3个都是某些参数采用了默认策略而已。所以这边就拿最全面的那个来给大家介绍一下:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

参数解读:

  • corePoolSize:线程池中的核心线程数。当提交一个任务时,如果当前线程数小于corePoolSize,不管线程池中是否有空闲的线程,还是会创建一个新的线程来执行当前任务,直到当前线程数等于corePoolSize;如果当前线程数为corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行;如果执行了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程
  • maximumPoolSize:线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,前提是当前线程数小于maximumPoolSize;
  • keepAliveTime:线程池维护线程所允许的空闲时间。当线程池中的线程数量大于corePoolSize的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了keepAliveTime;
  • unit:keepAliveTime的单位;
  • workQueue:用来保存等待被执行的任务的阻塞队列,且任务必须实现Runable接口,在JDK中提供了如下阻塞队列:ArrayBlockingQueue、LinkedBlockingQuene、SynchronousQuene、PriorityBlockingQueue(更多阻塞队列详情见我的另一篇博文《线程池ThreadPoolExecutorl底层原理源码分析》)
  • threadFactory:它是ThreadFactory类型的变量,用来创建新线程。默认使用Executors.defaultThreadFactory() 来创建线程。使用默认的ThreadFactory来创建线程时,会使新创建的线程具有相同的NORM_PRIORITY优先级并且是非守护线程,同时也设置了线程的名称。
    在这里插入图片描述
  • handler:线程池的拒绝策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了4种策略:
    • AbortPolicy:直接抛出异常,默认策略;
    • CallerRunsPolicy:用调用者所在的线程来执行任务;
    • DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
    • DiscardPolicy:直接丢弃任务;

上面的4种策略都是ThreadPoolExecutor的内部类。当然也可以根据应用场景实现RejectedExecutionHandler接口,自定义拒绝策略,如记录日志或持久化存储不能处理的任务。

三、线程池的基本使用

ThreadPoolExecutor提供了两种执行任务的方法:

void execute(Runnable command); // 执行一个任务,没有返回值
Future<?> submit(Runnable task); // 执行一个任务,有返回值

本质上submit中最终还是调用的execute()方法,只不过会将任务包装成一个RunnableFuture返回一个Future对象,用来获取任务执行结果:

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}

下面是线程池的基本使用示例:

public class ThreadPoolExecutorTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                2,
                10,
                60,
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<>()
        );

        // 演示无返回值
        Runnable noReturn = new Runnable() {
            @Override
            public void run() {
                System.out.println("这里经过周密的计算返回:1");
            }
        };
        threadPoolExecutor.execute(noReturn);

        // 演示有返回值
        Callable haveReturn = ()->{
            return "这里经过周密的计算返回:2";
        };
        Future submit = threadPoolExecutor.submit(haveReturn);
        System.out.println(submit.get());
    }

//    系统输出:
//    这里经过周密的计算返回:1
//    这里经过周密的计算返回:2
}

四、线程池原理

一个任务进入线程线程池添加任务流程如下:
在这里插入图片描述

线程池的核心运转流程如上图所示:

  1. 当有新的任务task进来的时候,即执行:execute()方法时
  2. 判断当前线程池中的核心线程数是否小于corePoolSize,小于则新建线程执行当前提交的任务task(不管线程池内的其他核心线程是否空闲,只要小于成立,一样新建线程执行任务);否则尝试将任务task添加到阻塞队列中
  3. 如果阻塞队列没满,则入队;如果阻塞队列已经满了,则进行下一步判断
  4. 判断线程池中的线程数是否小于最大线程数maximumPoolSIze,小于则新建线程执行当前提交的任务task;否则就采取拒绝策略

五、核心源码解读

1.execute()

方法介绍

执行任务。在这里,会做决策,是添加线程执行任务,还是添加到阻塞队列中,或者是拒绝

源码

PS:注意每一行的注释

public void execute(Runnable command) {
    
    if (command == null)
        throw new NullPointerException();
    
    // 获取ctl
    // ctl初始值是ctlOf(RUNNING, 0),表示线程池处于运行中,工作线程数为0
    int c = ctl.get();
    
    // 工作线程数小于corePoolSize,则添加工作线程,并把command作为该线程要执行的任务
    if (workerCountOf(c) < corePoolSize) {
        // true表示添加的是核心工作线程,具体一点就是,在addWorker内部会判断当前工作线程数是不是超过了corePoolSize
        // 如果超过了则会添加失败,addWorker返回false,表示不能直接开启新的线程来执行任务,而是应该先入队
        if (addWorker(command, true))
            return;
        
        // 如果添加核心工作线程失败,那就重新获取ctl,可能是线程池状态被其他线程修改了
        // 也可能是其他线程也在向线程池提交任务,导致核心工作线程已经超过了corePoolSize
        c = ctl.get();
    }
    
    // 线程池状态是否还是RUNNING,如果是就把任务添加到阻塞队列中
    if (isRunning(c) && workQueue.offer(command)) {
        
        // 在任务入队时,线程池的状态可能也会发生改变
        // 再次检查线程池的状态,如果线程池不是RUNNING了,那就不能再接受任务了,就得把任务从队列中移除,并进行拒绝策略
        
        // 如果线程池的状态没有发生改变,仍然是RUNNING,那就不需要把任务从队列中移除掉
        // 不过,为了确保刚刚入队的任务有线程会去处理它,需要判断一下工作线程数,如果为0,那就添加一个非核心的工作线程
        // 添加的这个线程没有自己的任务,目的就是从队列中获取任务来执行
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 如果线程池状态不是RUNNING,或者线程池状态是RUNNING但是队列满了,则去添加一个非核心工作线程
    // 实际上,addWorker中会判断线程池状态如果不是RUNNING,是不会添加工作线程的
    // false表示非核心工作线程,作用是,在addWorker内部会判断当前工作线程数已经超过了maximumPoolSize,如果超过了则会添加不成功,执行拒绝策略
    else if (!addWorker(command, false))
        reject(command);
}
方法解读

整体来说方法是比较简单的,但是通过阅读代码,我希望大家能对【SHUTDOWN状态会拒绝新任务,处理已提交的剩余任务】有个更清晰的认知。那就是:
这个【已提交】表示在已经在队列里面的任务,不包括【当前任务】,【当前任务】会被当作【新任务】拒绝掉!你们看:
在这里插入图片描述
只要【当前任务】还在execute()方法里面,那么即使是入队了,但因为线程池状态不是RUNNING,一样会出队。

源码流程图如下:

在这里插入图片描述

2.addWorker()

方法介绍

添加工作线程

源码
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        
        // 当线程状态 >= SHUTDOWN的时候,只有SHUTDOWN状态,且存在【已提交】任务
        // 才需要创建线程。这里采用取反(更多注释,见下面)
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
        
        // 判断工作线程数是否超过了限制
        // 如果超过限制了,则return false
        // 如果没有超过限制,则修改ctl,增加工作线程数,cas成功则退出外层retry循环,去创建新的工作线程
        // 如果cas失败,则表示有其他线程也在提交任务,也在增加工作线程数,此时重新获取ctl
        // 如果发现线程池的状态发生了变化,则继续回到retry,重新判断线程池的状态是不是SHUTDOWN或STOP
        // 如果状态没有变化,则继续利用cas来增加工作线程数,直到cas成功
        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    
    // ctl修改成功,也就是工作线程数+1成功
    // 接下来就要开启一个新的工作线程了
    
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // Worker实现了Runnable接口
        // 在构造一个Worker对象时,就会利用ThreadFactory新建一个线程
        // Worker对象有两个属性:
        // Runnable firstTask:表示Worker待执行的第一个任务,第二个任务会从阻塞队列中获取
        // Thread thread:表示Worker对应的线程,就是这个线程来获取队列中的任务并执行的
        w = new Worker(firstTask);
        
        // 拿出线程对象,还没有start
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
                
                // 如果线程池的状态是RUNNING
                // 或者线程池的状态变成了SHUTDOWN,但是当前线程没有自己的第一个任务,那就表示当前调用addWorker方法是为了从队列中获取任务来执行
                // 正常情况下线程池的状态如果是SHUTDOWN,是不能创建新的工作线程的,但是队列中如果有任务,那就是上面说的特例情况
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    
                    // 如果Worker对象对应的线程已经在运行了,那就有问题,直接抛异常
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    
                    // workers用来记录当前线程池中工作线程,调用线程池的shutdown方法时会遍历worker对象中断对应线程
                    workers.add(w);
                    
                    // largestPoolSize用来跟踪线程池在运行过程中工作线程数的峰值
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            
            // 运行线程
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // 在上述过程中如果抛了异常,需要从works中移除所添加的work,并且还要修改ctl,工作线程数-1,表示新建工作线程失败
        if (! workerStarted)
            addWorkerFailed(w);
    }
    
    // 最后表示添加工作线程成功
    return workerStarted;
}
方法解读

addWorkder方法为核心方法之一。在前面的学习中,我们知道,添加线程通常有下面2种情况:

  • 添加核心线程:判断当前线程数是否小于corePoolSize ,是则添加;否则尝试加入等待阻塞队列
  • 添加非核心线程数:判断当前线程数是否小于maximumPoolSize ,是则添加;否则采取拒绝策略

另外,前面说过,其实线程池状态也会影响线程的添加,不知道大家还记得否。比如:SHUTDOWN状态,是需要保证有线程将剩余任务执行完的;STOP、TIDYING、TERMINATED状态等,就不需要存在线程了。显然,这些因素会在当前方法的考虑之中。所以你会在源码中看到,不少对线程状态的判断。

  • 特别解读(1)
    addWorker()方法语义上,整体分为2个步骤。步骤1是判断是否要添加线程;步骤2是添加并启动线程

  • 特别解读(2)
    在这里插入图片描述
    很多人对第一个for(;;)里面的这个判断可能会有点难理解。大家还记得吧,什么情况下,线程池状态大于SHUTDOWN还可能需要创建线程呢?其实只有1种情况,那就是有【已提交】任务。这里难就难在firstTask == null,为什么要这么判断呢?
    首先firstTask其实是在添加线程的时候,第一个要执行的任务。所以,我们反向思考一下,那什么时候这个firstTask != null,是有值的,如下:
    在这里插入图片描述
    只有2处,而且其实都在上面介绍过的execute()里面。
    在这里插入图片描述
    我们在上面的【状态总结】中说过,SHUTDOWN状态不会再接受新任务了,并且也对【已提交】任务和【新任务】做了说明。所以,这2个还在execute()方法里面的【新任务】,firstTask != null显然不是我们要的。
    总结一下这段代码代码(rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()的含义:线程池是SHUTDOWN状态,但没有新任务,且仍然有【已提交】任务,这个情况不就是需要线程执行任务吗?然后这里取了反,就是不需要创建线程。(PS:后面这种判断还会持续出现)

源码流程图

在这里插入图片描述

3.runWorker()

方法介绍

线程执行的时候实际运行的方法,在Worker里面的run()体内被调用
在这里插入图片描述

源码
final void runWorker(Worker w) {
    // 就是当前工作线程
    Thread wt = Thread.currentThread();
    
    // 把Worker要执行的第一个任务拿出来
    Runnable task = w.firstTask;
    w.firstTask = null;
    
    // 这个地方,后面单独分析中断的时候来分析
    w.unlock(); // allow interrupts
    
    boolean completedAbruptly = true;
    try {
        
        // 判断当前线程是否有自己的第一个任务,如果没有就从阻塞队列中获取任务
        // 如果阻塞队列中也没有任务,那线程就会阻塞在这里
        // 但是并不会一直阻塞,在getTask方法中,会根据我们所设置的keepAliveTime来设置阻塞时间
        // 如果当前线程去阻塞队列中获取任务时,等了keepAliveTime时间,还没有获取到任务,则getTask方法返回null,相当于退出循环
        // 当然并不是所有线程都会有这个超时判断,主要还得看allowCoreThreadTimeOut属性和当前的工作线程数等等,后面单独分析
        // 目前,我们只需要知道工作线程在执行getTask()方法时,可能能直接拿到任务,也可能阻塞,也可能阻塞超时最终返回null
        while (task != null || (task = getTask()) != null) {
            // 只要拿到了任务,就要去执行任务
            
            // Work先加锁,跟shutdown方法有关,先忽略,后面会分析
            w.lock();
            
            
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            
            // 下面这个if,最好把整篇文章都看完之后再来看这个if的逻辑
            
            // 工作线程在运行过程中
            // 如果发现线程池的状态变成了STOP,正常来说当前工作线程的中断标记应该为true,如果发现中断标记不为true,则需要中断自己
            
            // 如果线程池的状态不是STOP,要么是RUNNING,要么是SHUTDOWN
            // 但是如果发现中断标记为true,那是不对的,因为线程池状态不是STOP,工作线程仍然是要正常工作的,不能中断掉
            // 就算是SHUTDOWN,也要等任务都执行完之后,线程才结束,而目前线程还在执行任务的过程中,不能中断
            // 所以需要重置线程的中断标记,不过interrupted方法会自动清空中断标记
            // 清空为中断标记后,再次判断一下线程池的状态,如果又变成了STOP,那就仍然中断自己
            
            // 中断了自己后,会把当前任务执行完,在下一次循环调用getTask()方法时,从阻塞队列获取任务时,阻塞队列会负责判断当前线程的中断标记
            // 如果发现中断标记为true,那就会抛出异常,最终退出while循环,线程执行结束
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            
            
            try {
                // 空方法,给自定义线程池来实现
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 执行任务
                    // 注意执行任务时可能会抛异常,如果抛了异常会先依次执行三个finally,从而导致completedAbruptly = false这行代码没有执行
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // 空方法,给自定义线程池来实现
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++; // 跟踪当前Work总共执行了多少了任务
                w.unlock();
            }
        }
        
        // 正常退出了While循环
        // 如果是执行任务的时候抛了异常,虽然也退出了循环,但是是不会执行这行代码的,只会直接进去下面的finally块中
        
        // 所以,要么是线程从队列中获取任务时阻塞超时了从而退出了循环会进入到这里
        // 要么是线程在阻塞的过程中被中断了,在getTask()方法中会处理中断的情况,如果被中断了,那么getTask()方法会返回null,从而退出循环
        // completedAbruptly=false,表示线程正常退出
        completedAbruptly = false;
    } finally {
        // 因为当前线程退出了循环,如果不做某些处理,那么这个线程就运行结束了,就是上文说的回收(自然消亡)掉了,线程自己运行完了也就结束了
        // 但是如果是由于执行任务的时候抛了异常,那么这个线程不应该直接结束,而应该继续从队列中获取下一个任务
        // 可是代码都执行到这里了,该怎么继续回到while循环呢,怎么实现这个效果呢?
        // 当然,如果是由于线程被中断了,或者线程阻塞超时了,那就应该正常的运行结束
        // 只不过有一些善后工作要处理,比如修改ctl,工作线程数-1
        processWorkerExit(w, completedAbruptly);
    }
}
方法解读

runworker()方法也是线程池里面的核心方法之一!想要阅读好这个方法,其实也不难,如果你还记得我前面说的【线程池状态特征】的话。所以为了实现不同状态的线程特征,这里势必会考虑如下问题:

  1. 正常运行的线程执行当前任务之后,还需要处理阻塞等待队列中的任务
  2. STOP状态会中断当前正在执行的任务
  3. 需要响应线程中断信号
  4. 执行异常导致线程消亡后续怎么处理

整体来说方法还是比较简单的,看不懂也别太纠结一些繁琐的细枝末节了。但是通过阅读代码,我希望大家起码要了解到以下这些基本流程:

  1. 新线程启动开始运行的时候,会先判断绑定当前线程的firstTask是不是空,不是空则执行任务,否则就下一步
  2. 通过调用getTask()从阻塞队列中拿新的任务,拿成功则执行任务,不成功,则阻塞等待

4.getTask()

方法介绍

从阻塞队列中获取任务,会产生阻塞

源码
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
    
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        
        // Check if queue empty only if necessary.
        // 如果线程池状态是STOP,表示当前线程不需要处理任务了,那就修改ctl工作线程数-1
        // 如果线程池状态是SHUTDOWN,但是阻塞队列中为空,表示当前任务没有任务要处理了,那就修改ctl工作线程数-1
        // return null表示当前线程无需处理任务,线程退出
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        
        // 当前工作线程数
        int wc = workerCountOf(c);
        
        // Are workers subject to culling?
        // 用来判断当前线程是无限阻塞还是超时阻塞,如果一个线程超时阻塞,那么一旦超时了,那么这个线程最终就会退出
        // 如果是无限阻塞,那除非被中断了,不然这个线程就一直等着获取队列中的任务
    
        // allowCoreThreadTimeOut为true,表示线程池中的所有线程都可以被回收掉,则当前线程应该直接使用超时阻塞,一旦超时就回收
        // allowCoreThreadTimeOut为false,则要看当前工作线程数是否超过了corePoolSize,如果超过了,则表示超过部分的线程要用超时阻塞,一旦超时就回收
        
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        
        // 如果工作线程数超过了工作线程的最大限制或者线程超时了,则要修改ctl,工作线程数减1,并且return null
        // return null就会导致外层的while循环退出,从而导致线程直接运行结束
        // 直播课程里会细讲timed && timedOut
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        
        
        try {
            // 要么超时阻塞,要么无限阻塞
            Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
            
            // 表示没有超时,在阻塞期间获取到了任务
            if (r != null)
                return r;
            
            // 超时了,重新进入循环,上面的代码会判断出来当前线程阻塞超时了,最后return null,线程会运行结束
            timedOut = true;
        } catch (InterruptedException retry) {
            // 从阻塞队列获取任务时,被中断了,也会再次进入循环,此时并不是超时,但是重新进入循环后,会判断线程池的状态
            // 如果线程池的状态变成了STOP或者SHUTDOWN,最终也会return null,线程会运行结束
            // 但是如果线程池的状态仍然是RUNNING,那当前线程会继续从队列中去获取任务,表示忽略了本次中断
            // 只有通过调用线程池的shutdown方法或shutdownNow方法才能真正中断线程池中的线程
            timedOut = false;
        }
    }
}
方法解读

同样的,还是要考虑SHUTDOWN状态,然后做处理。如果大家看过前面的思路,这个方法理解起来不会有太大的难度。

5.processWorkerExit()

方法介绍

处理线程退出的过程(首尾工作)

源码
private void processWorkerExit(Worker w, boolean completedAbruptly) {
    
    // 如果completedAbruptly为true,表示是执行任务的时候抛了异常,那就修改ctl,工作线程数-1
    // 如果completedAbruptly为false,表示是线程阻塞超时了或者被中断了,实际上也要修改ctl,工作线程数-1
    // 只不过在getTask方法中已经做过了,这里就不用再做一次了
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();
    
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 当前Work要运行结束了,将完成的任务数累加到线程池上
        completedTaskCount += w.completedTasks;
        
        // 将当前Work对象从workers中移除
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
    
    // 因为当前是处理线程退出流程中,所以要尝试去修改线程池的状态为TINDYING
    tryTerminate();
    
    
    int c = ctl.get();
    // 如果线程池的状态为RUNNING或者SHUTDOWN,则可能要替补一个线程
    if (runStateLessThan(c, STOP)) {
        
        // completedAbruptly为false,表示线程是正常要退出了,则看是否需要保留线程
        if (!completedAbruptly) {
            
            // 如果allowCoreThreadTimeOut为true,但是阻塞队列中还有任务,那就至少得保留一个工作线程来处理阻塞队列中的任务
            // 如果allowCoreThreadTimeOut为false,那min就是corePoolSize,表示至少得保留corePoolSize个工作线程活着
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            
            // 如果当前工作线程数大于等于min,则表示符合所需要保留的最小线程数,那就直接return,不会调用下面的addWorker方法新开一个工作线程了
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        
        // 如果线程池的状态为RUNNING或者SHUTDOWN
        // 如果completedAbruptly为true,表示当前线程是执行任务时抛了异常,那就得新开一个工作线程
        // 如果completedAbruptly为false,但是不符合所需要保留的最小线程数,那也得新开一个工作线程
        addWorker(null, false);
    }
}
方法解读

总结一下,某个工作线程正常情况下会不停的循环从阻塞队列中获取任务来执行,正常情况下就是通过阻塞来保证线程永远活着,但是会有一些特殊情况:

  1. 如果线程被中断了,那就会退出循环,然后做一些善后处理,比如ctl中的工作线程数-1,然后自己运行结束
  2. 如果线程阻塞超时了,那也会退出循环,此时就需要判断线程池中的当前工作线程够不够,比如是否有corePoolSize个工作线程,如果不够就需要新开一个线程,然后当前线程自己运行结束,这种看上去效率比较低,但是也没办法,当然如果当前工作线程数足够,那就正常,自己正常的运行结束即可
  3. 如果线程是在执行任务的时候抛了移除,从而退出循环,那就直接新开一个线程作为替补,当然前提是线程池的状态是RUNNING

6.shutdown()

方法介绍

调用线程池的shutdown方法,表示要关闭线程池,不接受新任务,但是要把阻塞队列中剩余的任务执行完。线程池状态由:RUNNING变成SHUTDOWN

源码
public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        // 修改ctl,将线程池状态改为SHUTDOWN
        advanceRunState(SHUTDOWN);
        // 中断工作线程
        interruptIdleWorkers();
        // 空方法,给子类扩展使用
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    // 调用terminated方法
    tryTerminate();
}
private void interruptIdleWorkers() {
    interruptIdleWorkers(false);
}


private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 遍历所有正在工作的线程,要么在执行任务,要么在阻塞等待任务
        for (Worker w : workers) {
            Thread t = w.thread;
            
            // 如果线程没有被中断,并且能够拿到锁,就中断线程
            // Worker在执行任务时会先加锁,执行完任务之后会释放锁
            // 所以只要这里拿到了锁,就表示线程空出来了,可以中断了
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}
方法解读

线程池处于SHUTDOWN状态,不接收新任务,但能处理【已提交】的任务(在阻塞队列中的任务)
线程执行了SHUTDOWN之后有两件事情需要干:

  1. 修改线程池状态
  2. 中断工作中的线程

也许大家会对第2件事情【终端工作的线程】有点疑问,这里指的是:

  1. 对于在阻塞等待任务的线程,直接中断即可,
  2. 对于正在执行任务的线程,其实只要等它们把任务执行完,就可以中断了,因为此时线程池不能接受新任务,所以正在执行的任务就是当前线程的最后一个任务

读到这里你是不是有个疑问?不是说还要处理阻塞队列中的剩余任务吗?这段源码中没有体现啊。确实没体现,因为它在前面的processWorkerExit()里面体现了:
在这里插入图片描述

7.shutdownNow()

方法介绍

调用线程池的shutdownNow方法,表示要关闭线程池,不接受新任务,也不执行阻塞队列中剩余的任务。线程池状态由:RUNNING变成STOP

源码
public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        // 修改ctl,将线程池状态改为STOP
        advanceRunState(STOP);
        // 中断工作线程
        interruptWorkers();
        // 返回阻塞队列中剩余的任务
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    
    // 调用terminated方法
    tryTerminate();
    return tasks;
}
private void interruptWorkers() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 中断所有工作线程,不管有没有在执行任务
        for (Worker w : workers)
            w.interruptIfStarted();
    } finally {
        mainLock.unlock();
    }
}


void interruptIfStarted() {
    Thread t;
    
    // 只要线程没有被中断,那就中断线程,中断的线程虽然也会进入processWorkerExit方法,但是该方法中判断了线程池的状态
    // 线程池状态为STOP的情况下,不会再开启新的工作线程了
    // 这里getState>-0表示,一个工作线程在创建好,但是还没运行时,这时state为-1,可以看看Worker的构造方法就知道了
    // 表示一个工作线程还没开始运行,不能被中断,就算中断也没意义,都还没运行
    if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
        try {
            t.interrupt();
        } catch (SecurityException ignore) {
        }
    }
}
方法解读

线程池处在STOP状态,不接收新任务,不处理已添加的任务,并且会中断正在处理的任务,返回阻塞队列中的任务内容

8.mainLock

在上述源码中,发现很多地方都会用到mainLock,它是线程池中的一把全局锁,主要是用来控制workers集合的并发安全,因为如果没有这把全局锁,就有可能多个线程公用同一个线程池对象,如果一个线程在向线程池提交任务,一个线程在shutdown线程池,如果不做并发控制,那就有可能线程池shutdown了,但是还有工作线程没有被中断,如果1个线程在shutdown,99个线程在提交任务,那么最终就可能导致线程池关闭了,但是线程池中的很多线程都没有停止,仍然在运行,这肯定是不行,所以需要这把全局锁来对workers集合的操作进行并发安全控制。

学习总结

  1. 深入学习了线程池从创建到停止的过程
  2. 深入学习了线程池核心源码

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/180530.html

(0)
飞熊的头像飞熊bm

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!