贫瘠之地

华北无浪漫,死海扬起帆
多少个夜晚,独自望着天

0%

TransmittableThreadLocal 源码 & 简单使用

可继承 Inheritable

InheritableThreadLocal 是官方提供的类,区别于 ThreadLocal 的功能就是使子线程创建时会赋值父线程当时的 ThreadLocal 值(引用),实现开启线程后将线程本地变量传递

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class ThreadLocalTest {

// 这里如果将 ThreadLocal 实现换为 ThreadLocal,则子线程内获取为 null
private static final ThreadLocal<String> THREAD_LOCAL = new InheritableThreadLocal<>();

public static void main(String[] args) {
THREAD_LOCAL.set("Hello World");

new Thread(() -> {
System.out.println(Thread.currentThread() + " " + THREAD_LOCAL.get());
}).start();

System.out.println(Thread.currentThread() + " " + THREAD_LOCAL.get());
}
}
1
2
Thread[main,5,main] Hello World
Thread[Thread-0,5,main] Hello World

实现

ThreadLocal 对象本质是 ThreadthreadLocalMap 的 key

InheritableThreadLocal 功能的实现需要两方支持:

  • InheritableThreadLocal 将父类 ThreadLocal 关于 map 的实现重写为支持新的 map(inheritableThreadLocals)
  • Thread 内提供 InheritableThreadLocal 使用的 map,并且在线程创建期间将父线程的 map 值传递给子线程

InheritableThreadLocal 重写方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class InheritableThreadLocal<T> extends ThreadLocal<T> {
/**
* Computes the child's initial value for this inheritable thread-local
* variable as a function of the parent's value at the time the child
* thread is created. This method is called from within the parent
* thread before the child is started.
* <p>
* This method merely returns its input argument, and should be overridden
* if a different behavior is desired.
*
* @param parentValue the parent thread's value
* @return the child thread's initial value
*/
protected T childValue(T parentValue) {
return parentValue;
}

/**
* Get the map associated with a ThreadLocal.
*
* @param t the current thread
*/
ThreadLocalMap getMap(Thread t) {
return t.inheritableThreadLocals;
}

/**
* Create the map associated with a ThreadLocal.
*
* @param t the current thread
* @param firstValue value for the initial entry of the table.
*/
void createMap(Thread t, T firstValue) {
t.inheritableThreadLocals = new ThreadLocalMap(this, firstValue);
}
}

Thread init 赋值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class Thread implements Runnable {
...

private void init(ThreadGroup g, Runnable target, String name,
long stackSize, AccessControlContext acc,
boolean inheritThreadLocals) {
...
// 如果参数开关开启,父线程存在 inheritableThreadLocals,则进行 createInheritedMap 并赋值
if (inheritThreadLocals && parent.inheritableThreadLocals != null)
this.inheritableThreadLocals =
ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);

/* Stash the specified stack size in case the VM cares */
this.stackSize = stackSize;

/* Set thread ID */
tid = nextThreadID();
}

...
}

局限

InheritableThreadLocal 在部分场景有其局限性:

  • 只能在线程创建初始化时期赋值,线程池中的线程一般是复用的,所以如果使用线程池则无法传递

    当然这种行为符合预期的,因为 Inheritable 强调的就是继承性,线程池的场景强调的是传递性

    但对于使用线程池等会池化复用线程的执行组件的情况,线程由线程池创建好,并且线程是池化起来反复使用的;这时父子线程关系的ThreadLocal值传递已经没有意义,应用需要的实际上是把 任务提交给线程池时ThreadLocal值传递到 任务执行时

    alibaba/transmittable-thread-local: transmittable-thread-local-功能

  • 传递的是引用而不是拷贝,使用中需要注意引用数据修改的影响;也没有提供拷贝相关的能力实现

线程池复用则无法传递

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class ThreadLocalTest {

// 单线程线程池
private static final ThreadLocal<String> THREAD_LOCAL = new InheritableThreadLocal<>();

private static final Executor EXECUTOR = Executors.newSingleThreadExecutor();

public static void main(String[] args) {
// 为了让线程池 init 一个新线程
EXECUTOR.execute(() -> System.out.println("init"));

THREAD_LOCAL.set("Hello World");

// 复用线程
EXECUTOR.execute(() -> {
System.out.println(Thread.currentThread() + " " + THREAD_LOCAL.get());
});

System.out.println(Thread.currentThread() + " " + THREAD_LOCAL.get());
}

}
1
2
3
init
Thread[main,5,main] Hello World
Thread[pool-1-thread-1,5,main] null

