Java 多线程并发【17】ScheduleThreadPoolExecutor

ScheduleThreadPoolExecutor 是一个可以在给定延期时间后执行任务或者定期执行任务的 ThreadPoolExecutor 。这个类在需要多个工作线程或连续执行的场景下,比 Timer 更加合适。

  • 相比 Timer 使用单个线程,ScheduleThreadPoolExecutor 是一个线程池来执行任务,所以 Timer 所运行的线程可能会导致业务的阻塞,而线程池在某个线程阻塞或抛出异常时,不会影响其他线程执行任务,并且提供了更好的线程复用能力。

  • 另一个区别是,Timer 调度是基于操作系统的绝对时间进行的,而 ScheduleThreadPoolExecutor 是一个相对的时间,当修改系统的时间后,Timer 可能因为时间变化出现问题,而 ScheduleThreadPoolExecutor 不会。

  • ScheduleThreadPoolExecutor 中执行的任务会包装成 ScheduledFutureTask ,具有返回值(Future 定义的能力)。Timer 可执行类是 TimerTask ,TimerTask 实现了 Runnable ,没有 Future 的能力。

ScheduleThreadPoolExecutor 可以通过schedule 方法延期执行一个任务,使用 scheduleAtFixedRate 或 scheduleWithFixedDelay 方法进行执行周期的任务。

ScheduleThreadPoolExecutor 这个类专门实现了 ThreadPoolExecutor,但有一些自己的特色:

  1. 使用自定义的任务类型,ScheduledFutureTask ,ExecutorService 通过 execute 提交的任务即使不是 ScheduledFutureTask 也会被视为 ScheduledFutureTask 。

  2. 使用一个自定义队列,DelayedWorkQueue。它是无界 DelayQueue 的变体。与 ThreadPoolExecutor 相比,缺乏容量约束,以及 corePoolSize 和maximumPoolSize 实际上是相同的,以此来简化了一些执行机制(参见 delayedExecute ) 。

  3. 支持可选的 shutdown 后运行的参数,这导致重写 shutdown 方法来删除和取消 shutdown 后不应该运行的任务,以及当任务(重新)提交与 shutdown 重叠时不同的重新检查逻辑。

  4. 任务的 decoration 方法来允许拦截和插装一些逻辑,这是必需的,因为子类不能通过重写 submit 方法来获得这种效果。这些方法对线程池控制逻辑没有任何影响。

        // 供子类实现的装饰方法
      protected <V> RunnableScheduledFuture<V> decorateTask(
            Runnable runnable, RunnableScheduledFuture<V> task)
     
    {
            return task;
        }

        protected <V> RunnableScheduledFuture<V> decorateTask(
            Callable<V> callable, RunnableScheduledFuture<V> task)
     
    {
            return task;
        }

使用

因为 ScheduleThreadPoolExecutor 继承自 ScheduledExecutorService ,Executors 工具类中提供了快速创建 ScheduledExecutorService 对象的方法:

val threadPoolExecutor: ScheduledExecutorService = Executors.newScheduledThreadPool(2)

另一种方式是直接创建 ScheduleThreadPoolExecutor 对象:

val threadPoolExecutor: ScheduledThreadPoolExecutor = ScheduledThreadPoolExecutor(2)

两种方式效果是一样的,都会创建一个具有 2 个核心线程的线程池,通过执行不同的方法,可以执行一个延时执行的任务或是周期性地执行任务。

3 秒后执行一个任务:

        threadPoolExecutor?.schedule({
            dataSource.add("Thread: ${Thread.currentThread().name}, time - ${System.currentTimeMillis()}")
        },1, TimeUnit.SECONDS)

执行一个 1s 执行一次的周期任务:

    threadPoolExecutor?.scheduleAtFixedRate({
            dataSource.add("Thread: ${Thread.currentThread().name}, time - ${System.currentTimeMillis()}")
        },11, TimeUnit.SECONDS)

在 3 秒后启动第一个任务,然后以 1 秒为周期执行任务:

        threadPoolExecutor?.scheduleWithFixedDelay({
            dataSource.add("Thread: ${Thread.currentThread().name}, time - ${System.currentTimeMillis()}")
        },31, TimeUnit.SECONDS)

这里有一个困惑点,scheduleAtFixedRate 和 scheduleWithFixedDelay 效果好像一样的,但实际运行和查阅代码发现,这两个方法参数是不同的,并且效果也不同:

