通过transmittable-thread-local源码理解线程池线程本地变量传递的原理

前提

最近一两个月花了很大的功夫做UCloud服务和中间件迁移到阿里云的工作,没什么空闲时间撸文。想起很早之前写过ThreadLocal的源码分析相关文章,里面提到了ThreadLocal存在一个不能向预先创建的线程中进行变量传递的局限性,刚好有一位HSBC的技术大牛前同事提到了团队引入了transmittable-thread-local解决了此问题。借着这个契机,顺便clonetransmittable-thread-local源码进行分析,这篇文章会把ThreadLocalInheritableThreadLocal的局限性分析完毕,并且从一些基本原理以及设计模式的运用分析transmittable-thread-local(下文简称为TTL)整套框架的实现。

如果对线程池和ThreadLocal不熟悉的话,可以先参看一下前置文章:

  • 《JUC同步器框架AbstractQueuedSynchronizer源码图文分析》

  • 《JUC线程池ThreadPoolExecutor源码分析》

  • 《ThreadLocal源码分析-黄金分割数的使用》

这篇文章前后花了两周时间编写,行文比价干硬,文字比较多(接近5W字),希望带着耐心阅读。

父子线程的变量传递

Java中没有明确给出一个API可以基于子线程实例获取其父线程实例,有一个相对可行的方案就是在创建子线程Thread实例的时候获取当前线程的实例,用到的APIThread#currentThread()

1public class Thread implements Runnable {
2
3    // 省略其他代码
4
5    @HotSpotIntrinsicCandidate
6    public static native Thread currentThread();
7
8    // 省略其他代码
9}

Thread#currentThread()方法是一个静态本地方法,它是由JVM实现,这是在JDK中唯一可以获取父线程实例的API。一般而言,如果想在子线程实例中得到它的父线程实例,那么需要像如下这样操作:

 1public class InheritableThread {
2
3    public static void main(String[] args) throws Exception{
4        // 父线程就是main线程
5        Thread parentThread = Thread.currentThread();
6        Thread childThread = new Thread(()-> {
7            System.out.println("Parent thread is:" + parentThread.getName());
8        },"childThread");
9        childThread.start();
10        TimeUnit.SECONDS.sleep(Long.MAX_VALUE);
11    }
12}
13// 输出结果:
14Parent thread is:main

类似地,如果我们想把一个父子线程共享的变量实例传递,也可以这样做:

 1public class InheritableVars {
2
3    public static void main(String[] args) throws Exception {
4        // 父线程就是main线程
5        Thread parentThread = Thread.currentThread();
6        final Var var = new Var();
7        var.setValue1("var1");
8        var.setValue2("var2");
9        Thread childThread = new Thread(() -> {
10            System.out.println("Parent thread is:" + parentThread.getName());
11            methodFrame1(var);
12        }, "childThread");
13        childThread.start();
14        TimeUnit.SECONDS.sleep(Long.MAX_VALUE);
15    }
16
17    private static void methodFrame1(Var var) {
18        methodFrame2(var);
19    }
20
21    private static void methodFrame2(Var var) {
22
23    }
24
25    @Data
26    private static class Var {
27
28        private Object value1;
29        private Object value2;
30    }
31}

这种做法其实是可行的,子线程调用的方法栈中的所有方法都必须显示传入需要从父线程传递过来的参数引用Var实例,这样就会产生硬编码问题,既不灵活也导致方法不能复用,所以才衍生出线程本地变量Thread Local,具体的实现有ThreadLocalInheritableThreadLocal。它们两者的基本原理是类似的,实际上所有的变量实例是缓存在线程实例的变量ThreadLocal.ThreadLocalMap中,线程本地变量实例都只是线程实例获取ThreadLocal.ThreadLocalMap的一道桥梁:

 1public class Thread implements Runnable {
2
3    // 省略其他代码
4
5    // KEY为ThreadLocal实例,VALUE为具体的值
6    ThreadLocal.ThreadLocalMap threadLocals = null;
7
8    // KEY为InheritableThreadLocal实例,VALUE为具体的值
9    ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
10
11    // 省略其他代码
12}

ThreadLocalInheritableThreadLocal之间的区别可以结合源码分析一下(见下一小节)。前面的分析听起来如果觉得抽象的话,可以自己写几个类推敲一下,假如线程其实叫ThrowableThread,而线程本地变量叫ThrowableThreadLocal,那么它们之间的关系如下:

 1public class Actor {
2
3    static ThrowableThreadLocal THREAD_LOCAL = new ThrowableThreadLocal();
4
5    public static void main(String[] args) throws Exception {
6        ThrowableThread throwableThread = new ThrowableThread() {
7
8            @Override
9            public void run() {
10                methodFrame1();
11            }
12        };
13        throwableThread.start();
14    }
15
16    private static void methodFrame1() {
17        THREAD_LOCAL.set("throwable");
18        methodFrame2();
19    }
20
21    private static void methodFrame2() {
22        System.out.println(THREAD_LOCAL.get());
23    }
24
25    /**
26     * 这个类暂且认为是java.lang.Thread
27     */

28    private static class ThrowableThread implements Runnable {
29
30        ThrowableThreadLocal.ThrowableThreadLocalMap threadLocalMap;
31
32        @Override
33        public void run() {
34
35        }
36
37        // 这里模拟VM的实现,返回ThrowableThread自身,大家先认为不是返回NULL
38        public static ThrowableThread getCurrentThread() {
39//            return new ThrowableThread();
40            return null;   // <--- 假设这里在VM的实现里面返回的不是NULL而是当前的ThrowableThread
41        }
42
43        public void start() {
44            run();
45        }
46    }
47
48    private static class ThrowableThreadLocal {
49
50        public ThrowableThreadLocal() {
51
52        }
53
54        public void set(Object value) {
55            ThrowableThread currentThread = ThrowableThread.getCurrentThread();
56            assert null != currentThread;
57            ThrowableThreadLocalMap threadLocalMap = currentThread.threadLocalMap;
58            if (null == threadLocalMap) {
59                threadLocalMap = currentThread.threadLocalMap = new ThrowableThreadLocalMap();
60            }
61            threadLocalMap.put(this, value);
62        }
63
64        public Object get() {
65            ThrowableThread currentThread = ThrowableThread.getCurrentThread();
66            assert null != currentThread;
67            ThrowableThreadLocalMap threadLocalMap = currentThread.threadLocalMap;
68            if (null == threadLocalMap) {
69                return null;
70            }
71            return threadLocalMap.get(this);
72        }
73
74        // 这里其实在ThreadLocal中用的是WeakHashMap
75        public static class ThrowableThreadLocalMap extends HashMap<ThrowableThreadLocalObject{
76
77        }
78    }
79}

上面的代码不能运行,只是通过一个自定义的实现说明一下其中的原理和关系。

ThreadLocal和InheritableThreadLocal的局限性

