Java 多线程并发【16】ThreadPoolExcutor

前置知识点

在了解 ThreadPoolExcutor 之前,我们需要回顾一下相关的知识点,他们是线程池底层原理的基础,包括:

  • Executor 和 Runnable
  • BlockingQueue,生产者 – 消费者模式。

Executor 和 Runnable

在介绍 Executor 的章节提到了 Executor 中声明了任务执行能力,Runnable 则作为任务的抽象,可以被 Executor 执行。Executor 的拓展是 ExecutorService ,它声明了三组方法,代表三种能力:

  • Submit 系列方法,表示提交任务。
  • Invoke 系列方法,表示执行任务。
  • Shotdown 系列方法,表示关闭执行器。

此时的 ExecutorService 还是一个接口,它需要对一些方法进行默认实现,于是就有了抽象类 AbstractExecutorService ,真正的实现可以继承自 AbstractExecutorService ,从而更加方便的实现一个执行器。

AbstractExecutorService 通过 newTaskFor 返回的 RunnableFuture 对象实现了 submit 、invokeAny 和 invokeAll 方法。RunnableFuture 的 默认实现是 FutureTask 。

而 AbstractExecutorService 的实现类,就是 ThreadPoolExcutor 。

BlockingQueue

生产者 – 消费者模式:生产者消费者模式又称为半同步/半异步模式。分为同步层、队列层、异步层三层。同步层的主线程处理工作任务并存入工作队列,工作线程从工作队列取出任务进行处理,如果工作队列为空,则取不到任务的工作线程进入挂起状态。当工作线程都不空闲且工作队列塞满时,异步层产生的任务无法进入等待队列,造成阻塞。

BlockingQueue 是 Java.util.concurrent 中用来定义生产者消费者模式的数据结构能力的接口。它的实现包括 ArrayBlockingQueue 、LinkedBlockingQueue 等。

ThreadPoolExcutor

ThreadPoolExcutor 是一个 ExecutorService ,它使用一组线程来执行接收到的任务。通常使用 Executors 工厂方法进行配置。

线程池解决了两个问题:

  1. 由于减少了每个任务的调用开销,它们通常在执行大量异步任务时提供改进的性能,并且它们提供了一种限制和管理资源的方法,包括执行一组线程时消耗的线程、任务。每个 ThreadPoolExecutor 还维护一些基本的统计信息,例如完成任务的数量。
  2. 为了在广泛的上下文中有用,这个类提供了许多可调整的参数和可扩展性挂钩。但是,建议程序员使用更方便的 Executors 工厂方法 Executors.newCachedThreadPool(无界线程池,具有自动线程回收功能)、Executors.newFixedThreadPool(固定大小线程池)和 Executors.newSingleThreadExecutor(单个后台线程),这些方法预先配置了最常见的使用场景。也可以自定义配置线程池。

手动配置自定义的线程池需要关注一些参数:

设置预启动线程

默认情况下,即使是核心线程也只会在新任务到达时才创建、启动。但这可以使用方法 prestartCoreThread 或 prestartAllCoreThreads 实现预创建线程,并等待任务。

    // 启动一个核心线程,使其空闲等待工作。 这会覆盖仅在执行新任务时启动核心线程的默认策略。 如果所有核心线程都已启动,此方法将返回 false
    public boolean prestartCoreThread() {
        return workerCountOf(ctl.get()) < corePoolSize &&
            addWorker(null, true);
    }
    // 启动所有核心线程,使它们空闲等待工作。 这会覆盖仅在执行新任务时启动核心线程的默认策略,返回的是已启动的线程数量。
    public int prestartAllCoreThreads() {
        int n = 0;
        while (addWorker(null, true))
            ++n;
        return n;
    }

Core and maximum pool sizes

ThreadPoolExecutor 会根据 corePoolSize 和 maximumPoolSize 设置的界限自动调整池大小。

corePoolSize :表示线程池中用来工作的核心线程数量。

maximumPoolSize :表示线程池中最大的线程数量。

getPoolSize() :方法表示当线程池的实际线程数量。

Current Size < corePoolSize

当在方法 execute(Runnable) 中提交了一个新任务,并且运行的工作线程数量少于 corePoolSize 时,即使其他工作线程处于空闲状态,也会创建一个新线程来处理该请求。

corePoolSize < Current Size < maximumPoolSize