可传递 Transmittable

alibaba 开源的 transmittable-thread-local 可以解决满足需要

ThreadLocal的需求场景即TransmittableThreadLocal的潜在需求场景,如果你的业务需要 在使用线程池等会池化复用线程的执行组件情况下传递 ThreadLocal 则是 TransmittableThreadLocal 目标场景

下面是几个典型场景例子:

  1. 分布式跟踪系统 或 全链路压测(即链路打标)
  2. 日志收集记录系统上下文
  3. SessionCache
  4. 应用容器或上层框架跨应用代码给下层SDK传递信息

官方文档写的非常详细,我就不赘述了,具体可以去查看官方文档~

简单使用

关于使用官方文档也描述的非常详细,这里就简单列举一下 Java 代码

依赖为 2.12.6 版本

TTL v2.13+开始,升级到Java 8。 如果需要Java 6的支持,使用版本2.12.x

Maven Central

1
2
3
4
5
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>transmittable-thread-local</artifactId>
<version>2.12.2</version>
</dependency>

装饰 Runnable

使用 TtlRunnableTtlCallable 来修饰传入线程池的 RunnableCallable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class ThreadLocalTest {

// 使用 TransmittableThreadLocal
private static final ThreadLocal<String> THREAD_LOCAL = new TransmittableThreadLocal<>();

private static final ExecutorService EXECUTOR = Executors.newSingleThreadExecutor();

public static void main(String[] args) {
EXECUTOR.execute(() -> System.out.println("init"));

THREAD_LOCAL.set("Hello World");

// 装饰 Runnable 为 TtlRunnable
TtlRunnable ttlRunnable = TtlRunnable.get(() -> System.out.println(Thread.currentThread() + " " + THREAD_LOCAL.get()));
EXECUTOR.execute(ttlRunnable);

System.out.println(Thread.currentThread() + " " + THREAD_LOCAL.get());
}

}
1
2
3
init
Thread[main,5,main] Hello World
Thread[pool-1-thread-1,5,main] Hello World

需要注意:即使提交同一个 Runnable 任务到线程池,每次都需要进行装饰操作,否则抓取的仍是上一次的上下文信息

装饰线程池

省去每次 RunnableCallable 传入线程池时的修饰,这个逻辑可以在线程池中完成

通过工具类 TtlExecutors 完成,有下面的方法:

  • getTtlExecutor:修饰接口 Executor
  • getTtlExecutorService:修饰接口 ExecutorService
  • getTtlScheduledExecutorService:修饰接口 ScheduledExecutorService
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class ThreadLocalTest {

private static final ThreadLocal<String> THREAD_LOCAL = new TransmittableThreadLocal<>();

// 装饰线程池 ExecutorService
private static final ExecutorService EXECUTOR = TtlExecutors.getTtlExecutorService(Executors.newSingleThreadExecutor());

public static void main(String[] args) {
EXECUTOR.execute(() -> System.out.println("init"));

THREAD_LOCAL.set("Hello World");

// 不需要再装饰 Runnable
EXECUTOR.execute(() -> System.out.println(Thread.currentThread() + " " + THREAD_LOCAL.get()));

System.out.println(Thread.currentThread() + " " + THREAD_LOCAL.get());
}

}
1
2
3
init
Thread[main,5,main] Hello World
Thread[pool-1-thread-1,5,main] Hello World

源码及思想

TransmittableThreadLocal

1
public class TransmittableThreadLocal<T> extends InheritableThreadLocal<T> implements TtlCopier<T>

继承了 InheritableThreadLocal,实现了 TtlCopier

重点关注 set 方法

1
2
3
4
5
6
7
8
9
10
@Override
public final void set(T value) {
if (!disableIgnoreNullValueSemantics && null == value) {
// may set null to remove value
remove();
} else {
super.set(value);
addThisToHolder();
}
}

除了使用父类 InheritableThreadLocalset 方法之后,调用了 addThisToHolder 方法

1
2
3
4
5
private void addThisToHolder() {
if (!holder.get().containsKey(this)) {
holder.get().put((TransmittableThreadLocal<Object>) this, null); // WeakHashMap supports null value.
}
}

