GoogleCode ConcurrentLinkedHashMap

参数

Builder

ConcurrentLinkedHashMap 构造方法私有,只能通过其静态内部类 Builder 来进行实例化

一个完全支持并发检索的哈希表,可调节更新预期并发度,以及限制其最大容量

该实现与 ConcurrentHashMap 的不同之处在于维护了一个页面替换算法(page replacement algorithm),用于在 map 超出容量时删除元素

这个 map 实现没有共有的构造器,它的实例是通过 Builder 创建的

Builder 支持链式赋值,使用如下

1
2
3
4
5
6
7
8
public static void main(String[] args) {
ConcurrentLinkedHashMap<Integer, String> map =
new ConcurrentLinkedHashMap.Builder<Integer, String>()
.maximumWeightedCapacity(2)
.weigher(Weighers.singleton())
.listener((key, value) -> System.out.println("元素被丢弃了 key=" + key + " value=" + value))
.concurrencyLevel(32)
}

调用 build() 方法进行 ConcurrentLinkedHashMap 的实例化

1
2
3
4
public ConcurrentLinkedHashMap<K, V> build() {
checkState(capacity >= 0);
return new ConcurrentLinkedHashMap<K, V>(this);
}

这里可以看到如果 capacity,即 maximumWeightedCapacity 未进行赋值,则会抛出异常,其他参数均有默认值

1
2
3
4
5
6
7
8
public Builder() {
// 默认为 -1,结合上面的逻辑,不进行赋值实例化时会抛出异常
capacity = -1;
weigher = Weighers.entrySingleton();
initialCapacity = DEFAULT_INITIAL_CAPACITY;
concurrencyLevel = DEFAULT_CONCURRENCY_LEVEL;
listener = (EvictionListener<K, V>) DiscardingListener.INSTANCE;
}

maximumWeightedCapacity

表示该 map 最大能够允许的最大重量,并且有可能暂时超过它(我理解可能驱逐元素是一个惰性过程)

在实例化 ConcurrentLinkedHashMap 时,使用原子类作为容量的记录器

1
2
3
4
5
private ConcurrentLinkedHashMap(Builder<K, V> builder) {
// ...
capacity = new AtomicLong(Math.min(builder.capacity, MAXIMUM_CAPACITY));
// ...
}

涉及 capacity 的方法:

  • capacity:获取当前 capacity
  • setCapacity:设置 capacity
  • hasOverflowed:判断当前重量大小是否超过了 capacity

initialCapacity

表示初始化底层存放元素的 map 容量,默认为 16

1
2
3
4
5
private ConcurrentLinkedHashMap(Builder<K, V> builder) {
// ...
data = new ConcurrentHashMapV8<K, Node<K, V>>(builder.initialCapacity, 0.75f, concurrencyLevel);
// ...
}

ConcurrentLinkedHashMap 底层实现存储的结构是 ConcurrentHashMapV8,看到其参数和 JUC 下面的 ConcurrentHashMap 一样大致就可以猜到该参数的作用,本质上也是在预估数据量来避免频繁 resize 操作

对于该参数是如何用于实例化 ConcurrentHashMapV8 ,在后面的 concurrencyLevel 参数进行展示

该参数只用于实例化 ConcurrentHashMapV8

concurrencyLevel

预估的并发线程数

initialCapacity 参数一样,都是为了实例化 ConcurrentHashMapV8,其参数的作用也和 ConcurrentHashMap 一样(作者都是 Doug Lea)

ConcurrentHashMap 提高并发性能的思想就是 cell 化,按照底层 Entry 数组头元素作为并发粒度,那么 concurrencyLevelinitialCapacity 这两个参数必然同时影响底层数组的大小创建