如果运行的线程数多于 corePoolSize 但少于 maximumPoolSize,则仅当队列已满时才会创建新线程。

为 corePoolSize 和 maximumPoolSize 设置不同的值,可以得到不同类型的线程池:

  • 通过将 corePoolSize 和 maximumPoolSize 设置为相同,您可以创建一个固定大小的线程池。
  • 通过将 maximumPoolSize 设置为基本上无界的值,例如 Integer.MAX_VALUE,您允许池容纳任意数量的并发任务。

corePoolSize 和 maximumPoolSize 不仅可以在构造方法中设置,也可通过 setCorePoolSizesetMaximumPoolSize 方法动态更改。

创建新线程

ThreadPoolExcutor 内部使用 ThreadFactory 创建新线程。 如果没有另外指定,则使用 Executors.defaultThreadFactory,它创建的线程都在同一个 ThreadGroup 中,并且具有相同的 NORM_PRIORITY 优先级和非守护进程状态。

通过提供不同的 ThreadFactory,您可以更改线程的名称、线程组、优先级、守护程序状态等。如果 ThreadFactory 在从 newThread 返回 null 时未能创建线程,则执行程序将继续,但可能无法执行任何任务。

keepAliveTime

如果线程池当前有超过 corePoolSize 数量的线程,多余的线程如果空闲时间超过 keepAliveTime,将会被终止。

这个参数提供了一种在线程池没有被积极使用时减少资源消耗的方法。如果线程池稍后需要额外的线程,将会重新创建新的线程。

也可以使用方法 setKeepAliveTime(long, TimeUnit) 动态更改此参数。使用 TimeUnit.NANOSECONDS 的值可以有效地禁止空闲线程在关闭之前终止。默认情况下,keep-alive 策略仅适用于超过 corePoolSize 线程的情况。但是方法 allowCoreThreadTimeOut(boolean) 也可用于将此超时策略应用于核心线程,只要 keepAliveTime 值不为零即可。

任务队列

任何类型的 BlockingQueue 实现都可以用来传输和保存提交的任务。 队列和线程数量有关:

  • 运行的线程数量 < corePoolSize,则 Executor 总是创建新线程而不是添加到队列中。
  • corePoolSize < 运行的线程数量 < maximumPoolSize ,Executor 将任务添加到队列中,而不是创建新的线程去执行人我。
  • corePoolSize < 运行的线程数量 < maximumPoolSize 且任务队列已满时,有新的任务被提交,此时会创建新的工作线程来执行任务。
  • 运行的线程数量 = maximumPoolSize 且任务队列已满时,如果有新的任务被提交,此时队列也无法接收新任务,在这种情况下,该任务将被拒绝。

排队一般有三种策略:

  1. 直接交给线程处理。 工作队列的一个很好的默认选择是 SynchronousQueue,它是一个同步队列,只能容纳一个元素,每个插入操作都必须等待另一个线程的取出操作。它将任务直接交给线程而不用其他方式保留它们。

    如果没有立即可用的线程来运行任务,则尝试将任务排队将失败,因此将构造一个新线程。 这种策略通常需要无限的 maximumPoolSizes 以避免拒绝新提交的任务。

  2. 无界队列。当所有 corePoolSize 线程都在工作时,使用无界队列(例如没有预定义容量的 LinkedBlockingQueue)将导致新任务在队列中排队等待。

    因为 LinkedBlockingQueue 没有设置容量限制,可以无限排队等待,工作的线程数量也就不会超过 corePoolSize ,因为超出的任务都去排队了。此时,maximumPoolSize 就没有作用了。

    无界队列适用于当每个任务完全独立于其他任务的情况。例如,在网页服务器管理请求。

    无界队列的弊端也是队列没有容量限制,可以无限扩张,无法灵活的控制工作效率。

  3. 有界队列。有界队列(例如 ArrayBlockingQueue)在与有限的 maximumPoolSizes 一起使用时有助于防止资源耗尽,但可能更难以调整和控制。

    队列大小和 maximumPoolSizes 可以相互权衡:使用大队列和小池可以最大限度地减少 CPU 使用率、操作系统资源和上下文切换开销,但可能会导致人为地降低吞吐量。如果任务经常阻塞(例如,如果它们受 I/O 限制),系统可能能够为比您允许的更多线程安排时间。使用小队列通常需要更大的线程池大小,这会使 CPU 更忙绿,但可能会遇到不可接受的调度开销,这也会降低吞吐量。