scheduleWithFixedDelay 方法的第三个参数是上一个任务结束到下一个任务开始的时间间隔;scheduleAtFixedRate 的第三个参数代表的是两个任务开始的时间间隔。

这可能不太好理解,经过实际的代码跑起来发现,scheduleAtFixedRate 的任务执行时间间隔基本上就是指定的时间间隔,例如 1s 执行一次,时间间隔基本上精准到 1000 ~ 1001 毫秒左右;而 scheduleWithFixedDelay 跑起来的效果则不是精确的每次都是 1s 间隔。

因为 scheduleAtFixedRate 只是关注每个任务启动的时间,即使上一个任务还没结束,也会在周期时间后立即启动下一个任务。

scheduleWithFixedDelay 则会等上一个任务结束后,再间隔指定的周期时间后,才启动下一个任务。

这里附上一个 Demo 代码,在 Android 中实现的:

class ScheduledThreadPoolActivity : AppCompatActivity() {

    private lateinit var recyclerView: RecyclerView
    private var threadPoolExecutor: ScheduledThreadPoolExecutor? = null

    private var dataSource = ArrayList<String>()

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_scheduled_thread_pool)
        recyclerView = findViewById(R.id.recyclerView)
        recyclerView.apply {
            adapter = Adapter()
            layoutManager = LinearLayoutManager(this@ScheduledThreadPoolActivity)
        }
        findViewById<Button>(R.id.addBtn).setOnClickListener {
            startThreadPool()
        }
        findViewById<Button>(R.id.removeBtn).setOnClickListener {
            stopThreadPool()
        }
        findViewById<Button>(R.id.clearScreenBtn).setOnClickListener {
            dataSource.clear()
            recyclerView.adapter?.notifyDataSetChanged()
        }
    }

    private val mainHandler = Handler(Looper.getMainLooper())

    private fun startThreadPool() {
//        threadPoolExecutor = Executors.newScheduledThreadPool(2)
        threadPoolExecutor = ScheduledThreadPoolExecutor(2)
        threadPoolExecutor?.scheduleWithFixedDelay({
            dataSource.add("Thread: ${Thread.currentThread().name}, time - ${System.currentTimeMillis()}")
            mainHandler.post {
                recyclerView.adapter?.notifyItemInserted(dataSource.size - 1)
            }
        },31, TimeUnit.SECONDS)

//        threadPoolExecutor?.scheduleAtFixedRate({
//            dataSource.add("Thread: ${Thread.currentThread().name}, time - ${System.currentTimeMillis()}")
//            mainHandler.post {
//                recyclerView.adapter?.notifyItemInserted(dataSource.size - 1)
//            }
//        },3, 1, TimeUnit.SECONDS)
//
//
//        threadPoolExecutor?.schedule({
//            dataSource.add("Thread: ${Thread.currentThread().name}, time - ${System.currentTimeMillis()}")
//            mainHandler.post {
//                recyclerView.adapter?.notifyItemInserted(dataSource.size - 1)
//            }
//        },1, TimeUnit.SECONDS)
    }

    private fun stopThreadPool() {
        dataSource.add("threadPoolExecutor Close, time - ${System.currentTimeMillis()}")
        recyclerView.adapter?.notifyItemInserted(dataSource.size - 1)
        threadPoolExecutor?.shutdown()
        var isClosed = false
        // 等待线程池终止
        do {
            isClosed = threadPoolExecutor?.awaitTermination(1, TimeUnit.DAYS) ?: false
            dataSource.add("正在等待线程池中的任务执行完成")
            recyclerView.adapter?.notifyItemInserted(dataSource.size - 1)
        } while(!isClosed)
        dataSource.add("所有线程执行结束,线程池关闭")
        recyclerView.adapter?.notifyItemInserted(dataSource.size - 1)
    }

    inner class ViewHolder(val view: TextView) : RecyclerView.ViewHolder(view) {
        fun setText(text: String) {
            view.text = text
        }
    }

    inner class AdapterRecyclerView.Adapter<ViewHolder>() {
        override fun onCreateViewHolder(parent: ViewGroup, viewType: Int): ViewHolder {
            val itemView = TextView(parent.context).apply {
                layoutParams = ViewGroup.LayoutParams(ViewGroup.LayoutParams.MATCH_PARENT, ViewGroup.LayoutParams.WRAP_CONTENT)
                isSingleLine = false
                gravity = Gravity.START
                setPadding(10101010)
            }
            return ViewHolder(itemView)
        }

        override fun onBindViewHolder(holder: ViewHolder, position: Int) {
            holder.setText(dataSource[position])
        }

        override fun getItemCount()Int {
            return dataSource.size
        }
    }
}