InheritableThreadLocalThreadLocal的子类,它们之间的联系是:两者都是线程Thread实例获取ThreadLocal.ThreadLocalMap的一个中间变量。区别是:两者控制ThreadLocal.ThreadLocalMap创建的时机和通过Thread实例获取ThreadLocal.ThreadLocalMapThread实例中对应的属性并不一样,导致两者的功能有一点差别。通俗来说两者的功能联系和区别是:

  • ThreadLocal:单个线程生命周期强绑定,只能在某个线程的生命周期内对ThreadLocal进行存取,不能跨线程存取。

 1public class ThreadLocalMain {
2
3    private static ThreadLocal<String> TL = new ThreadLocal<>();
4
5    public static void main(String[] args) throws Exception {
6        new Thread(() -> {
7            methodFrame1();
8        }, "childThread").start();
9        TimeUnit.SECONDS.sleep(Long.MAX_VALUE);
10    }
11
12    private static void methodFrame1() {
13        TL.set("throwable");
14        methodFrame2();
15    }
16
17    private static void methodFrame2() {
18        System.out.println(TL.get());
19    }
20}
21// 输出结果:
22throwable
  • InheritableThreadLocal:(1)可以无感知替代ThreadLocal的功能,当成ThreadLocal使用。(2)明确父-子线程关系的前提下,继承(拷贝)父线程的线程本地变量缓存过的变量,而这个拷贝的时机是子线程Thread实例化时候进行的,也就是子线程实例化完毕后已经完成了InheritableThreadLocal变量的拷贝,这是一个变量传递的过程。

 1public class InheritableThreadLocalMain {
2
3    // 此处可以尝试替换为ThreadLocal,最后会输出null
4    static InheritableThreadLocal<String> ITL = new InheritableThreadLocal<>();
5
6    public static void main(String[] args) throws Exception {
7        new Thread(() -> {
8            // 在父线程中设置变量
9            ITL.set("throwable");
10            new Thread(() -> {
11                methodFrame1();
12            }, "childThread").start();
13        }, "parentThread").start();
14        TimeUnit.SECONDS.sleep(Long.MAX_VALUE);
15    }
16
17    private static void methodFrame1() {
18        methodFrame2();
19    }
20
21    private static void methodFrame2() {
22        System.out.println(ITL.get());
23    }
24}
25// 输出结果:
26throwable

上面提到的两点可以具体参看ThreadLocalInheritableThreadLocalThread三个类的源码,这里笔者把一些必要的注释和源码段贴出:

  1// --> java.lang.Thread类的源码片段
2public class Thread implements Runnable {
3
4    // 省略其他代码 
5
6    // 这是Thread最基本的构造函数
7    private Thread(ThreadGroup g, Runnable target, String name,
8                   long stackSize, AccessControlContext acc,
9                   boolean inheritThreadLocals)
 
{
10
11        // 省略其他代码
12
13        Thread parent = currentThread();
14        this.group = g;
15        this.daemon = parent.isDaemon();
16        this.priority = parent.getPriority();
17        if (security == null || isCCLOverridden(parent.getClass()))
18            this.contextClassLoader = parent.getContextClassLoader();
19        else
20            this.contextClassLoader = parent.contextClassLoader;
21        this.inheritedAccessControlContext =
22                acc != null ? acc : AccessController.getContext();
23        this.target = target;
24        setPriority(priority);
25        // inheritThreadLocals一般情况下为true
26        // 当前子线程实例拷贝父线程的inheritableThreadLocals属性,创建一个新的ThreadLocal.ThreadLocalMap实例赋值到自身的inheritableThreadLocals属性
27        if (inheritThreadLocals && parent.inheritableThreadLocals != null)
28            this.inheritableThreadLocals = ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
29        this.stackSize = stackSize;
30        this.tid = nextThreadID();
31    }
32
33    // 省略其他代码
34}
35
36// --> java.lang.ThreadLocal源码片段
37public class ThreadLocal<T{
38
39    // 省略其他代码 
40
41    public void set(T value) {
42        Thread t = Thread.currentThread();
43        // 通过当前线程获取线程实例中的threadLocals
44        ThreadLocalMap map = getMap(t);
45        // 线程实例中的threadLocals为NULL,实例则创建一个ThreadLocal.ThreadLocalMap实例添加当前ThreadLocal->VALUE到ThreadLocalMap中,如果已经存在ThreadLocalMap则进行覆盖对应的Entry
46        if (map != null) {
47            map.set(this, value);
48        } else {
49            createMap(t, value);
50        }
51    }
52
53    // 通过线程实例获取该线程的threadLocals实例,其实是ThreadLocal.ThreadLocalMap类型的属性
54    ThreadLocalMap getMap(Thread t) {
55        return t.threadLocals;
56    }
57
58
59    public T get() {
60        Thread t = Thread.currentThread();
61        // 通过当前线程获取线程实例中的threadLocals,再获取ThreadLocal.ThreadLocalMap中匹配上KEY为当前ThreadLocal实例的Entry对应的VALUE
62        ThreadLocalMap map = getMap(t);
63        if (map != null) {
64            ThreadLocalMap.Entry e = map.getEntry(this);
65            if (e != null) {
66                @SuppressWarnings("unchecked")
67                T result = (T)e.value;
68                return result;
69            }
70        }
71        // 找不到则尝试初始化ThreadLocal.ThreadLocalMap
72        return setInitialValue();
73    }
74
75    // 如果不存在ThreadLocal.ThreadLocalMap,则通过初始化initialValue()方法的返回值,构造一个ThreadLocal.ThreadLocalMap
76    private T setInitialValue() {
77        T value = initialValue();
78        Thread t = Thread.currentThread();
79        ThreadLocalMap map = getMap(t);
80        if (map != null)
81            map.set(this, value);
82        else
83            createMap(t, value);
84        return value;
85    }
86
87    // 省略其他代码 
88}
89
90// --> java.lang.InheritableThreadLocal源码 - 太简单,全量贴出
91public class InheritableThreadLocal<Textends ThreadLocal<T{
92
93    // 这个方法使用在线程Thread的构造函数里面ThreadLocal.createInheritedMap(),基于父线程InheritableThreadLocal的属性创建子线程的InheritableThreadLocal属性,它的返回值决定了拷贝父线程的属性时候传入子线程的值
94    protected T childValue(T parentValue) {
95        return parentValue;
96    }
97
98    // 覆盖获取线程实例中的绑定的ThreadLocalMap为Thread#inheritableThreadLocals,这个方法其实是覆盖了ThreadLocal中对应的方法,应该加@Override注解
99    ThreadLocalMap getMap(Thread t) {
100       return t.inheritableThreadLocals;
101    }
102
103    // 覆盖创建ThreadLocalMap的逻辑,赋值到线程实例中的inheritableThreadLocals,而不是threadLocals,这个方法其实是覆盖了ThreadLocal中对应的方法,应该加@Override注解
104    void createMap(Thread t, T firstValue) {
105        t.inheritableThreadLocals = new ThreadLocalMap(this, firstValue);
106    }
107}

一定要注意,这里的setInitialValue()方法很重要,一个新的线程Thread实例在初始化(对于InheritableThreadLocal而言继承父线程的线程本地变量)或者是首次调用ThreadLocal#set(),会通过此setInitialValue()方法去构造一个全新的ThreadLocal.ThreadLocalMap,会直接使用createMap()方法。

以前面提到的两个例子,贴一个图加深理解:

Example-1

通过transmittable-thread-local源码理解线程池线程本地变量传递的原理

Example-2

通过transmittable-thread-local源码理解线程池线程本地变量传递的原理