拒绝策略

当 Executor 关闭时或 Executor 对 maximumPoolSizes 和工作队列容量都设置了有限的数量,并且已经队列和线程都已经饱和时,在方法 execute(Runnable) 中提交的新任务将会被拒绝。 在任何一种情况下,execute 方法都会调用其 RejectedExecutionHandler 的 RejectedExecutionHandler.rejectedExecution(Runnable, ThreadPoolExecutor)方法。 提供了四个预定义的处理策略:

  1. 在默认的 ThreadPoolExecutor.AbortPolicy 中,在拒接任务时,handler 抛出一个运行时异常 RejectedExecutionException 。
  2. 在 ThreadPoolExecutor.CallerRunsPolicy 中,调用 execute 本身的线程运行任务。 这提供了一种简单的反馈控制机制,可以减慢提交新任务的速度。
  3. 在 ThreadPoolExecutor.DiscardPolicy 中,直接丢弃无法执行的任务。
  4. 在 ThreadPoolExecutor.DiscardOldestPolicy 中,如果 executor 没有关闭,则丢弃工作队列头部的任务,然后重试执行(可能再次失败,导致重复此操作。)

可以定义和使用其他类型的 RejectedExecutionHandler 类。 这样做需要小心谨慎,尤其是当策略设计为仅在特定容量或排队策略下工作时。

执行前和执行后的 Hook

ThreadPoolExcutor 提供了可覆盖的 beforeExecute(Thread, Runnable)afterExecute(Runnable, Throwable) 方法,这俩方法会在执行每个任务之前和之后调用。 可用于操纵执行环境; 例如,重新初始化 ThreadLocals、收集统计信息或添加日志条目。 如果钩子或回调方法抛出异常,内部工作线程可能会依次失败并突然终止。

队列的维护

方法 getQueue() 允许访问工作队列以进行监视和调试。 强烈建议不要将此方法用于任何其他目的。 当大量排队的任务被取消时,提供的两个方法 remove(Runnable) 和 purge 可用于协助存储回收。

Finalization

程序中不再引用线程池,且没有工作线程时将自动关闭。 如果您想确保即使用户忘记调用 shutdown 也能回收未引用的线程池,那么您必须通过设置适当的 keepAliveTime、使用零核心线程的下限和/或设置 allowCoreThreadTimeOut 来安排未使用的线程最终死亡 。

用法

ThreadPoolExcutor 一般通过拓展实现来自定义一个线程池,以下是一个简单的例子,它实现了一个可以暂停/恢复的线程池:

    class PausableThreadPoolExecutor extends ThreadPoolExecutor {
        private boolean isPaused;
        private ReentrantLock pauseLock = new ReentrantLock();
        private Condition unpaused = pauseLock.newCondition();
    
        public PausableThreadPoolExecutor(...) { super(...); }
    
        protected void beforeExecute(Thread t, Runnable r) {
            super.beforeExecute(t, r);
            pauseLock.lock();
            try {
                while (isPaused) unpaused.await();
            } catch (InterruptedException ie) {
                t.interrupt();
            } finally {
                pauseLock.unlock();
            }
        }
    
        public void pause() {
            pauseLock.lock();
            try {
                isPaused = true;
            } finally {
                pauseLock.unlock();
            }
        }
    
        public void resume() {
            pauseLock.lock();
            try {
                isPaused = false;
                unpaused.signalAll();
            } finally {
                pauseLock.unlock();
            }
        }
    }

源码阅读

ThreadPoolExcutor 类是 java.util.concurrent 包下 Executor 的唯一实现(虽然中间有两层继承关系),是用来定义线程池的类,它在 J.U.C. 中唯一实现是 ScheduledThreadPoolExecutor 。

继承关系

Java 多线程并发【16】ThreadPoolExcutor
image-20220725110908352.png

ThreadPoolExcutor 是 Executor 的一个实现,它的内部有一些策略相关的内部类,和一个 Worker 内部类。

关键属性