原理

继承关系

Java 多线程并发【17】ScheduleThreadPoolExecutor
image-20220727200201809.png

ScheduledThreadPoolExecutor 继承自 ThreadPoolExecutor 和 ScheduledExecutorService 接口,前者是线程池类,后者则是定义任务调度能力的接口:

public interface ScheduledExecutorService extends ExecutorService {

    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay, TimeUnit unit);

    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay, TimeUnit unit)
;

    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);

    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);

}

schedule 方法的作用是创建并执行在给定延迟后启用的一次性操作。

而其他两个方法在上文中提到过,用来周期执行任务,这里不再重复说明。

除了父类关系,ScheduledThreadPoolExecutor 还有两个内部类 ScheduledFutureTask 和 DelayedWorkQueue 。

内部类

ScheduledFutureTask

Java 多线程并发【17】ScheduleThreadPoolExecutor
image-20220727200300574.png

这是一个 FutureTask ,并实现了 RunnableScheduledFuture 接口:

public interface RunnableScheduledFuture<Vextends RunnableFuture<V>, ScheduledFuture<V{
    boolean isPeriodic();
}

RunnableScheduledFuture 定义了是否周期执行的能力,所以 ScheduledFutureTask 是一个可周期执行的 FutureTask 。并且在继承关系中,RunnableScheduledFuture 又继承自 Delayed ,具备延时时间的能力,Delayed 继承自 Comparable ,延时时间是可以进行比较的。在 ScheduledFutureTask 中实现了对延时时间进行比较的方法,可以对队列进行按时间的排序:

        public int compareTo(Delayed other) {
            if (other == this// compare zero if same object
                return 0;
            if (other instanceof ScheduledFutureTask) {
                ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
                long diff = time - x.time;
                if (diff < 0)
                    return -1;
                else if (diff > 0)
                    return 1;
                else if (sequenceNumber < x.sequenceNumber)
                    return -1;
                else
                    return 1;
            }
            long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
            return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
        }

ScheduledFutureTask 中的 sequenceNumber 属性表示了先进先出的序号,通过原子类保证序号是同步的:

private static final AtomicLong sequencer = new AtomicLong();
// ScheduledFutureTask 的构造方法中调用
this.sequenceNumber = sequencer.getAndIncrement();
        // Creates a one-shot action with given nanoTime-based trigger time.
        ScheduledFutureTask(Runnable r, V result, long ns) {
            super(r, result);
            this.time = ns;
            this.period = 0;
            this.sequenceNumber = sequencer.getAndIncrement();
        }

        // Creates a periodic action with given nano time and period.
        ScheduledFutureTask(Runnable r, V result, long ns, long period) {
            super(r, result);
            this.time = ns;
            this.period = period;
            this.sequenceNumber = sequencer.getAndIncrement();
        }

        // Creates a one-shot action with given nanoTime-based trigger time.
        ScheduledFutureTask(Callable<V> callable, long ns) {
            super(callable);
            this.time = ns;
            this.period = 0;
            this.sequenceNumber = sequencer.getAndIncrement();
        }

ScheduledFutureTask 是 FutureTask ,所以具备取消的能力:

        public boolean cancel(boolean mayInterruptIfRunning) {
            boolean cancelled = super.cancel(mayInterruptIfRunning);
            if (cancelled && removeOnCancel && heapIndex >= 0)
                remove(this);
            return cancelled;
        }

剩下的部分是 ScheduledFutureTask 执行的主要逻辑:

    private class ScheduledFutureTask<Vextends FutureTask<Vimplements RunnableScheduledFuture<V{
      // ...
        private long time;

        private final long period;

        /** The actual task to be re-enqueued by reExecutePeriodic */
        RunnableScheduledFuture<V> outerTask = this;

        // Index into delay queue, to support faster cancellation.
        int heapIndex;

        public long getDelay(TimeUnit unit) {
            return unit.convert(time - now(), NANOSECONDS);
        }

        // Returns {@code true} if this is a periodic (not a one-shot) action.
        public boolean isPeriodic() {
            return period != 0;
        }

        // Sets the next time to run for a periodic task.
        private void setNextRunTime() {
            long p = period;
            if (p > 0)
                time += p;
            else
                time = triggerTime(-p);
        }


        // Overrides FutureTask version so as to reset/requeue if periodic.
        public void run() {
            boolean periodic = isPeriodic();
            if (!canRunInCurrentRunState(periodic))
                cancel(false);
            else if (!periodic)
                ScheduledFutureTask.super.run();
            else if (ScheduledFutureTask.super.runAndReset()) {
                setNextRunTime();
                reExecutePeriodic(outerTask);
            }
        }
    }

DelayedWorkQueue

Java 多线程并发【17】ScheduleThreadPoolExecutor
image-20220727200352183.png

DelayedWorkQueue 是一种 BlockingQueue , 并且继承 Abstract Queue 实现了默认的 Queue 能力。

DelayedWorkQueue 是专门用于 ScheduleThreadPoolExecutor 的延迟队列。为了与ThreadPoolExecutor 的声明相匹配,这个类必须声明为BlockingQueue,尽管它只能保存 RunnableScheduledFutures。

    static class DelayedWorkQueue extends AbstractQueue<Runnable>
        implements BlockingQueue<Runnable
{


        private static final int INITIAL_CAPACITY = 16;
        private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
        private final ReentrantLock lock = new ReentrantLock();
        private int size = 0;
       // ...
  }

DelayedWorkQueue 的数据结构是一个 RunnableScheduledFuture 数组。每个ScheduledFutureTask 都会记录它自己在 DelayedWorkQueue 数组中的索引,以便快速索引。

数组中的元素变化都会通过 siftUp 和 siftDown 中进行记录;删除时,ScheduledFutureTask 的 heapIndex 设置为 -1。注意,ScheduledFutureTask 在队列中最多只能出现一次(其他类型的任务或工作队列不需要这样),因此由heapIndex 作为唯一标识。


        private void setIndex(RunnableScheduledFuture<?> f, int idx) {
            if (f instanceof ScheduledFutureTask)
                ((ScheduledFutureTask)f).heapIndex = idx;
        }
        private void siftUp(int k, RunnableScheduledFuture<?> key) {
            while (k > 0) {
                int parent = (k - 1) >>> 1;
                RunnableScheduledFuture<?> e = queue[parent];
                if (key.compareTo(e) >= 0)
                    break;
                queue[k] = e;
                setIndex(e, k);
                k = parent;
            }
            queue[k] = key;
            setIndex(key, k);
        }
        private void siftDown(int k, RunnableScheduledFuture<?> key) {
            int half = size >>> 1;
            while (k < half) {
                int child = (k << 1) + 1;
                RunnableScheduledFuture<?> c = queue[child];
                int right = child + 1;
                if (right < size && c.compareTo(queue[right]) > 0)
                    c = queue[child = right];
                if (key.compareTo(c) <= 0)
                    break;
                queue[k] = c;
                setIndex(c, k);
                k = child;
            }
            queue[k] = key;
            setIndex(key, k);
        }

DelayedWorkQueue 的数组每次扩容 50% :

        /**
         * Resizes the heap array.  Call only when holding lock.
         */

        private void grow() {
            int oldCapacity = queue.length;
            int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
            if (newCapacity < 0// overflow
                newCapacity = Integer.MAX_VALUE;
            queue = Arrays.copyOf(queue, newCapacity);
        }

DelayedWorkQueue 中的操作是安全的,操作前通过 ReentrantLock 加锁,操作完成后再解锁,例如:

        public int size() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                return size;
            } finally {
                lock.unlock();
            }
        }