ThreadLocalInheritableThreadLocal的最大局限性就是:无法为预先创建好(未投入使用)的线程实例传递变量(准确来说是首次传递某些场景是可行的,而后面由于线程池中的线程是复用的,无法进行更新或者修改变量的传递值),泛线程池Executor体系、TimerTaskForkJoinPool等一般会预先创建(核心)线程,也就它们都是无法在线程池中由预创建的子线程执行的Runnable任务实例中使用。例如下面的方式会导致参数传递失败:

 1public class InheritableThreadForExecutor {
2
3    static final InheritableThreadLocal<String> ITL = new InheritableThreadLocal<>();
4    static final Executor EXECUTOR = Executors.newFixedThreadPool(1);
5
6    public static void main(String[] args) throws Exception {
7        ITL.set("throwable");
8        EXECUTOR.execute(() -> {
9            System.out.println(ITL.get());
10        });
11        ITL.set("doge");
12        EXECUTOR.execute(() -> {
13            System.out.println(ITL.get());
14        });
15        TimeUnit.SECONDS.sleep(Long.MAX_VALUE);
16    }
17}
18// 输出结果:
19throwable
20throwable   # <--- 可见此处参数传递出现异常

首次变量传递成功是因为线程池中的所有子线程都是派生自main线程。

TTL的简单使用

TTL的使用方式在它的项目README.md或者项目中的单元测试有十分详细的介绍,先引入依赖com.alibaba:transmittable-thread-local:2.11.4,这里演示一个例子:

 1// 父-子线程
2public class TtlSample1 {
3
4    static TransmittableThreadLocal<String> TTL = new TransmittableThreadLocal<>();
5
6    public static void main(String[] args) throws Exception {
7        new Thread(() -> {
8            // 在父线程中设置变量
9            TTL.set("throwable");
10            new Thread(TtlRunnable.get(() -> {
11                methodFrame1();
12            }), "childThread").start();
13        }, "parentThread").start();
14        TimeUnit.SECONDS.sleep(Long.MAX_VALUE);
15    }
16
17    private static void methodFrame1() {
18        methodFrame2();
19    }
20
21    private static void methodFrame2() {
22        System.out.println(TTL.get());
23    }
24}
25// 输出:
26throwable
27
28// 线程池
29public class TtlSample2 {
30
31    static TransmittableThreadLocal<String> TTL = new TransmittableThreadLocal<>();
32    static final Executor EXECUTOR = Executors.newFixedThreadPool(1);
33
34    public static void main(String[] args) throws Exception {
35        TTL.set("throwable");
36        EXECUTOR.execute(TtlRunnable.get(() -> {
37            System.out.println(TTL.get());
38        }));
39        TTL.set("doge");
40        EXECUTOR.execute(TtlRunnable.get(() -> {
41            System.out.println(TTL.get());
42        }));
43        TimeUnit.SECONDS.sleep(Long.MAX_VALUE);
44    }
45}
46// 输出:
47throwable
48doge

TTL实现的基本原理

TTL设计上使用了大量的委托(Delegate),委托是C#里面的说法,对标Java的设计模式就是代理模式。举个简单的例子:

 1@Slf4j
2public class StaticDelegate {
3
4    public static void main(String[] args) throws Exception {
5        new RunnableDelegate(() -> log.info("Hello World!")).run();
6    }
7
8    @Slf4j
9    @RequiredArgsConstructor
10    private static final class RunnableDelegate implements Runnable {
11
12        private final Runnable runnable;
13
14        @Override
15        public void run() {
16            try {
17                log.info("Before run...");
18                runnable.run();
19                log.info("After run...");
20            } finally {
21                log.info("Finally run...");
22            }
23        }
24    }
25}
26// 输出结果:
2723:45:27.763 [main] INFO club.throwable.juc.StaticDelegate$RunnableDelegate - Before run...
2823:45:27.766 [main] INFO club.throwable.juc.StaticDelegate - Hello World!
2923:45:27.766 [main] INFO club.throwable.juc.StaticDelegate$RunnableDelegate - After run...
3023:45:27.766 [main] INFO club.throwable.juc.StaticDelegate$RunnableDelegate - Finally run...

委托如果使用纯熟的话,可以做出很多十分有用的功能,例如可以基于Micrometer去统计任务的执行时间,上报到Prometheus,然后用Grafana做监控和展示:

 1// 需要引入io.micrometer:micrometer-core:${version}
2@Slf4j
3public class MeterDelegate {
4
5    public static void main(String[] args) throws Exception {
6        Executor executor = Executors.newFixedThreadPool(1);
7        Runnable task = () -> {
8            try {
9                // 模拟耗时
10                Thread.sleep(1000);
11            } catch (Exception ignore) {
12
13            }
14        };
15        Map<String, String> tags = new HashMap<>(8);
16        tags.put("_class""MeterDelegate");
17        executor.execute(new MicrometerDelegate(task, "test-task", tags));
18        TimeUnit.SECONDS.sleep(Long.MAX_VALUE);
19    }
20
21    @Slf4j
22    @RequiredArgsConstructor
23    private static final class MicrometerDelegate implements Runnable {
24
25        private final Runnable runnable;
26        private final String taskType;
27        private final Map<String, String> tags;
28
29        @Override
30        public void run() {
31            long start = System.currentTimeMillis();
32            try {
33                runnable.run();
34            } finally {
35                long end = System.currentTimeMillis();
36                List<Tag> tagsList = Lists.newArrayList();
37                Optional.ofNullable(tags).ifPresent(x -> x.forEach((k, v) -> {
38                    tagsList.add(Tag.of(k, v));
39                }));
40                Metrics.summary(taskType, tagsList).record(end - start);
41            }
42        }
43    }
44}

委托理论上只要不线程栈溢出,可以无限层级地包装,有点像洋葱的结构,原始的目标方法会被包裹在最里面并且最后执行:

 1    public static void main(String[] args) throws Exception {
2        Runnable target = () -> log.info("target");
3        Delegate level1 = new Delegate(target);
4        Delegate level2 = new Delegate(level1);
5        Delegate level3 = new Delegate(level2);
6        // ......
7    }
8
9    @RequiredArgsConstructor
10    static class Delegate implements Runnable{
11
12        private final Runnable runnable;
13
14        @Override
15        public void run() {
16            runnable.run();
17        }
18    }

当然,委托的层级越多,代码结构就会越复杂,不利于理解和维护。多层级委托这个洋葱结构,再配合Java反射API剥离对具体方法调用的依赖,就是Java中切面编程的普遍原理,Spring-aop就是这样实现的。委托如果再结合Agent和字节码增强(使用ASMJavassist等),可以实现类加载时期替换对应的RunnableCallable或者一般接口的实现,这样就能无感知完成了增强功能。此外,TTL中还使用了模板方法模式,如:

 1@Slf4j
2public class TemplateMethod {
3
4    public static void main(String[] args) throws Exception {
5        Runnable runnable = () -> log.info("Hello World!");
6        Template template = new Template(runnable) {
7            @Override
8            protected void beforeExecute() {
9                log.info("BeforeExecute...");
10            }
11
12            @Override
13            protected void afterExecute() {
14                log.info("AfterExecute...");
15            }
16        };
17        template.run();
18    }
19
20    @RequiredArgsConstructor
21    static abstract class Template implements Runnable {
22
23        private final Runnable runnable;
24
25        protected void beforeExecute() {
26
27        }
28
29        @Override
30        public void run() {
31            beforeExecute();
32            runnable.run();
33            afterExecute();
34        }
35
36        protected void afterExecute() {
37
38        }
39    }
40}
41// 输出结果:
4200:25:32.862 [main] INFO club.throwable.juc.TemplateMethod - BeforeExecute...
4300:25:32.865 [main] INFO club.throwable.juc.TemplateMethod - Hello World!
4400:25:32.865 [main] INFO club.throwable.juc.TemplateMethod - AfterExecute...

分析了两种设计模式,下面简单理解一下TTL实现的伪代码:

