大家好, 这里是 K 字的研究.
今天老 K 来研究下线程池执行器
(ThreadPoolExecutor)的参数作用, 不进行深入的源码分析.
必正乎名也
子曾经曰过:”必也正名乎”. 所以现在, 要强调一下, 这个东西不叫 Java 线程池. 你可以叫他Executor
, 也可以叫他ThreadPoolExecutor
, 但是就是不能叫他线程池
.他是用了线程池实现的Executor
,而不是用Executor
实现的ThreadPool
.
Executors
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler);
因为ThreadPoolExecutor
的参数比较多:
-
corePoolSize -
maximumPoolSize, -
keepAliveTime, -
unit, -
workQueue, -
threadFactory, -
handler);
所以, 官方做了一个简化使用的工厂类: Executors
.是的, 我们有一个设计模式.
虽然大多数时候不被推荐用, 不过我们是还是先从这里来开始.
Executors.newFixedThreadPool(int)
这个方法, 顾名思义, 产生一个线程池尺寸固定的 Executor. 实现是这样的:
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
corePoolSize==maximumPoolSize
, 这就是固定尺寸的意思了. 大小不会变化.
我们声明一个这样的线程数为 3 的线程池执行器, 提交 39 个任务, 看一下他的运行过程.
简介一下代码结构. 需要知道它里面到底是怎么样, 才好说这图画的是什么.
下面是线程池的声明.
private final HashSet<Worker> workers = new HashSet<Worker>();
其实他就是个HashSet
, 里面是Worker
类的对象.Worker
里面是线程,还有当前干了多少活.简单的很.
今天我们不研究源码, 所以, 就到这里了. 继续来说图.
上面绿色的部分, 是待执行的任务存储的FutureTask
队列. 是不是有点排队去领鸡蛋的意思?第一行的数字是状态,第二行是状态翻译成人话的样子. 第三行是 Task 里面 Callable 的toString
结果. 这里为了好看, 我重写了toString
.
下面黑色的, 我画成了一个池子的样子,来表示线程池
. 就是那个HashSet
.固定 3 个, 所以,一开始,就出现了 3 个线程. 那么不固定是什么样?
corePoolSize != maximumPoolSize
这次,任务不变, 修改参数, 采用:
corePoolSize=1
maximumPoolSize=3
得到的运行结果是这样的.
也很简单, 就是:
-
一开始有 corePoolSize
个线程 -
队列满了时候, 开始扩大线程数到 maximumPoolSize
个线程. -
任务处理完了就缩回去了.
那么到底处理完了,空闲多久会缩回去呢?
keepAliveTime
就是这个参数在起作用, 这个参数配合上下一个时间单位参数, 表示空闲多长时间以后就把线程杀了,不让他活(Alive)了.
我把这个参数分别改成 13 秒和 3000 秒, 别的还保持上面的参数试试看.
有一点变化, 不过不明显, 下面是 3000 的. 我就不输出那么长时间的 GIF 了.
keepAlive
表示, 扩容扩出来的那部分(maximum-core)
临时工, 活儿干完了还能待多久.
现在调整成 3000 以后, 3 个线程半天了就都还在. 核心的那一个, 则是会一直保留.
workQueue
这个工作队列, 是BlockingQueue
,默认用的是LinkedBlockingQueue
. 不过因为我只会画数组, 这里换了一个ArrayBlockQueue
进去. 凑合看. 他的结构,上面已经画过了.
这个东西今天篇幅有限展不开, 就说一个好玩的点. 这个里面装的是FutureTask
,而FutureTask
众所周知, 他的类图是这样的.
里面放的是Callable
对象, 而我们明明可以提交Runnable
对象啊?
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
为什么呢, 为什么呢?
Executors.callable
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
因为 Executors 除了是个工厂类, 他同时还是个 util… 提供了一个RunnableAdapter
把Runnable
适配成了Callable
.
/**
* A callable that runs given task and returns given result
*/
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}
居然又看到了适配器模式
, 有点意思是吧.
ThreadFactory
倒数第二个参数,看名字就知道, 这个参数, 是个工厂
, 生产线程的工厂.
日常我们new
一个线程时候, 会给他setName
编个名字什么的,方便识别. 这些都是可以自定义的项目, 可以配置.
现在线程池里的线程都是交给Executor
自动生成的,名字肯定是没法直接改了. 那怎么办?
不用担心,Executor
这么高级的东西, 肯定留了自定义的口子的, 参数threadFactory
就是这个口子.
用户提供一个可以对线程的生成进行自定义的代码, 这个代码具体什么时候调用, 要看Executor
的心情,而不是用户主动来调用. 这场景是不是有点熟悉啊?
don’t call us, we’ll call you 好莱坞法则
不要主要 call 我们, 需要时候我们会 call 你. 完美符合好莱坞法则. 这个地方, 其实是一个控制反转
(inverse of control
). 居然出现了我们的老朋友, IOC
.
现在改下自定义下线程名, 再来跑一次:
-
corePoolSize: 1 -
maximumSize: 3 -
keepAlive + unit:13 秒
嗯, 线程名字变成自定义的了.
最后一个参数, RejectedExecutionHandler
前面细心的朋友可能已经发现了, 说了提交 39 个任务,现在每次完成的总数,并不是 39.
是这样, 为了动画能够画出来. 我对任务的提交和执行,都加了延时.
-
生产任务, 是每 500ms 一个 -
消费执行, 是每 3000ms 一个
即使 3 个线程全开了, 3 秒 3 个,每秒也只能处理 1 个,生产和消费速度是不匹配的.
嗯, 这里其实相当于一个生产者-消费者
模式. 因为界面长度有限, BlockingQueue
只设置了 10 个.很容易就会发生队列满了的情况.
提交任务, 队列满了怎么处理?
这个情况是运行时候发生的, 我们不能直接控制. 嗯, 熟悉的来了, 又一次的控制反转. 这里提供的, RejectedExecutionHandler
, 就是让Executor
在出问题时候,call
我们用的. 又是一波好莱坞法则
.
刚刚为了不中断,我用了官方提供的DiscardPolicy
. 这个策略是, 满了进不来, 就丢掉.所以少了很多.
换用官方提供的另一个策略DiscardOldestPolicy
, 他是这么写的:
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardOldestPolicy} for the given executor.
*/
public DiscardOldestPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
从队列头部poll
掉一个不要了,空出来一个位子,塞进去. 很直白,丢掉最老的. 我来画一下这个:
因为为了画图方便,我偷懒用了ArrayBlockingQueue
.所以这个看起来不是特别明显.
不过我右侧还有个输出框其实, 能够看到确实是比较靠前的 3,4,5 被丢掉了.后面就没有动画了. 因为暂时没准备画主线程的材料. 而且主线程和异常确实也画不出来,想不出来怎么表示.
除了这俩偷偷摸摸的. 官方还有提供其他几个策略,比如:
-
AbortPolicy 满了抛异常的 -
CallerRunsPolicy 直接在 Caller 里 run 的
本质上, 这是官方给我们使用者提供了一个自定义和扩展的机会. 日常虽然很少用到,其实还是有人用的.
dubbo
比如, 在 dubbo 中,就扩展了一个AbortPolicyWithReport
.
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
String msg = String.format("Thread pool is EXHAUSTED!" +
" Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d)," +
" Task: %d (completed: %d)," +
" Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!",
threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(),
e.getLargestPoolSize(),
e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),
url.getProtocol(), url.getIp(), url.getPort());
logger.warn(msg);
dumpJStack();
dispatchThreadPoolExhaustedEvent(msg);
throw new RejectedExecutionException(msg);
}
调用JVMUtil.jstack
打了一些日志出来.
netty
netty 中有一个MemoryAwareThreadPoolExecutor
自己实现了一个NewThreadRunsPolicy
.
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
Thread t = new Thread(r, "Temporary task executor");
t.start();
} catch (Throwable var4) {
throw new RejectedExecutionException("Failed to start a new thread", var4);
}
}
Tomcat
目前我能看到最奇葩的一个实现,当属 Tomcat 同学. 他自己写了个RejectPolicy
出来. 但是相比官方的AbortPolicy
,几乎啥也没干,就是把异常的字符串参数给去掉了.看不懂,不敢看.
官方版本 AbortPolicy
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
Tomcat 版本 ThreadPoolExecutor.RejectHandler
throw new RejectedExecutionException();
总结
其实没啥好总结的, 线程池执行器已经被各种面试题库玩儿到全员滚瓜烂熟了.我主要是来推广Processing
的.详见Processing:Java程序员应该会喜欢的画图工具.
作为一个可视化工具,Java 版本的这东西现在已经基本属于文物了.新的js
之类,能力远远超过了Processing
. 如果真的有可视化需求,还是走主流工具比较好.
但是呢, 作为一个 Java 程序员,如果有什么像了解基础库结构的想法, Processing 的 Java 版本,无缝集成 Java 的能力还是很值得一用的.
本篇内容的图, 运行在Processing3
版本上,内部Java
版本应该是 Java6. 老版本的 Java 安全性方面,没有约束反射对java.util.concurrent
的访问, 可以很方便的窥探内部结构. 新的Processing4
版本,由于 Java 增强了反射的限制,用起来很难受.
哦,对了,ThreadPoolExecutor
还有一个shutdown
方法.调用了以后,就不能再添加新任务了,keepAlive
参数也没什么用了. 下面几个图是调用了shutdown
情况下的.
好了,今天就到这里, 我是老 K. 一个写不出来,只能拿图凑的程序员.
原文始发于微信公众号(K字的研究):线程池执行器执行过程可视化
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/24611.html