可继承 Inheritable
InheritableThreadLocal
是官方提供的类,区别于
ThreadLocal
的功能就是使子线程创建时会赋值父线程当时的
ThreadLocal
值(引用),实现开启线程后将线程本地变量传递
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public class ThreadLocalTest { private static final ThreadLocal<String> THREAD_LOCAL = new InheritableThreadLocal <>(); public static void main (String[] args) { THREAD_LOCAL.set("Hello World" ); new Thread (() -> { System.out.println(Thread.currentThread() + " " + THREAD_LOCAL.get()); }).start(); System.out.println(Thread.currentThread() + " " + THREAD_LOCAL.get()); } }
1 2 Thread[main,5 ,main] Hello World Thread[Thread-0 ,5 ,main] Hello World
实现
ThreadLocal
对象本质是 Thread
内
threadLocalMap
的 key
InheritableThreadLocal
功能的实现需要两方支持:
InheritableThreadLocal
将父类 ThreadLocal
关于 map 的实现重写为支持新的 map(inheritableThreadLocals)
Thread
内提供 InheritableThreadLocal
使用的 map,并且在线程创建期间将父线程的 map 值传递给子线程
InheritableThreadLocal 重写方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 public class InheritableThreadLocal <T> extends ThreadLocal <T> { protected T childValue (T parentValue) { return parentValue; } ThreadLocalMap getMap (Thread t) { return t.inheritableThreadLocals; } void createMap (Thread t, T firstValue) { t.inheritableThreadLocals = new ThreadLocalMap (this , firstValue); } }
Thread init 赋值
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public class Thread implements Runnable { ... private void init (ThreadGroup g, Runnable target, String name, long stackSize, AccessControlContext acc, boolean inheritThreadLocals) { ... if (inheritThreadLocals && parent.inheritableThreadLocals != null ) this .inheritableThreadLocals = ThreadLocal.createInheritedMap(parent.inheritableThreadLocals); this .stackSize = stackSize; tid = nextThreadID(); } ... }
局限
InheritableThreadLocal
在部分场景有其局限性:
只能在线程创建初始化时期赋值,线程池中的线程一般是复用的,所以如果使用线程池则无法传递
当然这种行为符合预期的,因为 Inheritable
强调的就是继承性 ,线程池的场景强调的是传递性
但对于使用线程池等会池化复用线程的执行组件的情况,线程由线程池创建好,并且线程是池化起来反复使用的;这时父子线程关系的ThreadLocal
值传递已经没有意义,应用需要的实际上是把
任务提交给线程池时 的ThreadLocal
值传递到
任务执行时
alibaba/transmittable-thread-local:
transmittable-thread-local-功能
传递的是引用而不是拷贝,使用中需要注意引用数据修改的影响;也没有提供拷贝相关的能力实现
线程池复用则无法传递
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public class ThreadLocalTest { private static final ThreadLocal<String> THREAD_LOCAL = new InheritableThreadLocal <>(); private static final Executor EXECUTOR = Executors.newSingleThreadExecutor(); public static void main (String[] args) { EXECUTOR.execute(() -> System.out.println("init" )); THREAD_LOCAL.set("Hello World" ); EXECUTOR.execute(() -> { System.out.println(Thread.currentThread() + " " + THREAD_LOCAL.get()); }); System.out.println(Thread.currentThread() + " " + THREAD_LOCAL.get()); } }
1 2 3 init Thread[main,5 ,main] Hello World Thread[pool-1 -thread-1 ,5 ,main] null
可传递 Transmittable
alibaba 开源的 transmittable-thread-local
可以解决满足需要
ThreadLocal
的需求场景即TransmittableThreadLocal
的潜在需求场景,如果你的业务需要
在使用线程池等会池化复用线程的执行组件情况下传递
ThreadLocal
值 则是
TransmittableThreadLocal
目标场景
下面是几个典型场景例子:
分布式跟踪系统 或 全链路压测(即链路打标)
日志收集记录系统上下文
Session
级 Cache
应用容器或上层框架跨应用代码给下层SDK
传递信息
官方文档写的非常详细,我就不赘述了,具体可以去查看官方文档~
简单使用
关于使用官方文档也描述的非常详细,这里就简单列举一下 Java 代码
依赖为 2.12.6 版本
从TTL v2.13+
开始,升级到Java 8
。
如果需要Java 6
的支持,使用版本2.12.x
1 2 3 4 5 <dependency> <groupId>com.alibaba</groupId> <artifactId>transmittable-thread-local</artifactId> <version>2.12 .2 </version> </dependency>
装饰 Runnable
使用 TtlRunnable
和 TtlCallable
来修饰传入线程池的 Runnable
和 Callable
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public class ThreadLocalTest { private static final ThreadLocal<String> THREAD_LOCAL = new TransmittableThreadLocal <>(); private static final ExecutorService EXECUTOR = Executors.newSingleThreadExecutor(); public static void main (String[] args) { EXECUTOR.execute(() -> System.out.println("init" )); THREAD_LOCAL.set("Hello World" ); TtlRunnable ttlRunnable = TtlRunnable.get(() -> System.out.println(Thread.currentThread() + " " + THREAD_LOCAL.get())); EXECUTOR.execute(ttlRunnable); System.out.println(Thread.currentThread() + " " + THREAD_LOCAL.get()); } }
1 2 3 init Thread[main,5 ,main] Hello World Thread[pool-1 -thread-1 ,5 ,main] Hello World
需要注意: 即使提交同一个 Runnable
任务到线程池,每次都需要进行装饰操作,否则抓取的仍是上一次的上下文信息
装饰线程池
省去每次 Runnable
和 Callable
传入线程池时的修饰,这个逻辑可以在线程池中完成
通过工具类 TtlExecutors
完成,有下面的方法:
getTtlExecutor
:修饰接口 Executor
getTtlExecutorService
:修饰接口
ExecutorService
getTtlScheduledExecutorService
:修饰接口
ScheduledExecutorService
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public class ThreadLocalTest { private static final ThreadLocal<String> THREAD_LOCAL = new TransmittableThreadLocal <>(); private static final ExecutorService EXECUTOR = TtlExecutors.getTtlExecutorService(Executors.newSingleThreadExecutor()); public static void main (String[] args) { EXECUTOR.execute(() -> System.out.println("init" )); THREAD_LOCAL.set("Hello World" ); EXECUTOR.execute(() -> System.out.println(Thread.currentThread() + " " + THREAD_LOCAL.get())); System.out.println(Thread.currentThread() + " " + THREAD_LOCAL.get()); } }
1 2 3 init Thread[main,5 ,main] Hello World Thread[pool-1 -thread-1 ,5 ,main] Hello World
源码及思想
TransmittableThreadLocal
1 public class TransmittableThreadLocal <T> extends InheritableThreadLocal <T> implements TtlCopier <T>
继承了 InheritableThreadLocal
,实现了
TtlCopier
重点关注 set
方法
1 2 3 4 5 6 7 8 9 10 @Override public final void set (T value) { if (!disableIgnoreNullValueSemantics && null == value) { remove(); } else { super .set(value); addThisToHolder(); } }
除了使用父类 InheritableThreadLocal
的 set
方法之后,调用了 addThisToHolder
方法
1 2 3 4 5 private void addThisToHolder () { if (!holder.get().containsKey(this )) { holder.get().put((TransmittableThreadLocal<Object>) this , null ); } }
将 this
对象,即 TransmittableThreadLocal
对象作为 key 放进 holder
中,这里的 holder
是一个泛形为 Map 的
InheritableThreadLocal
,下面会进行分析
这里的 Map 只用来作为 Set,WeakHashMap supports null
value. ;类似的思想还有使用 ConcurrentHashMap
实现并发安全的 Set
holder
holder
是 TransmittableThreadLocal
进行扩展的关键一环
1 2 3 4 5 6 7 8 9 10 11 12 private static final InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>> holder = new InheritableThreadLocal <WeakHashMap<TransmittableThreadLocal<Object>, ?>>() { @Override protected WeakHashMap<TransmittableThreadLocal<Object>, ?> initialValue() { return new WeakHashMap <TransmittableThreadLocal<Object>, Object>(); } @Override protected WeakHashMap<TransmittableThreadLocal<Object>, ?> childValue(WeakHashMap<TransmittableThreadLocal<Object>, ?> parentValue) { return new WeakHashMap <TransmittableThreadLocal<Object>, Object>(parentValue); } };
是一个匿名内部类,重写了 InheritableThreadLocal
的
initialValue
和 childValue
方法
initialValue:直接创建 Map
childValue:ThreadLocal.createInheritedMap
中复制值时调用,重写实现为包装一个新 Map
在上一步的 addThisToHolder
操作中,会将
TransmittableThreadLocal
的 this
对象放进
holder
的 Map 中
TtlRunnable
使用 TtlRunnable
的静态方法 get
对普通的
Runnable
进行装饰
1 2 3 4 5 6 7 8 9 10 11 12 13 public static TtlRunnable get (@Nullable Runnable runnable, boolean releaseTtlValueReferenceAfterRun, boolean idempotent) { if (null == runnable) return null ; if (runnable instanceof TtlEnhanced) { if (idempotent) return (TtlRunnable) runnable; else throw new IllegalStateException ("Already TtlRunnable!" ); } return new TtlRunnable (runnable, releaseTtlValueReferenceAfterRun); }
最终通过 TtlRunnable
的构造器包装成
TtlRunnable
对象
1 2 3 4 5 private TtlRunnable (@NonNull Runnable runnable, boolean releaseTtlValueReferenceAfterRun) { this .capturedRef = new AtomicReference <Object>(capture()); this .runnable = runnable; this .releaseTtlValueReferenceAfterRun = releaseTtlValueReferenceAfterRun; }
整个 ThreadLocal
传递的关键都在 capturedRef
对象,赋值的关键逻辑在 Transmitter.capture
方法
Transmitter
Transmitter
扮演传递者的角色,是
TransmittableThreadLocal
的公有静态内部类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public static class Transmitter { @NonNull public static Object capture () { return new Snapshot (captureTtlValues(), captureThreadLocalValues()); } private static HashMap<TransmittableThreadLocal<Object>, Object> captureTtlValues () { HashMap<TransmittableThreadLocal<Object>, Object> ttl2Value = new HashMap <TransmittableThreadLocal<Object>, Object>(); for (TransmittableThreadLocal<Object> threadLocal : holder.get().keySet()) { ttl2Value.put(threadLocal, threadLocal.copyValue()); } return ttl2Value; } ...
创建一个 Snapshot
的对象,将 holder
中的
keys(也就是将 WeakHashMap
当作 Set 来使用,key 为
TransmittableThreadLocal
对象)遍历取出,同时调用
copyValue
1 2 3 4 5 6 7 private T copyValue () { return copy(get()); } public T copy (T parentValue) { return parentValue; }
基础的 TransmittableThreadLocal
实现就是将 value get
出来,不涉及 copy 等操作
完成这些后,TtlRunnable
就实例化完成,并且保存了
Snapshot
结构,保存当前时间点、当前线程下的
TransmittableThreadLocal
对象及其 value
run
最终放入线程池的 Runnable
是被装饰的
TtlRunnable
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 @Override public void run () { final Object captured = capturedRef.get(); if (captured == null || releaseTtlValueReferenceAfterRun && !capturedRef.compareAndSet(captured, null )) { throw new IllegalStateException ("TTL value reference is released after run!" ); } final Object backup = replay(captured); try { runnable.run(); } finally { restore(backup); } }
replay
保存当前时间的副本,依靠传参进入的快照 captured
将设置当前线程的上下文环境
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 public static Object replay (@NonNull Object captured) { final Snapshot capturedSnapshot = (Snapshot) captured; return new Snapshot (replayTtlValues(capturedSnapshot.ttl2Value), replayThreadLocalValues(capturedSnapshot.threadLocal2Value)); } @NonNull private static HashMap<TransmittableThreadLocal<Object>, Object> replayTtlValues (@NonNull HashMap<TransmittableThreadLocal<Object>, Object> captured) { HashMap<TransmittableThreadLocal<Object>, Object> backup = new HashMap <TransmittableThreadLocal<Object>, Object>(); for (final Iterator<TransmittableThreadLocal<Object>> iterator = holder.get().keySet().iterator(); iterator.hasNext(); ) { TransmittableThreadLocal<Object> threadLocal = iterator.next(); backup.put(threadLocal, threadLocal.get()); if (!captured.containsKey(threadLocal)) { iterator.remove(); threadLocal.superRemove(); } } setTtlValuesTo(captured); doExecuteCallback(true ); return backup; }
这里就是为了拿到副本,为什么需要拿到副本而不是直接根据快照设置值并恢复呢
当我们提交的任务被划分的线程有自己的上下文(任务的提交和实际执行中间存在时间差,如果这个时间段出现了上下文的更新,那么直接覆盖将导致本次更新丢失),那么就需要保证在任务执行的时候是当时的上下文,执行完毕后需要还原
获取当前线程,上线文快照
如果当前线程有快照里面不存在的上下文,那么先清除掉
将创建TtlRunnable时保存的快照设置到当前线程的上下文中(实现
ThreadLocal
传递的核心)
doExecuteCallback
执行钩子方法
返回保存的副本
restore
根据 replay 时生成的副本对上下文环境进行恢复
1 2 3 4 5 public static void restore (@NonNull Object backup) { final Snapshot backupSnapshot = (Snapshot) backup; restoreTtlValues(backupSnapshot.ttl2Value); restoreThreadLocalValues(backupSnapshot.threadLocal2Value); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 private static void restoreTtlValues (@NonNull HashMap<TransmittableThreadLocal<Object>, Object> backup) { doExecuteCallback(false ); for (final Iterator<TransmittableThreadLocal<Object>> iterator = holder.get().keySet().iterator(); iterator.hasNext(); ) { TransmittableThreadLocal<Object> threadLocal = iterator.next(); if (!backup.containsKey(threadLocal)) { iterator.remove(); threadLocal.superRemove(); } } setTtlValuesTo(backup); }
根据副本恢复当前线程的 ThreadLocal
线程池
对线程池的装饰实现和 Runnable
一致,将真正的线程池包装在
Wrapper 对象中
根据线程池实现的不同,有三个方法:
getTtlExecutor:返回 Executor
getTtlExecutorService:返回 ExecutorService
getTtlScheduledExecutorService:返回
ScheduledExecutorService
以 getTtlExecutor
为例
1 2 3 4 5 6 7 8 public static Executor getTtlExecutor (@Nullable Executor executor) { if (TtlAgent.isTtlAgentLoaded() || executor == null || executor instanceof TtlEnhanced) { return executor; } return new ExecutorTtlWrapper (executor, true ); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 class ExecutorTtlWrapper implements Executor , TtlWrapper<Executor>, TtlEnhanced { private final Executor executor; protected final boolean idempotent; ExecutorTtlWrapper(@NonNull Executor executor, boolean idempotent) { this .executor = executor; this .idempotent = idempotent; } @Override public void execute (@NonNull Runnable command) { executor.execute(TtlRunnable.get(command, false , idempotent)); } ... }
所以本质上就是包装为 ExecutorTtlWrapper
后重写了提交任务等方法,实现内主动去调用了装饰方法
图示
个人认为 Transmittable
核心的思想:
不同 ThreadLocal
之间的关系,保存、赋值、恢复等操作的流程
并发场景下的实现,使用原子类、副本的保存和恢复等
扩展 TTL
上面提到 InheritableThreadLocal
没有提供拷贝相关的能力,TTL 也进行了支持
SuppliedTransmittableThreadLocal
继承了
TransmittableThreadLocal
,提供了传参 Supplier
和 TtlCopier
来进行初始值、继承值拷贝、传递值拷贝的设置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 private static final class SuppliedTransmittableThreadLocal <T> extends TransmittableThreadLocal <T> { private final Supplier<? extends T > supplier; private final TtlCopier<T> copierForChildValue; private final TtlCopier<T> copierForCopy; SuppliedTransmittableThreadLocal(Supplier<? extends T > supplier, TtlCopier<T> copierForChildValue, TtlCopier<T> copierForCopy) { if (supplier == null ) throw new NullPointerException ("supplier is null" ); this .supplier = supplier; this .copierForChildValue = copierForChildValue; this .copierForCopy = copierForCopy; } @Override protected T initialValue () { return supplier.get(); } @Override protected T childValue (T parentValue) { if (copierForChildValue != null ) return copierForChildValue.copy(parentValue); else return super .childValue(parentValue); } @Override public T copy (T parentValue) { if (copierForCopy != null ) return copierForCopy.copy(parentValue); else return super .copy(parentValue); } }
通过静态方法
withInitial
、withInitialAndCopier
进行创建
简单使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 public class ThreadLocalTest { private static final ThreadLocal<Student> THREAD_LOCAL = TransmittableThreadLocal.withInitialAndCopier( () -> new Student ("默认" , 0 ), value -> new Student (value.name + " Inherit" , value.age), value -> new Student (value.name + " Transmit" , value.age) ); private static final ExecutorService EXECUTOR = TtlExecutors.getTtlExecutorService(Executors.newSingleThreadExecutor()); public static void main (String[] args) { System.out.println("主线程设置前:" + THREAD_LOCAL.get()); THREAD_LOCAL.set(new Student ("张三" , 20 )); System.out.println("主线程设置后:" + THREAD_LOCAL.get()); new Thread (() -> { Student student = THREAD_LOCAL.get(); System.out.println("新线程:" + student); }).start(); THREAD_LOCAL.set(new Student ("张三2" , 21 )); System.out.println("主线程修改:" + THREAD_LOCAL.get()); EXECUTOR.execute(() -> { Student student = THREAD_LOCAL.get(); System.out.println("线程池:" + student); }); } @Data @AllArgsConstructor private static class Student { private String name; private Integer age; } }
1 2 3 4 5 主线程设置前:ThreadLocalTest.Student(name=默认, age=0 ) 主线程设置后:ThreadLocalTest.Student(name=张三, age=20 ) 主线程修改:ThreadLocalTest.Student(name=张三2 , age=21 ) 新线程:ThreadLocalTest.Student(name=张三 Inherit, age=20 ) 线程池:ThreadLocalTest.Student(name=张三2 Transmit, age=21 )
参考
alibaba/transmittable-thread-local
at 2.x (github.com)
TransmittableThreadLocal源码分析
- 简书 (jianshu.com)