关于线程池构造函数参数、阻塞队列、拒绝策略、线程设置的入门学习

线程池的介绍

1.线程池的重要性

2.什么是”池”,软件中的”池”,可以理解为计划经济

3.如果不使用线程池,每个任务都开一个线程处理,

① 一个线程

② for循环创建线程

③ 当任务数量上升到1000

这样开销太大,我们希望有固定数量的线程,来执行这1000个线程,这样就避免了反复创建并且销毁线程所带来的开销问题。

为什么要使用线程

问题一:反复创建线程开销大

问题二:过多的线程会占用太多的内存

解决以上两个问题的思路

① 用少量的线程——避免内存占用过多

② 让这部分线程都保持工作,且可以反复执行任务——避免生命周期的损耗

线程池的好处

① 加快响应速度

② 合理利用CPU和内存

③ 统一管理

线程池适合应用的场合

服务器接受到大量请求时,使用线程池技术是非常合适的,它可以大大减少线程的创建和销毁次数,提高服务器的工作效率

实际上,在开发中,如果需要创建5个以上的线程,那么就可以使用线程池来管理

创建和停止线程池

线程池构造函数的参数

关于线程池构造函数参数、阻塞队列、拒绝策略、线程设置的入门学习

● corePoolSize:核心线程数,线程池在完成初始化后,默认情况下,线程池中并没有任何线程,线程池会等待有任务到来时,再创建新线程去执行任务

● maxPoolSize:最大线程数,线程池有可能会在核心线程数的基础上,额外增加一些线程,但是这些新增加的线程有一个上限,这就是最大量maximumPoolSize

添加线程规则

1.如果线程数小于corePoolSize,即使其他工作线程处于空闲状态,也会创建一个新线程来运行新的任务。

2.如果线程数等于(大于)corePoolSize但少于maximumPoolSize,则将任务放入队列。

3.如果队列已满,并且线程数小于maximumPoolSize,则创建一个新线程来运行任务。

4.如果队列已满,并且线程数大于或等于maxPoolSize,则拒绝该任务

线程池执行流程:

关于线程池构造函数参数、阻塞队列、拒绝策略、线程设置的入门学习

是否需要增加线程的判断顺序是:

① corePoolSize

② workQueue

③ maxPoolSize

例子:

线程池:核心池大小5,最大池大小为10,队列为100。

因为线程中的请求最多会创建5个,然后任务将被添加到队列中,直到到达100。当队列已满时,将创建最新的线程maxPoolSize,最多到十个线程,如果再来任务,就拒绝。

增减线程的特点

1.通过设置corePoolSize和maximumPoolSize相同,就可以创建固定大小的线程池

2.线程池希望保持较少的线程数,并且只有在负载变得很大的时才增加它。

3.通过设置maximmumPoolSize为很高的值,例如Integer.MAX_VALUE,可以允许线程池容纳任意数量的并发任务。

4.是只有在队列满时才创建多于corePoolSize的线程,所以如果你使用的是无界队列(例如LinkedBlockingQueue),那么线程数就不会超过corePoolSize。

线程池应该手动创建还是自动创建

手动创建更好,因为这样可以让我们更加明确线程池的运行规则,避免资源耗尽的风险。

自动创建线程池(也就是直接调用JDK封装好的构造函数)可能带来那些问题

1.newFixedThreadPool

由于传出去的LinkedBlockingQueue是没有容量上限的,所以当请求越来越多,并且无法及时处理完毕的时候,也就是请求堆积的时候,会容易占用大量的内存,可能会导致OOM

源码:

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

正常实例:

/**
 * 描述: 演示newFixedThreadPool
 */

public class FixedThreadPoolTest {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(4);
        for (int i = 0; i < 1000; i++) {
            executorService.execute(new Task());
        }
    }
}
class Task implements Runnable {
    @Override
    public void run() {
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName());
    }
}

出现OOM实例:

① 内存设置小一点

关于线程池构造函数参数、阻塞队列、拒绝策略、线程设置的入门学习

②代码:

public class FixedThreadPoolOOM {
    private static ExecutorService executorService = Executors.newFixedThreadPool(1);