1# TTL extends InheritableThreadLocal
2# Holder of TTL -> InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>> [? => NULL]
31)创建一个全局的Holder,用于保存父线程(或者明确了父线程的子线程)的TTL对象,这里注意,是TTL对象,Holder是当作Set使用
42)(父)线程A中使用了TTL,则所有设置的变量会被TTL捕获
53)(子)线程B使用了TtlRunnable(Runnable的TTL实现,使用了前面提到的委托,像Callable的实现是TtlCallable),会重放所有存储在TTL中的,来自于线程A的存储变量
64)线程B重放完毕后,清理线程B独立产生的ThreadLocal变量,归还变TTL的变量

主要就是这几步,里面的话术有点抽象,后面一节分析源码的时候会详细讲解。

TTL的源码分析

主要分析:

  • 框架的骨架。

  • 核心类TransmittableThreadLocal

  • 发射器Transmitter

  • 捕获、重放和复原。

  • Agent模块。

TTL框架骨架

TTL是一个十分精悍的框架,它依赖少量的类实现了比较强大的功能,除了提供给用户使用的API,还提供了基于Agent和字节码增强实现了无感知增强泛线程池对应类的功能,这一点是比较惊艳的。这里先分析编程式的API,再简单分析Agent部分的实现。笔者阅读TTL框架的时间是2020年五一劳动节前后,当前的最新发行版本为2.11.4TTL的项目结构很简单:

1- transmittable-thread-local
2  - com.alibaba.ttl
3   - spi   SPI接口和一些实现
4   - threadpool   线程池增强,包括ThreadFactory和线程池的Wrapper等
5     - agent   线程池的Agent实现相关
6   最外层的包有一些Wrapper的实现和TTL

先看spi包:

1- spi
2  TtlAttachments
3  TtlAttachmentsDelegate
4  TtlEnhanced
5  TtlWrapper
通过transmittable-thread-local源码理解线程池线程本地变量传递的原理

TtlEnhancedTTL的标识接口(空接口),标识具体的组件被TTL增强:

1public interface TtlEnhanced {
2
3}

通过instanceof关键字就可以判断具体的实现是否TTL增强过的组件。TtlWrapper接口继承自接口TtlEnhanced,用于标记实现类可以解包装获得原始实例:

1public interface TtlWrapper<Textends TtlEnhanced {
2
3    // 返回解包装实例,实际是就是原始实例
4    @NonNull
5    unwrap();
6}

TtlAttachments接口也是继承自接口TtlEnhanced,用于为TTL添加K-V结构的附件,TtlAttachmentsDelegate是其实现类,K-V的存储实际上是委托给ConcurrentHashMap

 1public interface TtlAttachments extends TtlEnhanced {
2
3    // 添加K-V附件
4    void setTtlAttachment(@NonNull String key, Object value);
5
6    // 通过KEY获取值
7    <T> getTtlAttachment(@NonNull String key);
8
9    // 标识自动包装的KEY,Agent模式会使用自动包装,这个时候会传入一个附件的K-V,其中KEY就是KEY_IS_AUTO_WRAPPER
10    String KEY_IS_AUTO_WRAPPER = "ttl.is.auto.wrapper";
11}
12
13// TtlAttachmentsDelegate
14public class TtlAttachmentsDelegate implements TtlAttachments {
15
16    private final ConcurrentMap<String, Object> attachments = new ConcurrentHashMap<String, Object>();
17
18    @Override
19    public void setTtlAttachment(@NonNull String key, Object value) {
20        attachments.put(key, value);
21    }
22
23    @Override
24    @SuppressWarnings("unchecked")
25    public <T> getTtlAttachment(@NonNull String key) {
26        return (T) attachments.get(key);
27    }
28}

因为TTL的实现覆盖了泛线程池ExecutorExecutorServiceScheduledExecutorServiceForkJoinPoolTimerTask(在TTL中组件已经标记为过期,推荐使用ScheduledExecutorService),范围比较广,短篇幅无法分析所有的源码,而且它们的实现思路是基本一致的,笔者下文只会挑选Executor的实现路线进行分析。

通过transmittable-thread-local源码理解线程池线程本地变量传递的原理

核心类TransmittableThreadLocal

TransmittableThreadLocalTTL的核心类,TTL框架就是用这个类来命名的。先看它的构造函数和关键属性:

 1// 函数式接口,TTL拷贝器
2@FunctionalInterface
3public interface TtlCopier<T{
4
5    // 拷贝父属性
6    copy(T parentValue);
7}
8
9public class TransmittableThreadLocal<Textends InheritableThreadLocal<Timplements TtlCopier<T{
10
11    // 日志句柄,使用的不是SLF4J的接口,而是java.util.logging的实现
12    private static final Logger logger = Logger.getLogger(TransmittableThreadLocal.class.getName());
13
14    // 是否禁用忽略NULL值的语义
15    private final boolean disableIgnoreNullValueSemantics;
16
17    // 默认是false,也就是不禁用忽略NULL值的语义,也就是忽略NULL值,也就是默认的话,NULL值传入不会覆盖原来已经存在的值
18    public TransmittableThreadLocal() {
19        this(false);
20    }
21
22    // 可以通过手动设置,去覆盖IgnoreNullValue的语义,如果设置为true,则是支持NULL值的设置,设置为true的时候,与ThreadLocal的语义一致
23    public TransmittableThreadLocal(boolean disableIgnoreNullValueSemantics) {
24        this.disableIgnoreNullValueSemantics = disableIgnoreNullValueSemantics;
25    }
26
27    // 先忽略其他代码
28}