添加和删除等方法的源码逻辑可自行查看源码。

构造函数

    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }


    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory)
 
{
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), threadFactory);
    }

    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       RejectedExecutionHandler handler)
 
{
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), handler);
    }

    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory,
                                       RejectedExecutionHandler handler)
 
{
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), threadFactory, handler);
    }

ScheduledThreadPoolExecutor 的构造函数主要有三个参数:

  • corePoolSize
  • ThreadFactory
  • RejectedExecutionHandler

其中只有 ThreadFactory 是 ScheduledThreadPoolExecutor 独有的,用来支持可以自定义创建线程的工厂类。

另一个需要关注的点是,ScheduledThreadPoolExecutor 将 maximumPoolSize 设置为 Integer.MAX_VALUE ,keepAliveTime 设置为 0,workQueue 设置为 DelayedWorkQueue ,没有任何变种的余地。

核心属性

    // 如果应该在 shutdown 时cancel /suppress 周期性任务,则为False。
    private volatile boolean continueExistingPeriodicTasksAfterShutdown;

    // 如果在 shutdown 时取消非周期性任务,则为False。
    private volatile boolean executeExistingDelayedTasksAfterShutdown = true;

    // 如果 ScheduledFutureTask.cancel 应该从队列中移除时为 true
    private volatile boolean removeOnCancel = false;

    // 序号可以打破调度关联,进而保证绑定条目之间的FIFO顺序。
    private static final AtomicLong sequencer = new AtomicLong();