    public static void main(String[] args) {
        for (int i = 0; i < Integer.MAX_VALUE ; i++) {
            executorService.execute(new SubThread());
        }
    }
}
class SubThread implements Runnable{
    @Override
    public void run() {
        try {
            Thread.sleep(1000000000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

2.newSingleThreadExecutor

可以看出,这里和上面的newFixedThreadPool的原理基本一样,只不过把线程数直接设置成了1,所以这也会导致一样的问题,也就是当请求堆积的时候,可能会占用大量的内存。

① 单线程的线程池:它只会用工作的唯一线程来执行任务

② 它的原理和FixedThreadPool是一样的,但是此时的线程线程数量被设置为了1

源码:

 public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(11,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

实例:

public class SingleThreadExecutor {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 1000 ; i++) {
            executorService.execute(new Task());
        }
    }
}

3.newCachedThreadPool

① 可缓存线程池

② 特点:无界线程池,具有自动回收多余线程的功能。

这里的弊端在于第二个参数maximumPoolSize被设置为了Integer.MAX_VALUE,这可能会创建数量非常多的线程,甚至导致OOM。

源码:

 public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

实例:

public class CachedThreadPool {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 1000; i++) {
            executorService.execute(new Task());
        }
    }
}

4.newScheduledThreadPool

支持定时及周期性任务执行的线程池

源码:

   public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
    
  public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

实例:

public class ScheduledThreadPoolTest {
    public static void main(String[] args) {
        ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(10);
//        threadPool.schedule(new Task(),5, TimeUnit.SECONDS);
        threadPool.scheduleAtFixedRate(new Task(), 13, TimeUnit.SECONDS);
    }
}

5.以上4种线程池的构造函数的参数

关于线程池构造函数参数、阻塞队列、拒绝策略、线程设置的入门学习

阻塞队列分析

① FixedThreadPool和SingleThreadExecutor的Queue是LinkedBlockingQueue

假设FixedThreadPool固定有10个线程,由于线程数量不能往上膨胀,不得不用一个能够存储很多的或无限多的队列帮助我们去存储任务,新来的任务数量没办法估计,只能在自身上把阻塞队列的容量设置为无限,这就是选择LinedBlockingQueue的原因。

② CachedThreadPool使用的是Queue是SynchronousQueue

SynchronousQueue內部不存储任务,在CachedThreadPool这种线程池情况下不需要队列存储任务,当有任务过来直接就交给新的线程去执行,新的线程数量是不受限制的,效率比较高,不需要在队列中进行中转。

③ ScheduledThreadPool来说,它使用的是延迟队列DelayedWorkQueue

延迟队列的能力就是把里面的任务根据时间先后做延迟,符合它的使用场景。

WorkStealingPool线程池

WorkStealingPool是JDK1.8加入的

① 这个线程池和之前的的都有很大不同

② 子任务

③ 窃取

正确创建线程池的方法

根据不同的业务场景,自己设置线程池参数,比如我们的内存有多大,我们想给线程取什么名字等等

线程池里的数量设定为多少较合适

① CPU密集型(加密、计算hash等):最佳核心线程数为CPU核心数的1-2倍。

② 耗时IO型(读写数据库、文件、网络读写等):最佳线程数一般会大于核心线程数很多倍,保证线程空闲可以衔接上。

③ 参考Brain Goetz推荐的计算方法:

线程数=CPU核心数*(1+平均等待时间/平均工作时间)

停止线程池的正确方法

1.shutdown

初始化整个关闭过程,线程,池在执行过程中有线程中有正在执行的任务,还包括队列中有等待被执行的任务,运行该方法后,线程池知道我们想让它停止,优雅起见,会把正在执行的任务以及队列中等待执行的任务都执行完毕后,它再关闭,当执行该方法后再有新的任务提交,会抛出拒绝的异常

2.isShutdown

可以返回一个true或false,告诉我们这个线程池是不是已经停止了

3.isTerminated

线程池是不是完全停止,包括正在执行的线程以及队列里面任务都清空了

4.awaitTermination

检测等待一段时间内线程是不是完全停止的方法

5.shutdownNow

尝试停止所有主动执行的任务,停止等待任务的处理,并返回正在等待执行的任务列表。

关于线程池构造函数参数、阻塞队列、拒绝策略、线程设置的入门学习

实例:

public class ShutDown {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 100; i++) {
            executorService.execute(new ShutDownTask());
        }
        Thread.sleep(1500);
        List<Runnable> runnableList = executorService.shutdownNow();
//        boolean b = executorService.awaitTermination(3L, TimeUnit.SECONDS);
//        System.out.println(b);
//        System.out.println(executorService.isShutdown());
//        executorService.shutdown();
//        System.out.println(executorService.isShutdown());
//        System.out.println(executorService.isTerminated());
    }
}