disableIgnoreNullValueSemantics属性相关可以查看Issue157,下文分析方法的时候也会说明具体的场景。TransmittableThreadLocal继承自InheritableThreadLocal,本质就是ThreadLocal,那它到底怎么样保证变量可以在线程池中的线程传递?接着分析其他所有方法:

  1public class TransmittableThreadLocal<Textends InheritableThreadLocal<Timplements TtlCopier<T{
2
3    // 拷贝器的拷贝方法实现
4    public T copy(T parentValue) {
5        return parentValue;
6    }
7
8    // 模板方法,留给子类实现,在TtlRunnable或者TtlCallable执行前回调
9    protected void beforeExecute() {
10    }
11
12    // 模板方法,留给子类实现,在TtlRunnable或者TtlCallable执行后回调
13    protected void afterExecute() {
14    }
15
16    // 获取值,直接从InheritableThreadLocal#get()获取
17    @Override
18    public final T get() {
19        T value = super.get();
20        // 如果值不为NULL 或者 禁用了忽略空值的语义(也就是和ThreadLocal语义一致),则重新添加TTL实例自身到存储器
21        if (disableIgnoreNullValueSemantics || null != value) addThisToHolder();
22        return value;
23    }
24
25    @Override
26    public final void set(T value) {
27        // 如果不禁用忽略空值的语义,也就是需要忽略空值,并且设置的入参值为空,则做一次彻底的移除,包括从存储器移除TTL自身实例,TTL(ThrealLocalMap)中也移除对应的值
28        if (!disableIgnoreNullValueSemantics && null == value) {
29            // may set null to remove value
30            remove();
31        } else {
32            // TTL(ThrealLocalMap)中设置对应的值
33            super.set(value);
34            // 添加TTL实例自身到存储器
35            addThisToHolder();
36        }
37    }
38
39    // 从存储器移除TTL自身实例,从TTL(ThrealLocalMap)中移除对应的值
40    @Override
41    public final void remove() {
42        removeThisFromHolder();
43        super.remove();
44    }
45
46    // 从TTL(ThrealLocalMap)中移除对应的值
47    private void superRemove() {
48        super.remove();
49    }
50
51    // 拷贝值,主要是拷贝get()的返回值
52    private T copyValue() {
53        return copy(get());
54    }
55
56    // 存储器,本身就是一个InheritableThreadLocal(ThreadLocal)
57    // 它的存放对象是WeakHashMap<TransmittableThreadLocal<Object>, ?>类型,而WeakHashMap的VALUE总是为NULL,这里当做Set容器使用,WeakHashMap支持NULL值
58    private static InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>> holder =
59            new InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>>() {
60                @Override
61                protected WeakHashMap<TransmittableThreadLocal<Object>, ?> initialValue() {
62                    return new WeakHashMap<TransmittableThreadLocal<Object>, Object>();
63                }
64
65                @Override
66                protected WeakHashMap<TransmittableThreadLocal<Object>, ?> childValue(WeakHashMap<TransmittableThreadLocal<Object>, ?> parentValue) {
67                    // 注意这里的WeakHashMap总是拷贝父线程的值
68                    return new WeakHashMap<TransmittableThreadLocal<Object>, Object>(parentValue);
69                }
70            };
71
72    // 添加TTL自身实例到存储器,不存在则添加策略
73    @SuppressWarnings("unchecked")
74    private void addThisToHolder() {
75        if (!holder.get().containsKey(this)) {
76            holder.get().put((TransmittableThreadLocal<Object>) thisnull); // WeakHashMap supports null value.
77        }
78    }
79
80    // 从存储器移除TTL自身的实例
81    private void removeThisFromHolder() {
82        holder.get().remove(this);
83    }
84
85    // 执行目标方法,isBefore决定回调beforeExecute还是afterExecute,注意此回调方法会吞掉所有的异常只打印日志
86    private static void doExecuteCallback(boolean isBefore) {
87        for (TransmittableThreadLocal<Object> threadLocal : holder.get().keySet()) {
88            try {
89                if (isBefore) threadLocal.beforeExecute();
90                else threadLocal.afterExecute();
91            } catch (Throwable t) {
92                if (logger.isLoggable(Level.WARNING)) {
93                    logger.log(Level.WARNING, "TTL exception when " + (isBefore ? "beforeExecute" : "afterExecute") + ", cause: " + t.toString(), t);
94                }
95            }
96        }
97    }
98
99    // DEBUG模式下打印TTL里面的所有值
100    static void dump(@Nullable String title) {
101        if (title != null && title.length() > 0) {
102            System.out.printf("Start TransmittableThreadLocal[%s] Dump...%n", title);
103        } else {
104            System.out.println("Start TransmittableThreadLocal Dump...");
105        }
106
107        for (TransmittableThreadLocal<Object> threadLocal : holder.get().keySet()) {
108            System.out.println(threadLocal.get());
109        }
110        System.out.println("TransmittableThreadLocal Dump end!");
111    }
112
113    // DEBUG模式下打印TTL里面的所有值
114    static void dump() {
115        dump(null);
116    }
117
118    // 省略静态类Transmitter的实现代码
119}

这里一定要记住holder是全局静态的,并且它自身也是一个InheritableThreadLocalget()方法也是线程隔离的),它实际上就是线程管理所有TransmittableThreadLocal的桥梁。这里可以考虑一个单线程的例子来说明TransmittableThreadLocal的存储架构:

 1public class TtlSample3 {
2
3    static TransmittableThreadLocal<String> TTL1 = new TransmittableThreadLocal<>();
4    static TransmittableThreadLocal<String> TTL2 = new TransmittableThreadLocal<>();
5    static TransmittableThreadLocal<String> TTL3 = new TransmittableThreadLocal<>();
6
7    public static void main(String[] args) throws Exception {
8        TTL1.set("VALUE-1");
9        TTL2.set("VALUE-2");
10        TTL3.set("VALUE-3");
11    }
12}

这里简化了例子,只演示了单线程的场景,图中的一些对象的哈希码有可能每次启动JVM实例都不一样,这里只是做示例:

通过transmittable-thread-local源码理解线程池线程本地变量传递的原理

注释里面也提到,holder里面的WeakHashMap是当成Set容器使用,映射的值都是NULL,每次遍历它的所有KEY就能获取holder里面的所有的TransmittableThreadLocal实例,它是一个全局的存储器,但是本身是一个InheritableThreadLocal,多线程共享后的映射关系会相对复杂:

通过transmittable-thread-local源码理解线程池线程本地变量传递的原理

再聊一下disableIgnoreNullValueSemantics的作用,默认情况下disableIgnoreNullValueSemantics=falseTTL如果设置NULL值,会直接从holder移除对应的TTL实例,在TTL#get()方法被调用的时候,如果原来持有的属性不为NULL,该TTL实例会重新加到holder。如果设置disableIgnoreNullValueSemantics=true,则set(null)的语义和ThreadLocal一致。见下面的例子:

 1public class TtlSample4 {
2
3    static TransmittableThreadLocal<Integer> TL1 = new TransmittableThreadLocal<Integer>(false) {
4        @Override
5        protected Integer initialValue() {
6            return 5;
7        }
8
9        @Override
10        protected Integer childValue(Integer parentValue) {
11            return 10;
12        }
13    };
14
15    static TransmittableThreadLocal<Integer> TL2 = new TransmittableThreadLocal<Integer>(true) {
16        @Override
17        protected Integer initialValue() {
18            return 5;
19        }
20
21        @Override
22        protected Integer childValue(Integer parentValue) {
23            return 10;
24        }
25    };
26
27    public static void main(String[] args) throws Exception {
28        TL1.set(null);
29        TL2.set(null);
30        Thread t1 = new Thread(TtlRunnable.get(() -> {
31            System.out.println(String.format("Thread:%s,value:%s", Thread.currentThread().getName(), TL1.get()));
32        }), "T1");
33
34        Thread t2 = new Thread(TtlRunnable.get(() -> {
35            System.out.println(String.format("Thread:%s,value:%s", Thread.currentThread().getName(), TL2.get()));
36        }), "T2");
37        t1.start();
38        t2.start();
39        TimeUnit.SECONDS.sleep(Long.MAX_VALUE);
40    }
41}
42// 输出结果:
43Thread:T2,value:null
44Thread:T1,value:5

这是因为框架的设计者不想把NULL作为有状态的值,如果真的有需要保持和ThreadLocal一致的用法,可以在构造TransmittableThreadLocal实例的时候传入true

发射器Transmitter

发射器TransmitterTransmittableThreadLocal的一个公有静态类,它的核心功能是传输所有的TransmittableThreadLocal实例和提供静态方法注册当前线程的变量到其他线程。按照笔者阅读源码的习惯,先看构造函数和关键属性:

 1// # TransmittableThreadLocal#Transmitter