ScheduledThreadPoolExecutor 的属性没有太多,都是用来做标志位的,只有 sequencer 是用来给每个 ScheduledFutureTask 排序的。

核心方法

从 Executor 继承来的 execute 方法和 AbstractExecutorService 继承而来的 submit 方法都是直接使用 schedule 执行:

    public void execute(Runnable command) {
        schedule(command, 0, NANOSECONDS);
    }
    
  public Future<?> submit(Runnable task) {
        return schedule(task, 0, NANOSECONDS);
    }

    public <T> Future<T> submit(Runnable task, T result) {
        return schedule(Executors.callable(task, result), 0, NANOSECONDS);
    }

    public <T> Future<T> submit(Callable<T> task) {
        return schedule(task, 0, NANOSECONDS);
    }

schedule

    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        RunnableScheduledFuture<?> t = decorateTask(command,
            new ScheduledFutureTask<Void>(command, null,
                                          triggerTime(delay, unit)));
        delayedExecute(t);
        return t;
    }

    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay,
                                           TimeUnit unit)
 
{
        if (callable == null || unit == null)
            throw new NullPointerException();
        RunnableScheduledFuture<V> t = decorateTask(callable,
            new ScheduledFutureTask<V>(callable,
                                       triggerTime(delay, unit)));
        delayedExecute(t);
        return t;
    }

延时执行单个任务的方法,核心逻辑在 delayedExecute 方法中:

    private void delayedExecute(RunnableScheduledFuture<?> task) {
        if (isShutdown())
            reject(task);
        else {
            super.getQueue().add(task);
            if (isShutdown() &&
                !canRunInCurrentRunState(task.isPeriodic()) &&
                remove(task))
                task.cancel(false);
            else
                ensurePrestart();
        }
    }

delayedExecute 是延迟或周期执行任务的主要执行方法。如果 pool 关闭,则拒绝任务。否则,将 task 添加到队列并启动一个线程(如果需要的话)来运行它。(我们不能提前启动线程来运行任务,因为任务(可能)还不应该运行。)如果在添加任务时关闭了线程池,根据线程池的状态和 run-after-shutdown 参数,来决定是否取消并删除它。

预启动线程的方法是 ensurePrestart() :

    // 与prestartCoreThread相同,不同的是,即使corePoolSize为0,至少有一个线程被启动。
  void ensurePrestart() {
        int wc = workerCountOf(ctl.get()); // 当前池中线程数量
        if (wc < corePoolSize) // 当前池中线程数小于核心线程数,初始化核心线程
            addWorker(nulltrue);
        else if (wc == 0)
            addWorker(nullfalse); // 在没有线程的时候启动一个线程
    }

新建的工作线程会保存到 workers ,供后续快速访问。创建新的 Worker 后,会调用 Worker 中的 Thread 的 start 方法,Thread 的参数 Runnable 是 Worker 自己,从而调用到 Worker 的 run 方法,工作线程开始工作。

addWorker 方法:

    private boolean addWorker(Runnable firstTask, boolean core) {
      // ...
      Worker w = null;
      try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            // ...
            if (workerAdded) {
        t.start();
                workerStarted = true;
            }
           // ...
    }

Worker 初始化时,创建线程并将自身作为 Runnable 放到线程中执行:

        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

Worker 的 run 方法:

        public void run() {
            runWorker(this);
        }

scheduleAtFixedRate

    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (period <= 0)
            throw new IllegalArgumentException();
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(period));
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        delayedExecute(t);
        return t;
    }

scheduleAtFixedRate 逻辑和 schedule 方法类似,ScheduledFutureTask 多设置了一个 period 参数,因此可以周期运行。周期运行主要是在 ScheduledFutureTask 的 run 方法中的 setNextRunTime() 方法:

        private void setNextRunTime() {
            long p = period;
            if (p > 0)
                time += p;
            else
                time = triggerTime(-p);
        }