this 对象,即 TransmittableThreadLocal 对象作为 key 放进 holder 中,这里的 holder 是一个泛形为 Map 的 InheritableThreadLocal,下面会进行分析

这里的 Map 只用来作为 Set,WeakHashMap supports null value.;类似的思想还有使用 ConcurrentHashMap 实现并发安全的 Set

holder

holderTransmittableThreadLocal 进行扩展的关键一环

1
2
3
4
5
6
7
8
9
10
11
12
private static final InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>> holder =
new InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>>() {
@Override
protected WeakHashMap<TransmittableThreadLocal<Object>, ?> initialValue() {
return new WeakHashMap<TransmittableThreadLocal<Object>, Object>();
}

@Override
protected WeakHashMap<TransmittableThreadLocal<Object>, ?> childValue(WeakHashMap<TransmittableThreadLocal<Object>, ?> parentValue) {
return new WeakHashMap<TransmittableThreadLocal<Object>, Object>(parentValue);
}
};

是一个匿名内部类,重写了 InheritableThreadLocalinitialValuechildValue 方法

  • initialValue:直接创建 Map
  • childValue:ThreadLocal.createInheritedMap 中复制值时调用,重写实现为包装一个新 Map

在上一步的 addThisToHolder 操作中,会将 TransmittableThreadLocalthis 对象放进 holder 的 Map 中

TtlRunnable

使用 TtlRunnable 的静态方法 get 对普通的 Runnable 进行装饰

1
2
3
4
5
6
7
8
9
10
11
12
13
public static TtlRunnable get(@Nullable Runnable runnable, boolean releaseTtlValueReferenceAfterRun, boolean idempotent) {
if (null == runnable) return null;

// 这里避免过度装饰,只装饰一层
if (runnable instanceof TtlEnhanced) {
// avoid redundant decoration, and ensure idempotency
if (idempotent) return (TtlRunnable) runnable;
else throw new IllegalStateException("Already TtlRunnable!");
}

// 包装成 TtlRunnable 对象
return new TtlRunnable(runnable, releaseTtlValueReferenceAfterRun);
}

最终通过 TtlRunnable 的构造器包装成 TtlRunnable 对象

1
2
3
4
5
private TtlRunnable(@NonNull Runnable runnable, boolean releaseTtlValueReferenceAfterRun) {
this.capturedRef = new AtomicReference<Object>(capture());
this.runnable = runnable;
this.releaseTtlValueReferenceAfterRun = releaseTtlValueReferenceAfterRun;
}

整个 ThreadLocal 传递的关键都在 capturedRef 对象,赋值的关键逻辑在 Transmitter.capture 方法

Transmitter