2public static class Transmitter {
3
4    // 保存手动注册的ThreadLocal->TtlCopier映射,这里是因为部分API提供了TtlCopier给用户实现
5    private static volatile WeakHashMap<ThreadLocal<Object>, TtlCopier<Object>> threadLocalHolder = new WeakHashMap<ThreadLocal<Object>, TtlCopier<Object>>();
6    // threadLocalHolder更变时候的监视器
7    private static final Object threadLocalHolderUpdateLock = new Object();
8    // 标记WeakHashMap中的ThreadLocal的对应值为NULL的属性,便于后面清理
9    private static final Object threadLocalClearMark = new Object();
10
11    // 默认的拷贝器,影子拷贝,直接返回父值
12    private static final TtlCopier<Object> shadowCopier = new TtlCopier<Object>() {
13        @Override
14        public Object copy(Object parentValue) {
15            return parentValue;
16        }
17    };
18
19    // 私有构造,说明只能通过静态方法提供外部调用
20    private Transmitter() {
21        throw new InstantiationError("Must not instantiate this class");
22    }
23
24    // 私有静态类,快照,保存从holder中捕获的所有TransmittableThreadLocal和外部手动注册保存在threadLocalHolder的ThreadLocal的K-V映射快照
25    private static class Snapshot {
26        final WeakHashMap<TransmittableThreadLocal<Object>, Object> ttl2Value;
27        final WeakHashMap<ThreadLocal<Object>, Object> threadLocal2Value;
28
29        private Snapshot(WeakHashMap<TransmittableThreadLocal<Object>, Object> ttl2Value, WeakHashMap<ThreadLocal<Object>, Object> threadLocal2Value) {
30            this.ttl2Value = ttl2Value;
31            this.threadLocal2Value = threadLocal2Value;
32        }
33    }
34}

Transmitter在设计上是一个典型的工具类,外部只能调用其公有静态方法。接着看其他静态方法:

  1// # TransmittableThreadLocal#Transmitter
2public static class Transmitter {
3
4    //######################################### 捕获 ###########################################################
5
6    // 捕获当前线程绑定的所有的TransmittableThreadLocal和已经注册的ThreadLocal的值 - 使用了用时拷贝快照的策略
7    // 笔者注:它一般在构造任务实例的时候被调用,因此当前线程相对于子线程或者线程池的任务就是父线程,其实本质是捕获父线程的所有线程本地变量的值
8    @NonNull
9    public static Object capture() {
10        return new Snapshot(captureTtlValues(), captureThreadLocalValues());
11    }
12
13    // 新建一个WeakHashMap,遍历TransmittableThreadLocal#holder中的所有TransmittableThreadLocal的Entry,获取K-V,存放到这个新的WeakHashMap返回
14    private static WeakHashMap<TransmittableThreadLocal<Object>, Object> captureTtlValues() {
15        WeakHashMap<TransmittableThreadLocal<Object>, Object> ttl2Value = new WeakHashMap<TransmittableThreadLocal<Object>, Object>();
16        for (TransmittableThreadLocal<Object> threadLocal : holder.get().keySet()) {
17            ttl2Value.put(threadLocal, threadLocal.copyValue());
18        }
19        return ttl2Value;
20    }
21
22    // 新建一个WeakHashMap,遍历threadLocalHolder中的所有ThreadLocal的Entry,获取K-V,存放到这个新的WeakHashMap返回
23    private static WeakHashMap<ThreadLocal<Object>, Object> captureThreadLocalValues() {
24        final WeakHashMap<ThreadLocal<Object>, Object> threadLocal2Value = new WeakHashMap<ThreadLocal<Object>, Object>();
25        for (Map.Entry<ThreadLocal<Object>, TtlCopier<Object>> entry : threadLocalHolder.entrySet()) {
26            final ThreadLocal<Object> threadLocal = entry.getKey();
27            final TtlCopier<Object> copier = entry.getValue();
28            threadLocal2Value.put(threadLocal, copier.copy(threadLocal.get()));
29        }
30        return threadLocal2Value;
31    }
32
33    //######################################### 重放 ###########################################################
34
35    // 重放capture()方法中捕获的TransmittableThreadLocal和手动注册的ThreadLocal中的值,本质是重新拷贝holder中的所有变量,生成新的快照
36    // 笔者注:重放操作一般会在子线程或者线程池中的线程的任务执行的时候调用,因此此时的holder#get()拿到的是子线程的原来就存在的本地线程变量,重放操作就是把这些子线程原有的本地线程变量备份
37    @NonNull
38    public static Object replay(@NonNull Object captured) {
39        final Snapshot capturedSnapshot = (Snapshot) captured;
40        return new Snapshot(replayTtlValues(capturedSnapshot.ttl2Value), replayThreadLocalValues(capturedSnapshot.threadLocal2Value));
41    }
42
43    // 重放所有的TTL的值
44    @NonNull
45    private static WeakHashMap<TransmittableThreadLocal<Object>, Object> replayTtlValues(@NonNull WeakHashMap<TransmittableThreadLocal<Object>, Object> captured) {
46        // 新建一个新的备份WeakHashMap,其实也是一个快照
47        WeakHashMap<TransmittableThreadLocal<Object>, Object> backup = new WeakHashMap<TransmittableThreadLocal<Object>, Object>();
48        // 这里的循环针对的是子线程,用于获取的是子线程的所有线程本地变量
49        for (final Iterator<TransmittableThreadLocal<Object>> iterator = holder.get().keySet().iterator(); iterator.hasNext(); ) {
50            TransmittableThreadLocal<Object> threadLocal = iterator.next();
51
52            // 拷贝holder当前线程(子线程)绑定的所有TransmittableThreadLocal的K-V结构到备份中
53            backup.put(threadLocal, threadLocal.get());
54
55            // 清理所有的非捕获快照中的TTL变量,以防有中间过程引入的额外的TTL变量(除了父线程的本地变量)影响了任务执行后的重放操作
56            // 简单来说就是:移除所有子线程的不包含在父线程捕获的线程本地变量集合的中所有子线程本地变量和对应的值
57            /**
58             * 这个问题可以举个简单的例子:
59             * static TransmittableThreadLocal<Integer> TTL = new TransmittableThreadLocal<>();
60             * 
61             * 线程池中的子线程C中原来初始化的时候,在线程C中绑定了TTL的值为10087,C线程是核心线程不会主动销毁。
62             * 
63             * 父线程P在没有设置TTL值的前提下,调用了线程C去执行任务,那么在C线程的Runnable包装类中通过TTL#get()就会获取到10087,显然是不符合预期的
64             *
65             * 所以,在C线程的Runnable包装类之前之前,要从C线程的线程本地变量,移除掉不包含在父线程P中的所有线程本地变量,确保Runnable包装类执行期间只能拿到父线程中捕获到的线程本地变量
66             *
67             * 下面这个判断和移除做的就是这个工作
68             */

69            if (!captured.containsKey(threadLocal)) {
70                iterator.remove();
71                threadLocal.superRemove();
72            }
73        }
74
75        // 重新设置TTL的值到捕获的快照中
76        // 其实真实的意图是:把从父线程中捕获的所有线程本地变量重写设置到TTL中,本质上,子线程holder里面的TTL绑定的值会被刷新
77        setTtlValuesTo(captured);
78
79        // 回调模板方法beforeExecute
80        doExecuteCallback(true);
81
82        return backup;
83    }
84
85    // 提取WeakHashMap中的KeySet,遍历所有的TransmittableThreadLocal,重新设置VALUE
86    private static void setTtlValuesTo(@NonNull WeakHashMap<TransmittableThreadLocal<Object>, Object> ttlValues) {
87        for (Map.Entry<TransmittableThreadLocal<Object>, Object> entry : ttlValues.entrySet()) {
88            TransmittableThreadLocal<Object> threadLocal = entry.getKey();
89            // 重新设置TTL值,本质上,当前线程(子线程)holder里面的TTL绑定的值会被刷新
90            threadLocal.set(entry.getValue());
91        }
92    }
93
94    // 重放所有的手动注册的ThreadLocal的值
95    private static WeakHashMap<ThreadLocal<Object>, Object> replayThreadLocalValues(@NonNull WeakHashMap<ThreadLocal<Object>, Object> captured) {
96        // 新建备份
97        final WeakHashMap<ThreadLocal<Object>, Object> backup = new WeakHashMap<ThreadLocal<Object>, Object>();
98        // 注意这里是遍历捕获的快照中的ThreadLocal
99        for (Map.Entry<ThreadLocal<Object>, Object> entry : captured.entrySet()) {
100            final ThreadLocal<Object> threadLocal = entry.getKey();
101            // 添加到备份中
102            backup.put(threadLocal, threadLocal.get());
103            final Object value = entry.getValue();
104            // 如果值为清除标记则绑定在当前线程的变量进行remove,否则设置值覆盖
105            if (value == threadLocalClearMark) threadLocal.remove();
106            else threadLocal.set(value);
107        }
108        return backup;
109    }
110
111    // 从relay()或者clear()方法中恢复TransmittableThreadLocal和手工注册的ThreadLocal的值对应的备份
112    // 笔者注:恢复操作一般会在子线程或者线程池中的线程的任务执行的时候调用
113    public static void restore(@NonNull Object backup) {
114        final Snapshot backupSnapshot = (Snapshot) backup;
115        restoreTtlValues(backupSnapshot.ttl2Value);
116        restoreThreadLocalValues(backupSnapshot.threadLocal2Value);
117    }
118
119    private static void restoreTtlValues(@NonNull WeakHashMap<TransmittableThreadLocal<Object>, Object> backup) {
120        // 回调模板方法afterExecute
121        doExecuteCallback(false);
122        // 这里的循环针对的是子线程,用于获取的是子线程的所有线程本地变量
123        for (final Iterator<TransmittableThreadLocal<Object>> iterator = holder.get().keySet().iterator(); iterator.hasNext(); ) {
124            TransmittableThreadLocal<Object> threadLocal = iterator.next();
125            // 如果子线程原来就绑定的线程本地变量的值,如果不包含某个父线程传来的对象,那么就删除
126            // 这一步可以结合前面reply操作里面的方法段一起思考,如果不删除的话,就相当于子线程的原来存在的线程本地变量绑定值被父线程对应的值污染了
127            if (!backup.containsKey(threadLocal)) {
128                iterator.remove();
129                threadLocal.superRemove();
130            }
131        }
132
133        // 重新设置TTL的值到捕获的快照中
134        // 其实真实的意图是:把子线程的线程本地变量恢复到reply()的备份(前面的循环已经做了父线程捕获变量的判断),本质上,等于把holder中绑定于子线程本地变量的部分恢复到reply操作之前的状态
135        setTtlValuesTo(backup);
136    }
137
138    // 恢复所有的手动注册的ThreadLocal的值
139    private static void restoreThreadLocalValues(@NonNull WeakHashMap<ThreadLocal<Object>, Object> backup) {
140        for (Map.Entry<ThreadLocal<Object>, Object> entry : backup.entrySet()) {
141            final ThreadLocal<Object> threadLocal = entry.getKey();
142            threadLocal.set(entry.getValue());
143        }
144    }
145}   