这里大致留意一下即可,有些属性涉及到具体的逻辑时才需要详细了解。

    // 表示线程池状态的原子整数
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    // 工作队列
    private final BlockingQueue<Runnable> workQueue;
    // 主线程池锁
    private final ReentrantLock mainLock = new ReentrantLock();
    // 工作线程集合
    private final HashSet<Worker> workers = new HashSet<Worker>();
    // 配合 ReentrantLock 的条件 Condition 
    private final Condition termination = mainLock.newCondition();
    // 跟踪获得的最大池大小。只能在 mainLock 下访问。
    private int largestPoolSize;
    // 已完成任务树立
    private long completedTaskCount;
    // 线程工厂类的实例
    private volatile ThreadFactory threadFactory;
    // 执行中饱和或关闭时调用的策略 
    private volatile RejectedExecutionHandler handler;
    // 等待工作的空闲线程超时时间(以纳秒为单位)。 当存在多于 corePoolSize 或 allowCoreThreadTimeOut 时,线程使用此超时。 否则,他们将永远等待新的工作。
    private volatile long keepAliveTime;
    // 是否允许工作线程超时
    private volatile boolean allowCoreThreadTimeOut;
    // corePoolSize 是保持活动的最小工作数(并且不允许超时等),除非设置了 allowCoreThreadTimeOut,在这种情况下最小值为零。
    private volatile int corePoolSize;
    // 最大池大小。 请注意,实际最大值在内部受 CAPACITY 限制。
    private volatile int maximumPoolSize;
    // 默认的拒绝执行策略 
    private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();
    // shutdown 和 shutdownNow 的调用者所需的权限。
    private static final RuntimePermission shutdownPerm =
        new RuntimePermission("modifyThread");
    // 执行完成时要使用的上下文,或 null。
    private final AccessControlContext acc;

线程池状态

与线程池状态相关的属性包括:

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking ctl
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }

其中,ctl 用来表示线程池的控制状态,它一个包含两个含义的原子整数:

  • workerCount,表示有效线程数
  • runState,指示是否正在运行、正在关闭等

为了将它们打包到一个 int 中,我们将 workerCount 限制为 (2^29)-1(约 5 亿)个线程,而不是 (2^31)-1(20 亿)个线程。如果这在未来成为问题,可以将变量更改为 AtomicLong,并调整下面的移位/掩码常量。但是在当前情况下,这段代码使用 int 会更快更简单一些。

workerCount 是允许启动和不允许停止的工作线程的数量。该值可能与实际的活动线程数暂时不同,例如,当 ThreadFactory 在未能创建线程时访问该字段,以及退出线程在终止前仍在执行时访问该字段。返回当前用户可见的线程数量作为工作线程数量的大小。

runState 用来表示生命周期,它包括以下值:

  • RUNNING:接受新任务并处理排队的任务
  • SHUTDOWN:不接受新任务,但处理排队的任务
  • STOP:不接受新任务,不处理排队任务,中断正在进行的任务
  • TIDYING:所有任务都已终止,workerCount 为零,转换到状态 TIDYING 的线程将运行 terminate() 的 hook 方法
  • TERMINATED:terminated() 已完成

这些值之间的数字顺序很重要,以便进行顺序的比较。runState 随着时间的推移单调增加,但不需要达到每个状态。

过渡顺序是:

  • RUNNING -> SHUTDOWN ,在调用 shutdown() 时,可能隐含在 finalize() 中。
  • (RUNNING or SHUTDOWN) -> STOP ,调用shutdownNow() 。
  • SHUTDOWN -> TIDYING ,当队列和线程池都为空时。
  • STOP -> TIDYING ,当线程池为空时。
  • TIDYING -> TERMINATED ,当 terminated() 当 hook 方法都完成时。

当状态达到 TERMINATED 时,在 awaitTermination() 中等待的线程将返回。

检测从 SHUTDOWN 到 TIDYING 的转换并不像您想要的那么简单,因为队列可能在非空后变为空,在 SHUTDOWN 状态下反之亦然,但我们只能在看到它为空之后终止,我们看到 workerCount 为 0(有时需要重新检查)。

内部类

ThreadPoolExcutor 的核心内部类是 Worker 。

Worker

Worker 类主要维护线程运行任务的中断控制状态,以及其他次要的记录。 此类巧妙地扩展了 AbstractQueuedSynchronizer ,以简化获取和释放围绕每个任务执行的锁。 这可以防止旨在唤醒等待任务的工作线程而不是中断一个任务来让另一个任务运行。