scheduleWithFixedDelay

    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (delay <= 0)
            throw new IllegalArgumentException();
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(-delay));
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        delayedExecute(t);
        return t;
    }

scheduleWithFixedDelay 方法逻辑和 scheduleAtFixedRate 方法基本一致,唯一的不同点是 ScheduledFutureTask 的第三个参数  period,scheduleWithFixedDelay 方法设置的是 unit.toNanos(-delay)) , 这里做了一个取负值操作,为什么会造成不同的效果呢,还是要去 ScheduledFutureTask 的 setNextRunTime 方法中 :

public void run() {
    boolean periodic = isPeriodic();
    if (!canRunInCurrentRunState(periodic))
        cancel(false);
    else if (!periodic)
        ScheduledFutureTask.super.run();
    else if (ScheduledFutureTask.super.runAndReset()) {
        setNextRunTime();
        reExecutePeriodic(outerTask);
    }
}

private void setNextRunTime() {
    long p = period;
    if (p > 0)
        time += p;
    else
        time = triggerTime(-p);
}

当 p 小于 0 时, time 取的是 triggerTime(-p) :

    long triggerTime(long delay) {
        return now() +
            ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
    }
  • time:开始执行时间
  • period:周期任务时间间隔

也就是说 scheduleAtFixedRate 下一次任务是  time += p ,以第一个任务启动后,period 时间后启动第二个。

而 scheduleWithFixedDelay 每次取当前时间,再加上 period ,这也是两个方法的根本区别。

overflowFree 方法的作用是将延时时间限制在 Long.,MAX_VALUE 以内,避免溢出:

private long overflowFree(long delay) {
    Delayed head = (Delayed) super.getQueue().peek();
    if (head != null) {
        long headDelay = head.getDelay(NANOSECONDS);
        if (headDelay < 0 && (delay - headDelay < 0))
            delay = Long.MAX_VALUE + headDelay;
    }
    return delay;
}

shutdown 和 shutdownNow

public void shutdown() {
    super.shutdown();
}

public List<Runnable> shutdownNow() {
    return super.shutdownNow();
}

关闭线程池方法都是使用 ThreadPoolExecutor 的逻辑,这里不再展开介绍,唯一的区别是 ScheduleThreadPoolExecutor 实现了 onShutdown 方法:

@Override void onShutdown() {
    BlockingQueue<Runnable> q = super.getQueue();
    boolean keepDelayed =
        getExecuteExistingDelayedTasksAfterShutdownPolicy();
    boolean keepPeriodic =
        getContinueExistingPeriodicTasksAfterShutdownPolicy();
    if (!keepDelayed && !keepPeriodic) {
        for (Object e : q.toArray())
            if (e instanceof RunnableScheduledFuture<?>)
                ((RunnableScheduledFuture<?>) e).cancel(false);
        q.clear();
    }
    else {
        // Traverse snapshot to avoid iterator exceptions
        for (Object e : q.toArray()) {
            if (e instanceof RunnableScheduledFuture) {
                RunnableScheduledFuture<?> t =
                    (RunnableScheduledFuture<?>)e;
                if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
                    t.isCancelled()) { // also remove if already cancelled
                    if (q.remove(t))
                        t.cancel(false);
                }
            }
        }
    }
    tryTerminate();
}

在 onShutdown 中,取消并清除由于关闭策略而不应该运行的所有任务的队列。

onShutdown 的调用时机:

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(SHUTDOWN);
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}

总结

  • ScheduleThreadPoolExecutor 是用来执行延时任务或周期执行任务的线程池。
  • scheduleAtFixedRate 和 scheduleWithFixedDelay 的区别是 scheduleAtFixedRate 是定时开启任务执行;scheduleWithFixedDelay 则是当上一个结束后,延迟一定时间后,再启动下一个任务。
  • ScheduleThreadPoolExecutor 中运行的任务都会被包装成 ScheduledFutureTask 。
  • ScheduleThreadPoolExecutor 设置了 Integer.MAX_VALUE 为 maximumPoolSize ,阻塞队列设置为 DelayedWorkQueue 。
  • DelayedWorkQueue 的底层数据结构是数组,每次扩容 0.5 容量。
  • and 其他的看文章吧。


原文始发于微信公众号(八千里路山与海):Java 多线程并发【17】ScheduleThreadPoolExecutor

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/85074.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!