Transmitter 扮演传递者的角色,是 TransmittableThreadLocal 的公有静态内部类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
    public static class Transmitter {
/**
* Capture all {@link TransmittableThreadLocal} and registered {@link ThreadLocal} values in the current thread.
*
* @return the captured {@link TransmittableThreadLocal} values
* @since 2.3.0
*/
@NonNull
public static Object capture() {
return new Snapshot(captureTtlValues(), captureThreadLocalValues());
}

private static HashMap<TransmittableThreadLocal<Object>, Object> captureTtlValues() {
HashMap<TransmittableThreadLocal<Object>, Object> ttl2Value = new HashMap<TransmittableThreadLocal<Object>, Object>();
for (TransmittableThreadLocal<Object> threadLocal : holder.get().keySet()) {
ttl2Value.put(threadLocal, threadLocal.copyValue());
}
return ttl2Value;
}

...

创建一个 Snapshot 的对象,将 holder 中的 keys(也就是将 WeakHashMap 当作 Set 来使用,key 为 TransmittableThreadLocal 对象)遍历取出,同时调用 copyValue

1
2
3
4
5
6
7
private T copyValue() {
return copy(get());
}

public T copy(T parentValue) {
return parentValue;
}

基础的 TransmittableThreadLocal 实现就是将 value get 出来,不涉及 copy 等操作

完成这些后,TtlRunnable 就实例化完成,并且保存了 Snapshot 结构,保存当前时间点、当前线程下的 TransmittableThreadLocal 对象及其 value

run

最终放入线程池的 Runnable 是被装饰的 TtlRunnable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Override
public void run() {
// 获取当时保存的 父线程 + 装饰那一刻的 Snapshot
final Object captured = capturedRef.get();
// captured == null 说明该 Runnable 已经被 run 了
// capturedRef.compareAndSet(captured, null) 使用原子类保证争抢执行资格
if (captured == null || releaseTtlValueReferenceAfterRun && !capturedRef.compareAndSet(captured, null)) {
throw new IllegalStateException("TTL value reference is released after run!");
}

// replay 重放
final Object backup = replay(captured);
try {
// 执行被装饰的真正的 Runnable
runnable.run();
} finally {
// restore 恢复
restore(backup);
}
}

replay

保存当前时间的副本,依靠传参进入的快照 captured 将设置当前线程的上下文环境

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public static Object replay(@NonNull Object captured) {
// 取出快照
final Snapshot capturedSnapshot = (Snapshot) captured;
return new Snapshot(replayTtlValues(capturedSnapshot.ttl2Value), replayThreadLocalValues(capturedSnapshot.threadLocal2Value));
}

@NonNull
private static HashMap<TransmittableThreadLocal<Object>, Object> replayTtlValues(@NonNull HashMap<TransmittableThreadLocal<Object>, Object> captured) {
HashMap<TransmittableThreadLocal<Object>, Object> backup = new HashMap<TransmittableThreadLocal<Object>, Object>();

// 注意这里的 holder,此时已经在子线程内了
for (final Iterator<TransmittableThreadLocal<Object>> iterator = holder.get().keySet().iterator(); iterator.hasNext(); ) {
TransmittableThreadLocal<Object> threadLocal = iterator.next();

// backup
backup.put(threadLocal, threadLocal.get());

// clear the TTL values that is not in captured
// avoid the extra TTL values after replay when run task
if (!captured.containsKey(threadLocal)) {
iterator.remove();
threadLocal.superRemove();
}
}

// set TTL values to captured
setTtlValuesTo(captured);

// call beforeExecute callback
doExecuteCallback(true);

return backup;
}

这里就是为了拿到副本,为什么需要拿到副本而不是直接根据快照设置值并恢复呢

当我们提交的任务被划分的线程有自己的上下文(任务的提交和实际执行中间存在时间差,如果这个时间段出现了上下文的更新,那么直接覆盖将导致本次更新丢失),那么就需要保证在任务执行的时候是当时的上下文,执行完毕后需要还原

  1. 获取当前线程,上线文快照
  2. 如果当前线程有快照里面不存在的上下文,那么先清除掉
  3. 将创建TtlRunnable时保存的快照设置到当前线程的上下文中(实现 ThreadLocal 传递的核心)
  4. doExecuteCallback 执行钩子方法
  5. 返回保存的副本

restore

根据 replay 时生成的副本对上下文环境进行恢复

1
2
3
4
5
public static void restore(@NonNull Object backup) {
final Snapshot backupSnapshot = (Snapshot) backup;
restoreTtlValues(backupSnapshot.ttl2Value);
restoreThreadLocalValues(backupSnapshot.threadLocal2Value);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private static void restoreTtlValues(@NonNull HashMap<TransmittableThreadLocal<Object>, Object> backup) {
// call afterExecute callback
doExecuteCallback(false);

for (final Iterator<TransmittableThreadLocal<Object>> iterator = holder.get().keySet().iterator(); iterator.hasNext(); ) {
TransmittableThreadLocal<Object> threadLocal = iterator.next();

// clear the TTL values that is not in backup
// avoid the extra TTL values after restore
if (!backup.containsKey(threadLocal)) {
iterator.remove();
threadLocal.superRemove();
}
}

// restore TTL values
setTtlValuesTo(backup);
}

根据副本恢复当前线程的 ThreadLocal

线程池

对线程池的装饰实现和 Runnable 一致,将真正的线程池包装在 Wrapper 对象中

根据线程池实现的不同,有三个方法:

  • getTtlExecutor:返回 Executor
  • getTtlExecutorService:返回 ExecutorService
  • getTtlScheduledExecutorService:返回 ScheduledExecutorService

getTtlExecutor 为例

1
2
3
4
5
6
7
8
public static Executor getTtlExecutor(@Nullable Executor executor) {
// 判断是否是 TtlAgent 加载、或者是 TtlEnhanced,避免重复装饰
if (TtlAgent.isTtlAgentLoaded() || executor == null || executor instanceof TtlEnhanced) {
return executor;
}
// ExecutorTtlWrapper
return new ExecutorTtlWrapper(executor, true);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class ExecutorTtlWrapper implements Executor, TtlWrapper<Executor>, TtlEnhanced {
private final Executor executor;
protected final boolean idempotent;

ExecutorTtlWrapper(@NonNull Executor executor, boolean idempotent) {
this.executor = executor;
this.idempotent = idempotent;
}

// 帮忙调用了 TtlRunnable.get
@Override
public void execute(@NonNull Runnable command) {
executor.execute(TtlRunnable.get(command, false, idempotent));
}
...
}

所以本质上就是包装为 ExecutorTtlWrapper 后重写了提交任务等方法,实现内主动去调用了装饰方法

图示

个人认为 Transmittable 核心的思想:

  • 不同 ThreadLocal 之间的关系,保存、赋值、恢复等操作的流程
  • 并发场景下的实现,使用原子类、副本的保存和恢复等

扩展 TTL

上面提到 InheritableThreadLocal 没有提供拷贝相关的能力,TTL 也进行了支持

SuppliedTransmittableThreadLocal 继承了 TransmittableThreadLocal,提供了传参 SupplierTtlCopier 来进行初始值、继承值拷贝、传递值拷贝的设置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
private static final class SuppliedTransmittableThreadLocal<T> extends TransmittableThreadLocal<T> {
// initialValue
private final Supplier<? extends T> supplier;
// childValue InheritableThreadLocal
private final TtlCopier<T> copierForChildValue;
// copy
private final TtlCopier<T> copierForCopy;

SuppliedTransmittableThreadLocal(Supplier<? extends T> supplier, TtlCopier<T> copierForChildValue, TtlCopier<T> copierForCopy) {
if (supplier == null) throw new NullPointerException("supplier is null");
this.supplier = supplier;
this.copierForChildValue = copierForChildValue;
this.copierForCopy = copierForCopy;
}

@Override
protected T initialValue() {
return supplier.get();
}

@Override
protected T childValue(T parentValue) {
if (copierForChildValue != null) return copierForChildValue.copy(parentValue);
else return super.childValue(parentValue);
}

@Override
public T copy(T parentValue) {
if (copierForCopy != null) return copierForCopy.copy(parentValue);
else return super.copy(parentValue);
}
}

通过静态方法 withInitialwithInitialAndCopier 进行创建

简单使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class ThreadLocalTest {

private static final ThreadLocal<Student> THREAD_LOCAL =
TransmittableThreadLocal.withInitialAndCopier(
() -> new Student("默认", 0),
value -> new Student(value.name + " Inherit", value.age),
value -> new Student(value.name + " Transmit", value.age)
);

// 装饰线程池 ExecutorService
private static final ExecutorService EXECUTOR =
TtlExecutors.getTtlExecutorService(Executors.newSingleThreadExecutor());

public static void main(String[] args) {
System.out.println("主线程设置前:" + THREAD_LOCAL.get());

THREAD_LOCAL.set(new Student("张三", 20));
System.out.println("主线程设置后:" + THREAD_LOCAL.get());

// 新线程
new Thread(() -> {
Student student = THREAD_LOCAL.get();
System.out.println("新线程:" + student);
}).start();

// 更新
THREAD_LOCAL.set(new Student("张三2", 21));
System.out.println("主线程修改:" + THREAD_LOCAL.get());

// 线程池
EXECUTOR.execute(() -> {
Student student = THREAD_LOCAL.get();
System.out.println("线程池:" + student);
});
}

@Data
@AllArgsConstructor
private static class Student {
private String name;
private Integer age;
}
}
1
2
3
4
5
主线程设置前:ThreadLocalTest.Student(name=默认, age=0)
主线程设置后:ThreadLocalTest.Student(name=张三, age=20)
主线程修改:ThreadLocalTest.Student(name=张三2, age=21)
新线程:ThreadLocalTest.Student(name=张三 Inherit, age=20)
线程池:ThreadLocalTest.Student(name=张三2 Transmit, age=21)

参考

alibaba/transmittable-thread-local at 2.x (github.com)

TransmittableThreadLocal源码分析 - 简书 (jianshu.com)