Worker 继承自 AQS 并实现了 Runnable 接口:

    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {

        final Thread thread;

        Runnable firstTask;
        // 任务计数器
        volatile long completedTasks;

        // 使用 ThreadFactory 创建线程。
        Worker(Runnable firstTask) {
            setState(-1); // 禁止中断直到 runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        public void run() {
            runWorker(this);
        }

        // Lock methods
        // ...
    }

从核心逻辑来看,Worker 就是工作线程的抽象。它的执行任务的逻辑在外部累 ThreadPoolExecutor 中的 runWorker 方法中,这也是线程池的核心运行逻辑:

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread(); // 当前线程
    Runnable task = w.firstTask; // Worker 中第一个任务
    w.firstTask = null
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true
    try {
        // 不断去轮询任务
        while (task != null || (task = getTask()) != null) {
            w.lock(); 
            // 如果线程池停止,确保线程中断,否则, 确保线程没有中断。
            // 这需要在第二种情况下重新检查以在清除中断时处理 shutdownNow 竞争
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task); // hook 方法
                Throwable thrown = null
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown); // hook 方法
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

工作线程的循环。 反复从队列中获取任务并执行它们,同时处理一些问题:

  1. 可能从一个初始任务开始,在这种情况下我们不需要获得第一个任务。 否则,只要 pool 正在运行,我们就从 getTask 获取任务。 如果它返回 null ,则 Worker 会因池状态或配置参数的更改而退出。 其他退出是由外部代码中的异常抛出导致的,在这种情况下,completedAbruptly 为 true,这通常会导致 processWorkerExit 替换该线程。
  2. 在运行任何任务之前,获取锁以防止在任务执行过程中发生其他线程池中断,然后我们确保除非线程池停止,否则该线程没有设置其中断。
  3. 每个任务运行之前都会调用 beforeExecute,这可能会引发异常,在这种情况下,我们会导致线程死掉(用 completedAbruptly = true 中断循环)而不处理任务。
  4. 假设 beforeExecute 正常完成,我们运行任务,收集它抛出的任何异常以发送到 afterExecute。 我们分别处理 RuntimeException、Error(规范保证我们捕获)和任意 Throwables。 因为我们不能在 Runnable.run 中重新抛出 Throwables,所以我们在退出时将它们包装在 Errors 中(到线程的 UncaughtExceptionHandler)。 任何抛出的异常也会保守地导致线程死亡。
  5. task.run完成后,我们调用afterExecute,同样可能会抛出异常,也会导致线程死掉。 根据 JLS Sec 14.20,即使 task.run 抛出,此异常也会生效。

异常机制的最终效果是 afterExecute 和线程的 UncaughtExceptionHandler 具有我们可以提供的关于用户代码遇到的任何问题的准确信息。

策略相关的内部类

除了 Worker ,ThreadPoolExecutor 的内部类还有:

  • CallerRunsPolicy,被拒绝任务的处理程序,直接在执行方法的调用线程中运行被拒绝的任务,除非执行程序已关闭,在这种情况下,任务将被丢弃。
  • AbortPolicy,抛出 RejectedExecutionException 的被拒绝任务的处理程序。
  • DiscardPolicy,被拒绝任务的处理程序,它默默地丢弃被拒绝的任务。
  • DiscardOldestPolicy,拒绝任务的处理程序,丢弃最旧的未处理请求,然后重试执行,除非执行程序被关闭,在这种情况下任务被丢弃。

关键方法

任务的执行 execute(Runnable)

在未来的某个时间执行给定的任务。 该任务可以在新线程或现有池线程中执行。 如果任务无法提交执行,要么是因为这个执行器已经关闭,要么是因为它的容量已经达到,任务由当前的 RejectedExecutionHandler 处理。

public void execute(Runnable command) {
    if (command == nullthrow new NullPointerException();
    int c = ctl.get(); // 当前线程池状态
    if (workerCountOf(c) < corePoolSize) { // 工作线程 < corePoolSize
        if (addWorker(command, true)) // 直接通过 addWorker 创建线程
            return;
        c = ctl.get(); // 其他情况更新状态
    }
    // 线程池运行中,且将任务成功添加到 workQueue 队尾
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get(); // 重新检查线程池状态
        if (! isRunning(recheck) && remove(command))
            reject(command); // 异常情况拒绝任务
        else if (workerCountOf(recheck) == 0)
            addWorker(nullfalse); // 工作线程为 0 的情况
    }
    else if (!addWorker(command, false)) // 创建新的工作线程失败
        reject(command); // 拒绝任务
}