1
2
3
4
5
6
7
8
9
10
public ConcurrentHashMapV8(int initialCapacity, float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
long size = (long)(1.0 + initialCapacity / loadFactor);
int cap = (size >= MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap;
}

weigher

重量器,用来衡量一个元素占用了多少重量,默认为 SingletonWeigher

因为 ConcurrentLinkedHashMap 的核心功能是在限制 map 内元素的占用,那么衡量一个元素的权重就被抽象为了 Weigher

其中除了 Weigher 还提供了 EntryWeigher 的抽象,用于对整个 KV 进行重量的衡量

对于 Weigher 实现,会被转换为 EntryWeigher 实现(统一入口)

1
2
3
4
5
6
public Builder<K, V> weigher(Weigher<? super V> weigher) {
this.weigher = (weigher == Weighers.singleton())
? Weighers.<K, V>entrySingleton()
: new BoundedEntryWeigher<K, V>(Weighers.asEntryWeigher(weigher));
return this;
}

其中 EntryWeigherView 就是用来将 Weigher 包装为 EntryWeigher 的实现

列举一些自带的实现

  • Weigher
    • SingletonWeigher:一个 value 的重量视为 1
    • CollectionWeigher:value 为集合类型,重量为集合的 size
    • ListWeigher:value 为 list 类型,重量为 list 的 size
    • MapWeigher:value 为 map 类型,重量为 map 的 size
  • EntryWeigher
    • BoundedEntryWeigher:内部为 EntryWeigher 实现,要求重量大于等于 1
    • SingletonEntryWeigher:一个 KV 的重量视为 1
    • EntryWeigherView:用于包装 Weigher 的实现
1
2
3
4
5
6
private ConcurrentLinkedHashMap(Builder<K, V> builder) {
// ...
// The eviction support
weigher = builder.weigher;
// ...
}

该参数会在 put 元素以及一系列获取 ordered 视图操作中被使用

listener

EvictionListener 的实现,用于驱逐元素时调用该实现的 onEviction(K key, V value) 方法进行通知

默认值为 DiscardingListener.INSTANCE

1
2
3
4
5
6
7
8
9
private ConcurrentLinkedHashMap(Builder<K, V> builder) {
// ...
// The notification queue and listener
listener = builder.listener;
pendingNotifications = (listener == DiscardingListener.INSTANCE)
? (Queue<Node<K, V>>) DISCARDING_QUEUE
: new ConcurrentLinkedQueue<Node<K, V>>();
// ...
}

这里可以看出,如果 listener 是一个默认值,则 pendingNotifications 也会使用默认队列,其实现就是空实现

1
2
3
4
5
6
7
8
9
/** A queue that discards all additions and is always empty. */
static final class DiscardingQueue extends AbstractQueue<Object> {
@Override public boolean add(Object e) { return true; }
@Override public boolean offer(Object e) { return true; }
@Override public Object poll() { return null; }
@Override public Object peek() { return null; }
@Override public int size() { return 0; }
@Override public Iterator<Object> iterator() { return emptyList().iterator(); }
}

驱逐元素时不会进行通知,使用了空对象模式

在空对象模式(Null Object Pattern)中,一个空对象取代 NULL 对象实例的检查

Null 对象不是检查空值,而是反应一个不做任何动作的关系,这样的 Null 对象也可以在数据不可用的时候提供默认的行为

添加元素

putputIfAbsent 最终都调用 put(K key, V value, boolean onlyIfAbsent) 方法

1
2
3
4
5
6
7
8
9
@Override
public V put(K key, V value) {
return put(key, value, false);
}

@Override
public V putIfAbsent(K key, V value) {
return put(key, value, true);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
V put(K key, V value, boolean onlyIfAbsent) {
// 入参校验
checkNotNull(key);
checkNotNull(value);

// 计算权重
final int weight = weigher.weightOf(key, value);
// 包装节点
final WeightedValue<V> weightedValue = new WeightedValue<V>(value, weight);
final Node<K, V> node = new Node<K, V>(key, weightedValue);

for (;;) {
// put 调用
final Node<K, V> prior = data.putIfAbsent(node.key, node);
// 第一次写入 key,视为一次 Add
if (prior == null) {
afterWrite(new AddTask(node, weight));
return null;
// 存在 key 并且 onlyIfAbsent = true
} else if (onlyIfAbsent) {
afterRead(prior);
return prior.getValue();
}

// 走到该 case,说明 key 存在并且 onlyIfAbsent = false
for (;;) {
final WeightedValue<V> oldWeightedValue = prior.get();
// 并发错误下跳出,由 data.putIfAbsent(node.key, node) 继续取值
if (!oldWeightedValue.isAlive()) {
break;
}

// 更新权重,视情况进行 Update 或者 read
if (prior.compareAndSet(oldWeightedValue, weightedValue)) {
final int weightedDifference = weight - oldWeightedValue.weight;
if (weightedDifference == 0) {
afterRead(prior);
} else {
afterWrite(new UpdateTask(prior, weightedDifference));
}
return oldWeightedValue.value;
}
}
}
}

计算权重

经过参数校验后,首先要进行权重的计算和包装

1
2
3
4
5
// 计算权重
final int weight = weigher.weightOf(key, value);
// 包装节点
final WeightedValue<V> weightedValue = new WeightedValue<V>(value, weight);
final Node<K, V> node = new Node<K, V>(key, weightedValue);

权重的计算就是调用了 EntryWeigher 实现的 weightOf() 方法

随后将权重值和 value 包装进 WeightedValue,并包装为 Node

Node 即为链表的节点

1
static final class Node<K, V> extends AtomicReference<WeightedValue<V>> implements Linked<Node<K, V>>

整个保存 KV 的核心结构(上面提到的 ConcurrentHashMapV8)保存的其实是 key 和 Node 的数据:final ConcurrentMap<K, Node<K, V>> data;

无冲突

随后调用 dataputIfAbsent 方法

1
final Node<K, V> prior = data.putIfAbsent(node.key, node);

根据返回的 prior 节点情况,判断后续流程

1
2
3
4
5
6
7
8
9
// 第一次写入 key,视为一次 Add
if (prior == null) {
afterWrite(new AddTask(node, weight));
return null;
// 存在 key 并且 onlyIfAbsent = true,视为一次 Read
} else if (onlyIfAbsent) {
afterRead(prior);
return prior.getValue();
}

这两种情况均可以视为无冲突情况:

  • 不存在 prior
  • 存在 prior 但是 onlyIfAbsent = true

随后执行相应的 Add 或者 Read 任务(后面解释任务的操作)

需要更新

当 key 已经存在,那么上述 put 方法其实并未完成,只是返回了 K 对应的 value 对象(节点)

可以猜测下面流程需要做的操作:

  • 替换 key 对应的 value;因为 value 和权重是绑定关系,也即替换了权重
  • 进行 Task 流程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 走到该 case,说明 key 存在并且 onlyIfAbsent = false
for (;;) {
final WeightedValue<V> oldWeightedValue = prior.get();
// 并发错误下跳出,由 data.putIfAbsent(node.key, node) 继续取值
if (!oldWeightedValue.isAlive()) {
break;
}

// 更新权重,视情况进行 Update 或 Read
if (prior.compareAndSet(oldWeightedValue, weightedValue)) {
final int weightedDifference = weight - oldWeightedValue.weight;
if (weightedDifference == 0) {
afterRead(prior);
} else {
afterWrite(new UpdateTask(prior, weightedDifference));
}
return oldWeightedValue.value;
}
}
  1. 首先获取 prior 节点的 WeightedValue 结构,当该结构 isAlive() 为 false 时,说明现在的 prior 节点在并发环境下已经成为过去式了,需要跳出循环,重新进行最开始的获取流程
  2. 当旧节点没问题,就会调用 CAS 方法来替换新的 weight 和 value,如果替换成功则会根据权重差值来进行 Read 或者 Update 任务
  3. 如果 CAS 失败,则会继续对节点进行判断,重复流程

删除元素

按 K 删除

1
2
3
4
5
6
7
8
9
10
11
@Override
public V remove(Object key) {
final Node<K, V> node = data.remove(key);
if (node == null) {
return null;
}

makeRetired(node);
afterWrite(new RemovalTask(node));
return node.getValue();
}
  1. 首先依赖 dataremove(key) 方法,拿到节点后进行判断,如果等于 null,说明该 key 没有对应的值,就不需要考虑后续流程了
  2. 如果节点不等 null,需要进行 makeRetired(),本质是修改节点 WeightedValue 的权重为负数(在添加流程中,依靠权重是否为负作为 value 被删除的标志)
  3. 随后进行 Removal 任务

按 KV 删除

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Override
public boolean remove(Object key, Object value) {
final Node<K, V> node = data.get(key);
if ((node == null) || (value == null)) {
return false;
}

WeightedValue<V> weightedValue = node.get();
for (;;) {
if (weightedValue.contains(value)) {
if (tryToRetire(node, weightedValue)) {
if (data.remove(key, node)) {
afterWrite(new RemovalTask(node));
return true;
}
} else {
weightedValue = node.get();
if (weightedValue.isAlive()) {
// retry as an intermediate update may have replaced the value with
// an equal instance that has a different reference identity
continue;
}
}
}
return false;
}
}

在删除的基础上多了对 value 的比较

当 key 存在时,这里用了多个嵌套判断来实现目的或保证并发:

  1. value 是否能对应上,对应不上直接返回删除失败
  2. 让 value 退休,调用 tryToRetire(node, weightedValue) 方法,如果失败就重新获取节点的 value,进行存活判断,如果依然存活则重复整个流程(从 1 开始)
  3. 调用 data.remove(key, node) 方法,如果成功则进行 Removal 任务,失败则重新整个流程

tryToRetire

在上述删除流程中,都需要调用 tryToRetire 方法

本质上是将节点置为失效状态,失效状态根据节点 value 中的 WeightedValueweight 来标示

1
2
3
4
5
6
7
boolean tryToRetire(Node<K, V> node, WeightedValue<V> expect) {
if (expect.isAlive()) {
final WeightedValue<V> retired = new WeightedValue<V>(expect.value, -expect.weight);
return node.compareAndSet(expect, retired);
}
return false;
}

后置读写

在上述流程中,各种操作都离不开调用 afterReadafterWrite 方法

后置读写的操作本质上就是在调整链表结构来实现 LRU,对元素进行写入、驱逐和排序

afterWrite

1
2
3
4
5
6
7
8
9
10
void afterWrite(Runnable task) {
// 写缓冲区添加任务
writeBuffer.add(task);
// 设置 drain 状态为 REQUIRED
drainStatus.lazySet(REQUIRED);
// 执行缓冲区的任务
tryToDrainBuffers();
// 通知监听器
notifyListener();
}

其中 drainStatus 是一个被 AtomicReference 包装的 DrainStatus,代表当前缓冲操作的状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
enum DrainStatus {

/** A drain is not taking place. */
IDLE {
@Override boolean shouldDrainBuffers(boolean delayable) {
return !delayable;
}
},

/** A drain is required due to a pending write modification. */
REQUIRED {
@Override boolean shouldDrainBuffers(boolean delayable) {
return true;
}
},

/** A drain is in progress. */
PROCESSING {
@Override boolean shouldDrainBuffers(boolean delayable) {
return false;
}
};

/**
* Determines whether the buffers should be drained.
*
* @param delayable if a drain should be delayed until required
* @return if a drain should be attempted
*/
abstract boolean shouldDrainBuffers(boolean delayable);
}

当 drainStatus 被并发安全并 lazy 设置为 REQUIRED 后,tryToDrainBuffers() 方法来执行对缓冲区的操作

tryToDrainBuffers

1
2
3
4
5
6
7
8
9
10
11
void tryToDrainBuffers() {
if (evictionLock.tryLock()) {
try {
drainStatus.lazySet(PROCESSING);
drainBuffers();
} finally {
drainStatus.compareAndSet(PROCESSING, IDLE);
evictionLock.unlock();
}
}
}

首先获取锁资格,evictionLock 是一个 ReentrantLockevictionLock = new ReentrantLock();

再将 drainStatus 状态设置为 PROCESSING;最后调用核心的 drainBuffers 方法

当执行结束后将 drainStatus CAS 修改为 IDLE,进行解锁

afterRead

1
2
3
4
5
6
7
8
9
10
11
void afterRead(Node<K, V> node) {
// 获取 buffer 的 index,这里为了避免对热 entries 的争抢
// 个人理解类似针对缓冲区的 cell 机制
final int bufferIndex = readBufferIndex();
// 根据 bufferIndex 将 node 记录在 readBuffer 中
final long writeCount = recordRead(bufferIndex, node);
// drain 操作
drainOnReadIfNeeded(bufferIndex, writeCount);
// 通知监听器
notifyListener();
}

其中 recordRead 的操作

1
2
3
4
5
6
7
8
9
10
11
12
13
long recordRead(int bufferIndex, Node<K, V> node) {
// The location in the buffer is chosen in a racy fashion as the increment
// is not atomic with the insertion. This means that concurrent reads can
// overlap and overwrite one another, resulting in a lossy buffer.
final AtomicLong counter = readBufferWriteCount[bufferIndex];
final long writeCount = counter.get();
counter.lazySet(writeCount + 1);

final int index = (int) (writeCount & READ_BUFFER_INDEX_MASK);
readBuffers[bufferIndex][index].lazySet(node);

return writeCount;
}

读取 readBufferWriteCount 的 counter 的 value 后 +1 写入

这里注释中说明了这个操作不是原子的,意味着可以并发读取,并且会导致 buffer 的重叠和覆盖,还不清楚为什么这样处理

根据后续代码这里记录的 count 主要是用于控制读缓冲区的 drain 操作,可能不需要那么准确,猜测这样处理的原因是因为没必要为了并发安全牺牲性能

随后将 node 记录在 readBuffers 中

drainOnReadIfNeeded

该方法就是用于 drain 读缓冲区

1
2
3
4
5
6
7
8
9
10
11
void drainOnReadIfNeeded(int bufferIndex, long writeCount) {
// 获取 pending 数量,当前线程对应缓冲区在上一步操作的写数量 - 读缓冲区处理的数量(计算差值判断是否到了需要刷新的阈值)
final long pending = (writeCount - readBufferDrainAtWriteCount[bufferIndex].get());
// READ_BUFFER_THRESHOLD = 32
final boolean delayable = (pending < READ_BUFFER_THRESHOLD);
final DrainStatus status = drainStatus.get();
// 判断是否需要 drain
if (status.shouldDrainBuffers(delayable)) {
tryToDrainBuffers();
}
}

根据读 count 计算 pending 数量,根据 pending 值来判断是否执行 drain 操作

其中对于操作的判断来自于 DrainStatus,即状态枚举实现的抽象方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
enum DrainStatus {

/** A drain is not taking place. */
IDLE {
@Override boolean shouldDrainBuffers(boolean delayable) {
return !delayable;
}
},

/** A drain is required due to a pending write modification. */
REQUIRED {
@Override boolean shouldDrainBuffers(boolean delayable) {
return true;
}
},

/** A drain is in progress. */
PROCESSING {
@Override boolean shouldDrainBuffers(boolean delayable) {
return false;
}
};

/**
* Determines whether the buffers should be drained.
*
* @param delayable if a drain should be delayed until required
* @return if a drain should be attempted
*/
abstract boolean shouldDrainBuffers(boolean delayable);
}

可以看到:

  • 对于 REQUIRED,说明需要立即刷新缓冲区(这个状态最直接是来自于写操作)
  • 对于 PROCESSING,说明当前其他线程正在处理,所以不需要该线程再进行操作了
  • 对于 IDLE,则根据 pending 数量得到的 delayable 来判断

最后也是调用 tryToDrainBuffers 方法来对缓冲区进行 drain 操作

drainWriteBuffer

1
2
3
4
5
6
7
8
9
void drainWriteBuffer() {
for (int i = 0; i < WRITE_BUFFER_DRAIN_THRESHOLD; i++) {
final Runnable task = writeBuffer.poll();
if (task == null) {
break;
}
task.run();
}
}

获取一定数量的 task,然后执行 task 的 run 方法

每一次 drain 操作都有一个阈值(WRITE_BUFFER_DRAIN_THRESHOLD = 16),应该是为每个线程分摊工作量,在 read 中也是这样类似的设计

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
  /** Adds the node to the page replacement policy. */
final class AddTask implements Runnable {
final Node<K, V> node;
final int weight;

AddTask(Node<K, V> node, int weight) {
this.weight = weight;
this.node = node;
}

@Override
@GuardedBy("evictionLock")
public void run() {
weightedSize.lazySet(weightedSize.get() + weight);

// ignore out-of-order write operations
if (node.get().isAlive()) {
evictionDeque.add(node);
evict();
}
}
}

/** Removes a node from the page replacement policy. */
final class RemovalTask implements Runnable {
final Node<K, V> node;

RemovalTask(Node<K, V> node) {
this.node = node;
}

@Override
@GuardedBy("evictionLock")
public void run() {
// add may not have been processed yet
evictionDeque.remove(node);
makeDead(node);
}
}

/** Updates the weighted size and evicts an entry on overflow. */
final class UpdateTask implements Runnable {
final int weightDifference;
final Node<K, V> node;

public UpdateTask(Node<K, V> node, int weightDifference) {
this.weightDifference = weightDifference;
this.node = node;
}

@Override
@GuardedBy("evictionLock")
public void run() {
weightedSize.lazySet(weightedSize.get() + weightDifference);
applyRead(node);
evict();
}
}

对于 task 的实现,有 3 种:

  • AddTask
    • 调整 weight
    • evictionDeque 添加节点(尾插)
    • 调用驱逐方法 evict
  • RemovalTask
    • evictionDeque 移除节点
    • 设置节点 dead(包括调整 weight)
  • UpdateTask
    • 调整 weight
    • 调整 node 到尾部
    • 调用驱逐方法 evict

为什么 UpdateTask 也需要驱逐元素?因为 weight 的计算方法是由实现类提供的,可能根据不同的情况计算 weight,更新操作也可能更新 weight,所以需要进行驱逐操作

驱逐方法 evict

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
void evict() {
// Attempts to evict entries from the map if it exceeds the maximum
// capacity. If the eviction fails due to a concurrent removal of the
// victim, that removal may cancel out the addition that triggered this
// eviction. The victim is eagerly unlinked before the removal task so
// that if an eviction is still required then a new victim will be chosen
// for removal.
// 判断是否超过 weight 阈值
while (hasOverflowed()) {
final Node<K, V> node = evictionDeque.poll();

// If weighted values are used, then the pending operations will adjust
// the size to reflect the correct weight
if (node == null) {
return;
}

// Notify the listener only if the entry was evicted
if (data.remove(node.key, node)) {
// 待通知队列
pendingNotifications.add(node);
}
// 标记删除
makeDead(node);
}
}

该方法就是为了实现 LRU,如果超出了 weight,则会从 evictionDeque 中 poll 出元素后,记录待通知队列,标记删除

标记删除删除的是 Map 结构中的元素(修改其 weight 标记删除),其中 node 也是 Map 中持有的引用

drainReadBuffers

1
2
3
4
5
6
7
void drainReadBuffers() {
final int start = (int) Thread.currentThread().getId();
final int end = start + NUMBER_OF_READ_BUFFERS;
for (int i = start; i < end; i++) {
drainReadBuffer(i & READ_BUFFERS_MASK);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void drainReadBuffer(int bufferIndex) {
final long writeCount = readBufferWriteCount[bufferIndex].get();
for (int i = 0; i < READ_BUFFER_DRAIN_THRESHOLD; i++) {
final int index = (int) (readBufferReadCount[bufferIndex] & READ_BUFFER_INDEX_MASK);
final AtomicReference<Node<K, V>> slot = readBuffers[bufferIndex][index];
final Node<K, V> node = slot.get();
if (node == null) {
break;
}
slot.lazySet(null);
// 本质上就是将 evictionDeque 中对应的 node 移动到队尾(LRU)
applyRead(node);
readBufferReadCount[bufferIndex]++;
}
readBufferDrainAtWriteCount[bufferIndex].lazySet(writeCount);
}

上述操作可以概括成几个要点:

  • 每一次 drain 操作分配了 buffer 操作的阈值
  • 获取 node,释放 node
  • 将 node 移动到队尾
  • 记录 readBufferDrainAtWriteCount

驱逐监听器

最后就是 notifyListener 方法调用 EvictionListener 的实现来进行元素驱逐的通知功能

相关的方法在 afterReadafterWritesetCapacity 中被调用

1
2
3
4
5
6
void notifyListener() {
Node<K, V> node;
while ((node = pendingNotifications.poll()) != null) {
listener.onEviction(node.key, node.getValue());
}
}

遍历 pendingNotifications 中被驱逐的 node,调用 listener 的 onEviction 方法

参考

ben-manes/concurrentlinkedhashmap: A ConcurrentLinkedHashMap for Java (github.com)

https://code.google.com/p/concurrentlinkedhashmap/