这里三个核心方法,看起来比较抽象,要结合多线程的场景和一些空间想象进行推敲才能比较容易地理解:

  • capture():捕获操作,父线程原来就存在的线程本地变量映射和手动注册的线程本地变量映射捕获,得到捕获的快照值captured

  • reply():重放操作,子线程原来就存在的线程本地变量映射和手动注册的线程本地变量生成备份backup,刷新captured的所有值到子线程在全局存储器holder中绑定的值。

  • restore():复原操作,子线程原来就存在的线程本地变量映射和手动注册的线程本地变量恢复成backup

setTtlValuesTo()这个方法比较隐蔽,要特别要结合多线程和空间思维去思考,例如当入参是captured,本质是从父线程捕获到的绑定在父线程的所有线程本地变量,调用的时机在reply()restore(),这两个方法只会在子线程中调用,setTtlValuesTo()里面拿到的TransmittableThreadLocal实例调用set()方法相当于把绑定在父线程的所有线程本地变量的值全部刷新到子线程当前绑定的TTL中的线程本地变量的值,更深层次地想,是基于外部的传入值刷新了子线程绑定在全局存储器holder里面绑定到该子线程的线程本地变量的值。

通过transmittable-thread-local源码理解线程池线程本地变量传递的原理

Transmitter还有不少静态工具方法,这里不做展开,可以参考项目里面的测试demoREADME.md进行调试。

捕获、重放和复原

其实上面一节已经介绍了Transmitter提供的捕获、重放和复原的API,这一节主要结合分析TtlRunnable中的相关逻辑。TtlRunnable的源码如下:

 1public final class TtlRunnable implements RunnableTtlWrapper<Runnable>, TtlEnhancedTtlAttachments {
2
3    // 存放从父线程捕获得到的线程本地变量映射的备份
4    private final AtomicReference<Object> capturedRef;
5    // 原始的Runable实例
6    private final Runnable runnable;
7    // 执行之后是否释放TTL值引用
8    private final boolean releaseTtlValueReferenceAfterRun;
9
10    private TtlRunnable(@NonNull Runnable runnable, boolean releaseTtlValueReferenceAfterRun) {
11        // 这里关键点:TtlRunnable实例化的时候就已经进行了线程本地变量的捕获,所以一定是针对父线程的,因为此时任务还没提交到线程池
12        this.capturedRef = new AtomicReference<Object>(capture());
13        this.runnable = runnable;
14        this.releaseTtlValueReferenceAfterRun = releaseTtlValueReferenceAfterRun;
15    }
16
17    @Override
18    public void run() {
19        // 获取父线程捕获到的线程本地变量映射的备份,做一些前置判断
20        Object captured = capturedRef.get();
21        if (captured == null || releaseTtlValueReferenceAfterRun && !capturedRef.compareAndSet(captured, null)) {
22            throw new IllegalStateException("TTL value reference is released after run!");
23        }
24        // 重放操作
25        Object backup = replay(captured);
26        try {
27            // 真正的Runnable调用
28            runnable.run();
29        } finally {
30            // 复原操作
31            restore(backup);
32        }
33    }
34
35    @Nullable
36    public static TtlRunnable get(@Nullable Runnable runnable) {
37        return get(runnable, falsefalse);
38    }
39
40    @Nullable
41    public static TtlRunnable get(@Nullable Runnable runnable, boolean releaseTtlValueReferenceAfterRun, boolean idempotent) {
42        if (null == runnable) return null;
43        if (runnable instanceof TtlEnhanced) {
44            // avoid redundant decoration, and ensure idempotency
45            if (idempotent) return (TtlRunnable) runnable;
46            else throw new IllegalStateException("Already TtlRunnable!");
47        }
48        return new TtlRunnable(runnable, releaseTtlValueReferenceAfterRun);
49    }
50
51    // 省略其他不太重要的方法
52}

其实关注点只需要放在构造函数、run()方法,其他都是基于此做修饰或者扩展。构造函数的源码说明,capture()TtlRunnable实例化的时候已经被调用,实例化它的一般就是父线程,所以整体的执行流程如下:

通过transmittable-thread-local源码理解线程池线程本地变量传递的原理

Agent模块