这个方法执行了三个步骤:

  1. 如果运行的线程少于 corePoolSize,尝试使用给定任务启动一个新线程作为其第一个任务。 对 addWorker 的调用以原子方式检查 runState 和 workerCount,因此通过返回 false 来防止在不应该添加线程时添加线程的错误警报。
  2. 如果一个任务可以成功排队,那么我们仍然需要仔细检查我们是否应该添加一个线程(因为现有的线程自上次检查后死亡)或者池在进入此方法后关闭。 因此,我们重新检查状态,如果有必要,如果停止排队,则回滚,或者如果没有,则启动一个新线程。
  3. 如果我们不能排队任务,那么我们尝试添加一个新线程。 如果它失败了,我们知道线程池已经关闭或饱和,因此拒绝任务。

创建新线程 addWorker(Runnable, boolean)

execute 方法中通过 addWorker 方法来创建新的工作线程:

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // 线程池状态检查
        if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
            return false;
        // 根据当前线程数量检查是否需要创建线程
        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry; // 跳出死循环
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    // 满足创建新的工作线程的条件
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            // 在创建新线程前首先要获取全局锁
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock(); // 全局锁住
            try {
                // 重新检查锁状态
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

检查是否可以根据当前线程池状态和给定边界(核心或最大值)添加新工作线程。 如果是这样,则相应地调整工作线程计数,并且如果可能,创建并启动一个新工作线程,将 firstTask 作为其第一个任务运行。 如果池已停止或有资格关闭,则此方法返回 false。 如果线程工厂在询问时未能创建线程,它也会返回 false。 如果线程创建失败,要么是由于线程工厂返回 null,要么是由于异常(通常是 Thread.start() 中的 OutOfMemoryError),直接回滚。

工作线程运行的核心逻辑 runWorker(Worker)

runWorker 在介绍 Worker 类时就已经进行介绍了,这里重提一下说明这个方法的重要性。

任务获取方法 getTask(): Runnable

在 runWorker 方法中,通过 getTask 去获取新任务:

while (task != null || (task = getTask()) != null) { ... }

深入 getTask 方法看看任务是如何获取的:

private Runnable getTask() {
    boolean timedOut = false// Did the last poll() time out?
    
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

根据当前的配置设置执行阻塞或定时的等待任务,或者如果此 Worker 由于以下任何原因必须退出,则返回 null:

  1. 有超过 maximumPoolSize 的 worker(由于动态调用了setMaximumPoolSize )。
  2. 线程池停止。
  3. 线程池关闭,队列为空。
  4. 这个 worker 超时等待一个任务,超时 worker 在定时等待前后都会被终止(即 allowCoreThreadTimeOut || workerCount > corePoolSize),如果队列非空,这个 worker 不是池中的最后一个线程。

allowCoreThreadTimeOut 为 false,线程即使空闲也不会被销毁;倘若为ture,在keepAliveTime内仍空闲则会被销毁。

线程池任务执行流程总结

回顾一下线程池任务的执行顺序:

 ThreadPoolExecutor#execute(Runnable) 
 |
 ThreadPoolExecutor#addWorker(Runnable, true// 核心线程 or
 workQueue.offer(command) // 添加到阻塞队列
 ThreadPoolExecutor#addWorker(Runnable, false// 非核心线程
 |
 Runnable#run() // runWorker 方法中

任务的提交 submit(…)

这个方法在 ThreadPoolExecutor 的祖父接口 ExecutorService 中定义,实现在 AbstractExecutorService 中:

    public Future<?> submit(Runnable task) {
        if (task == nullthrow new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

    public <T> Future<T> submit(Callable<T> task) {
        if (task == nullthrow new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

    public <T> Future<T> submit(Runnable task, T result) {
        if (task == nullthrow new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

可以看出,这一组方法都是执行三个步骤:

  1. 检查 task == null,为空抛出 NPE
  2. 通过 newTaskFor 方法构造一个 RunnableFuture 对象
  3. execute 方法执行 RunnableFuture 对象

最后 execute 方法中,调用 RunnableFuture.run() 方法执行任务。

关闭线程池 shotdown 和 shutdownNow

这一组方法的声明在 ExecutorService 中声明,在 ThreadPoolExecutor 中实现:

    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(SHUTDOWN); // 切换状态
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }

shutdown 启动有序的关闭,执行先前提交的任务,但不会接受新任务。 如果已经关闭,调用无效。 此方法不等待先前提交的任务完成执行。 使用 awaitTermination 来做到这一点。

    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(STOP); // 切换状态
            interruptWorkers();
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }

shutdownNow 尝试停止所有正在执行的任务,停止等待任务,并返回等待执行的任务列表。 这些任务在从该方法返回时从任务队列中排出(删除)。 此方法不等待主动执行的任务终止。 使用 awaitTermination 来做到这一点。 除了尽最大努力停止处理正在执行的任务之外,没有任何保证。 此实现通过 Thread.interrupt 取消任务,因此任何未能响应中断的任务可能永远不会终止。

无论是 shutdown 还是 shutdownNow ,核心的逻辑都是 interruptIdleWorkers()tryTerminate()

interruptIdleWorkers :

    private void interruptIdleWorkers() {
        interruptIdleWorkers(false);
    }
    
    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread; 
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt(); // 通过 Thread 中断取消执行任务
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }
    final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return// 处理不合法状态
            if (workerCountOf(c) != 0) { // Eligible to terminate
                interruptIdleWorkers(ONLY_ONE); // 存在工作中的线程,调用 interruptIdleWorkers 去关闭
                return;
            }

            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // 切换至 TIDYING 状态
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        terminated(); // 执行终止方法
                    } finally {
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }

在这个方法在最终执行到了 terminated 方法:

   protected void terminated() { }

Executor 终止时调用的方法。 默认实现什么都不做,这是一个供子类去实现的方法。 注意:要正确嵌套多个覆盖,子类通常应在此方法中调用 super.terminated。

回过头来,两者的区别是:

  1. shutdown 直接将线程池状态切换至 SHUTDOWN ;而 shutdownNow 切换至 STOP 。
  2. shutdownNow 切换状态为 STOP 后,取出等待队列中的任务返回。
  3. shutdown 提供了一个 hook 方法 onShutdown() 供 ScheduledThreadPoolExecutor 实现。

Executors 工具类

Executors 类中提供了 Executor, ExecutorService, ScheduledExecutorService, ThreadFactory, 和 Callable 的工厂和工具方法。支持以下几种方法:

  • 创建和返回设置了常用的配置的 ExecutorService 的方法。
  • 创建和返回具有常用配置的 ScheduledExecutorService 的方法。
  • 创建和返回“封装”的 ExecutorService 的方法,通过使特定于实现的方法无法访问来禁用重新配置。
  • 创建和返回将新创建的线程设置为已知状态的 ThreadFactory 的方法。
  • 从其他类似闭包的形式中创建和返回 Callable 的方法,因此它们可以在需要 Callable 的执行方法中使用。

最常用于快速实现线程池的方法是下面三个:

  • newFixedThreadPool,队列是 LinkedBlockingQueue,不拒绝新任务,maxSize = Integer.MAX_VALUE。
  • newSingleThreadExecutor,队列是 LinkedBlockingQueue,同步队列,只有一个线程,不拒绝新任务,maxSize = Integer.MAX_VALUE。
  • newCachedThreadPool,队列是 SynchronousQueue,线程数量无限制,当线程的空闲时间超过keepAliveTime,会自动释放线程资源。当提交新任务时,如果没有空闲线程,则创建新线程执行任务,会导致一定的系统开销。

虽然官方是推荐使用上面的三种方法来快速启动线程池,但是在实际的开发中,开发者最好去根据实际的使用场景自定义实现线程池。

因为上面三个方法实现的线程池,可能并不贴合实际的使用场景,从而造成很多的任务排队、很多线程创建甚至导致 OOM 的情况出现。

总结

  • 原理:实现 Executor 接口,通过配置核心线程数、队列类型、最大线程数、存活时间、拒绝策略来实现不同效果的线程池。
  • 优点:一方面是可以统一管理线程资源,减少频繁创建线程的资源开销。另一方面可以配合不同的配置实现贴合实际使用场景,达到效率最大化。
  • 缺点:使用不当也会造成资源浪费。
  • 技术点:包括 BlockingQueue、Executor、AQS 、ReentLock 、Thread 、Runnable 、Future 等。


原文始发于微信公众号(八千里路山与海):Java 多线程并发【16】ThreadPoolExcutor

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/85080.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!