class ShutDownTask implements Runnable {
    @Override
    public void run() {
        try {
            Thread.sleep(500);
            System.out.println(Thread.currentThread().getName());
        } catch (InterruptedException e) {
            System.out.println(Thread.currentThread().getName()+"被中断了");
        }
    }
}

任务太多,怎么拒绝

1.拒绝时机

① 当Excutor关闭时,提交新任务会被拒绝。

② 当Executor对最大线程和工作队列容量使用有限边界并且已经饱和时

2.四种拒绝策略

① AbortPolicy:直接抛出RejectedExecutionException异常信息,线程池默认的拒绝策略

② DiscardPolicy:默默的丢弃任务,不会得到通知,不知道这个任务,不会得到处理

③ DiscardOldestPolicy:丢弃最老的存储时间最久的任务丢掉,以便腾出空间存放你刚刚提交最新的任务

④ CallerRunsPolicy:谁提交的任务谁去执行,即体现调用线程处理(优点一:让主线程去运行避免业务损失;优点二:让提交的速度降低,主线程提交任务满了后发现是这种拒绝策略,本身作为提交者运行,运行该任务花一定的时间,只有执行该任务完以后才能提交下一个任务,这段时间线程池也会执行完毕一些任务,为后面的任务腾出一定的空闲,给线程池一个缓冲的时间)

可以根据业务需求选择拒绝策略

钩子方法,给线程池加料

1.每个任务执行前后

2.日志、统计

3.代码演示:

/**
 * 演示每个任务执行前后放钩子方法
 */

public class PauseableThreadPool extends ThreadPoolExecutor {

    private static final ReentrantLock lock = new ReentrantLock();

    private Condition unpaused = lock.newCondition();
    private boolean isPaused;

    public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }

    public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }

    public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        lock.lock();
        try {
            while (isPaused) {
                unpaused.await();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    private void pause() {
        lock.lock();
        try {
            isPaused = true;
        } finally {
            lock.unlock();
        }
    }

    public void resume() {
        lock.lock();
        try {
            isPaused = false;
            unpaused.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        PauseableThreadPool pauseableThreadPool = new PauseableThreadPool(
                102010l, TimeUnit.SECONDS,
                new LinkedBlockingDeque<>());
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                System.out.println("我被执行");

                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        for (int i = 0; i < 10000; i++) {
            pauseableThreadPool.execute(runnable);
        }
        Thread.sleep(1500);
        pauseableThreadPool.pause();
        System.out.println("线程池暂定了");
        Thread.sleep(1500);
        pauseableThreadPool.resume();
        System.out.println("线程被恢复了");
    }
}

实现原理、源码分析

线程池组成部分

① 线程池管理器

② 工作线程

③ 任务队列

④ 任务接口(Task)

Executor家族

线程池、ThreadPoolExecutor、ExecutorService、Executor、Executors等这么多和线程池相关的类,大家都是什么关系

关于线程池构造函数参数、阻塞队列、拒绝策略、线程设置的入门学习

工具类Executors

关于线程池构造函数参数、阻塞队列、拒绝策略、线程设置的入门学习

线程池实现任务复用的原理(相同的线程执行不同的任务)

源码:

execute方法

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(nullfalse);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

如果工作线程小于核心线程进入addWorker方法

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

线程复用主要在Worker类中RunWorker的方法

 final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;//直接拿到任务
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();//执行完一个任务再去阻塞队列取一个新的任务
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();//执行Runnable的run方法
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

线程池的状态

1.RUNNING:接受新任务并处理排队任务

2.SHUTDOWN:不接受新任务,但处理排队任务

3.STOP:不接受新任务,也不处理排队任务,并中断正在进行的任务

4.TIDYING:中文整洁的意思,理解中文容易理解这个状态了:所有任务都已经终止,workerCount为零时,线程会转换到TIDYING状态,并将运行terminate()钩子方法

5.TERMINATED:terminate()运行完成

关于线程池构造函数参数、阻塞队列、拒绝策略、线程设置的入门学习

使用线程池的注意点

1.避免任务的堆积(导致内存不够)

2.避免线程数过度增加

3.排查线程泄漏(排查线程数量是否超过正常数量,线程已经执行完毕,却不能被回收,可能逻辑有问题导致任务结束不了)


原文始发于微信公众号(itmkyuan):关于线程池构造函数参数、阻塞队列、拒绝策略、线程设置的入门学习

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/39217.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!