启用Agent功能,需要在Java的启动参数添加:-javaagent:path/to/transmittable-thread-local-x.yzx.jar。原理是通过Instrumentation回调激发ClassFileTransformer实现目标类的字节码增强,使用到javassist,被增强的类主要是泛线程池的类:

  • Executor体系:主要包括ThreadPoolExecutorScheduledThreadPoolExecutor,对应的字节码增强类实现是TtlExecutorTransformlet

  • ForkJoinPool:对应的字节码增强类实现是TtlForkJoinTransformlet

  • TimerTask:对应的字节码增强类实现是TtlTimerTaskTransformlet

Agent的入口类是TtlAgent,这里查看对应的源码:

 1public final class TtlAgent {
2
3    public static void premain(String agentArgs, @NonNull Instrumentation inst) {
4        kvs = splitCommaColonStringToKV(agentArgs);
5
6        Logger.setLoggerImplType(getLogImplTypeFromAgentArgs(kvs));
7        final Logger logger = Logger.getLogger(TtlAgent.class);
8
9        try {
10            logger.info("[TtlAgent.premain] begin, agentArgs: " + agentArgs + ", Instrumentation: " + inst);
11            final boolean disableInheritableForThreadPool = isDisableInheritableForThreadPool();
12            // 装载所有的JavassistTransformlet
13            final List<JavassistTransformlet> transformletList = new ArrayList<JavassistTransformlet>();
14            transformletList.add(new TtlExecutorTransformlet(disableInheritableForThreadPool));
15            transformletList.add(new TtlForkJoinTransformlet(disableInheritableForThreadPool));
16            if (isEnableTimerTask()) transformletList.add(new TtlTimerTaskTransformlet());
17            final ClassFileTransformer transformer = new TtlTransformer(transformletList);
18            inst.addTransformer(transformer, true);
19            logger.info("[TtlAgent.premain] addTransformer " + transformer.getClass() + " success");
20            logger.info("[TtlAgent.premain] end");
21            ttlAgentLoaded = true;
22        } catch (Exception e) {
23            String msg = "Fail to load TtlAgent , cause: " + e.toString();
24            logger.log(Level.SEVERE, msg, e);
25            throw new IllegalStateException(msg, e);
26        }
27    }
28}

List<JavassistTransformlet>作为参数传入ClassFileTransformer的实现类TtlTransformer中,其中的转换方法为:

 1public class TtlTransformer implements ClassFileTransformer {
2
3    private final List<JavassistTransformlet> transformletList = new ArrayList<JavassistTransformlet>();
4
5    TtlTransformer(List<? extends JavassistTransformlet> transformletList) {
6        for (JavassistTransformlet transformlet : transformletList) {
7            this.transformletList.add(transformlet);
8            logger.info("[TtlTransformer] add Transformlet " + transformlet.getClass() + " success");
9        }
10    }
11
12    @Override
13    public final byte[] transform(@Nullable final ClassLoader loader, @Nullable final String classFile, final Class<?> classBeingRedefined,
14                                  final ProtectionDomain protectionDomain, @NonNull final byte[] classFileBuffer) {
15        try {
16            // Lambda has no class file, no need to transform, just return.
17            if (classFile == nullreturn NO_TRANSFORM;
18            final String className = toClassName(classFile);
19            ClassInfo classInfo = new ClassInfo(className, classFileBuffer, loader);
20            // 这里做变量,如果字节码被修改,则跳出循环返回
21            for (JavassistTransformlet transformlet : transformletList) {
22                transformlet.doTransform(classInfo);
23                if (classInfo.isModified()) return classInfo.getCtClass().toBytecode();
24            }
25        } catch (Throwable t) {
26            String msg = "Fail to transform class " + classFile + ", cause: " + t.toString();
27            logger.log(Level.SEVERE, msg, t);
28            throw new IllegalStateException(msg, t);
29        }
30        return NO_TRANSFORM;
31    }
32}

这里挑选TtlExecutorTransformlet的部分方法来看:

 1    @Override
2    public void doTransform(@NonNull final ClassInfo classInfo) throws IOException, NotFoundException, CannotCompileException {
3        // 如果当前加载的类包含java.util.concurrent.ThreadPoolExecutor或者java.util.concurrent.ScheduledThreadPoolExecutor
4        if (EXECUTOR_CLASS_NAMES.contains(classInfo.getClassName())) {
5            final CtClass clazz = classInfo.getCtClass();
6            // 遍历所有的方法进行增强
7            for (CtMethod method : clazz.getDeclaredMethods()) {
8                updateSubmitMethodsOfExecutorClass_decorateToTtlWrapperAndSetAutoWrapperAttachment(method);
9            }
10            // 省略其他代码
11        } 
12        // 省略其他代码
13    }
14
15    private void updateSubmitMethodsOfExecutorClass_decorateToTtlWrapperAndSetAutoWrapperAttachment(@NonNull final CtMethod method) throws NotFoundException, CannotCompileException {
16        final int modifiers = method.getModifiers();
17        if (!Modifier.isPublic(modifiers) || Modifier.isStatic(modifiers)) return;
18        // 这里主要在java.lang.Runnable构造时候调用com.alibaba.ttl.TtlRunnable#get()包装为com.alibaba.ttl.TtlRunnable
19        // 在java.util.concurrent.Callable构造时候调用com.alibaba.ttl.TtlCallable#get()包装为com.alibaba.ttl.TtlCallable
20        // 并且设置附件K-V为ttl.is.auto.wrapper=true
21        CtClass[] parameterTypes = method.getParameterTypes();
22        StringBuilder insertCode = new StringBuilder();
23        for (int i = 0; i < parameterTypes.length; i++) {
24            final String paramTypeName = parameterTypes[i].getName();
25            if (PARAM_TYPE_NAME_TO_DECORATE_METHOD_CLASS.containsKey(paramTypeName)) {
26                String code = String.format(
27                        // decorate to TTL wrapper,
28                        // and then set AutoWrapper attachment/Tag
29                        "$%d = %s.get($%d, false, true);"
30                                + "ncom.alibaba.ttl.threadpool.agent.internal.transformlet.impl.Utils.setAutoWrapperAttachment($%<d);",
31                        i + 1, PARAM_TYPE_NAME_TO_DECORATE_METHOD_CLASS.get(paramTypeName), i + 1);
32                logger.info("insert code before method " + signatureOfMethod(method) + " of class " + method.getDeclaringClass().getName() + ": " + code);
33                insertCode.append(code);
34            }
35        }
36        if (insertCode.length() > 0) method.insertBefore(insertCode.toString());
37    }

上面分析的方法的功能,就是让java.util.concurrent.ThreadPoolExecutorjava.util.concurrent.ScheduledThreadPoolExecutor的字节码被增强,提交的java.lang.Runnable类型的任务会被包装为TtlRunnable,提交的java.util.concurrent.Callable类型的任务会被包装为TtlCallable,实现了无入侵无感知地嵌入TTL的功能。

小结

TTL在使用线程池等会池化复用线程的执行组件情况下,提供ThreadLocal值的传递功能,解决异步执行时上下文传递的问题。它是一个Java标准库,为框架/中间件设施开发提供的标配能力,项目代码精悍,只依赖了javassist做字节码增强,实现Agent模式下的近乎无入侵提供TTL功能的特性。TTL能在业务代码中实现透明/自动完成所有异步执行上下文的可定制、规范化的捕捉/传递,如果恰好碰到异步执行时上下文传递的问题,建议可以尝试此库。

参考资料:

  • JDK11相关源码

  • TTL源码

(本文完 c-14-d e-a-20200502)


原文始发于微信公众号(Throwable):通过transmittable-thread-local源码理解线程池线程本地变量传递的原理

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/38896.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!